|
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- from __future__ import print_function
- import rospy
- import sys
- import math
- from perception_msgs.msg import PerceptionLocalization
- from perception_msgs.msg import PerceptionObjects
- from perception_msgs.msg import Object
- from common_msgs.msg import VehicleCmd
- from common_msgs.msg import Retrieval
- import math
- import rosbag
- import threading
- import time
- import subprocess
- from datetime import datetime
- from subprocess import Popen, PIPE
- import signal
- import os
- import datetime
- from threading import Thread
- from fnmatch import fnmatch
- from fnmatch import fnmatch, fnmatchcase
- from rosbag import Bag
- from scipy import stats
- obj_lock = threading.Lock()
- difference_lock = threading.Lock()
- ego_Steering_real=[]
- ego_throttle_real=[]
- ego_Brake_real=[]
- ego_Steering_cmd=[]
- ego_throttle_cmd=[]
- ego_Brake_cmd=[]
- obj_dic={}
- #Strg_Angle_Real_Value=0
- num_count_data_read=0
- angular_velocity_z=0
- num_count_pji_Control_Command=0
- def callback_cicv_location(data):
- global angular_velocity_z
- angular_velocity_z=data.angular_velocity_z
-
- def callback_pji_Control_Command(data):
- global ego_Steering_cmd
- global ego_throttle_cmd
- global num_count_pji_Control_Command
-
- num_count_pji_Control_Command+=1
- if num_count_pji_Control_Command == 10:
- ego_Steering_cmd.append(data.ICPV_Cmd_StrAngle)
- ego_throttle_cmd.append(data.ICPV_Cmd_AccPelPosAct)
- num_count_pji_Control_Command=0
-
-
- def callback_data_read(data):
- global Strg_Angle_Real_Value
- global ego_Steering_real
- global ego_throttle_real
- global num_count_data_read
-
- num_count_data_read+=1
- if num_count_data_read == 10:
- ego_Steering_real.append(data.ActStrWhAng)
- ego_throttle_real.append(data.AccPed2)
- num_count_data_read=0
-
- Strg_Angle_Real_Value=data.ActStrWhAng
-
- def callback_tpperception(data):
- global obj_dic
- obj_list=data.objs
- for obj in obj_list:
-
- if obj.x<=5 or abs(obj.y)>=10: # (obj.type !=1 and obj.type !=0 )
- continue
- #print(f"obj.type={obj.type},obj.id={obj.id},obj.x={obj.x},obj.y={obj.y}")
- if obj.id not in obj_dic:
- obj_dic[obj.id]=[]
- obj_dic[obj.id].append(obj.y)
- #print("----------------------------------------------------")
-
-
- def if_cutin(obj_y_list):
- global Strg_Angle_Real_Value
- global angular_velocity_z
- #print(obj_y_list)
- flag_cutin=False
- for i, obj_y in enumerate(obj_y_list):
- if abs(obj_y)>=1.1 and abs(angular_velocity_z)<=0.6:
- for j in range(len(obj_y_list)-i-1):
- if abs(obj_y_list[1+i+j])<=0.6 and abs(angular_velocity_z)<=0.6:
- flag_cutin=True
- #print('front car cut in')
- return flag_cutin
- return flag_cutin
- def if_cutout(obj_y_list):
- global Strg_Angle_Real_Value
- global angular_velocity_z
- #print(angular_velocity_z)
- #print(obj_y_list)
- #print('----------------------------------------------------------------------------')
-
- flag_cutout=False
- for i, obj_y in enumerate(obj_y_list):
- if abs(obj_y)<=0.6 and abs(angular_velocity_z)<=0.6:
- for j in range(len(obj_y_list)-i-1):
- if abs(obj_y_list[1+i+j])>=1.2 and abs(angular_velocity_z)<=0.6:
- flag_cutout=True
- #print('front car cut out')
- return flag_cutout
- return flag_cutout
- def if_difference(ego_Steering_real,ego_throttle_real,ego_Steering_cmd,ego_throttle_cmd): # 利用曼惠特尼U检验的方式判断车端数据和驾驶员控制数据是否存在显著差异
-
- difference_flag=False
- statistic_Steering, p_value_Steering = stats.mannwhitneyu(ego_Steering_real, ego_Steering_cmd)
- statistic_throttle, p_value_throttle = stats.mannwhitneyu(ego_throttle_real, ego_throttle_cmd)
- if p_value_Steering < 0.05 or p_value_throttle < 0.05:
- difference_flag=True
- return difference_flag
-
- def final_callback(event):
- global obj_dic
- global ego_Steering_real
- global ego_throttle_real
- global ego_Steering_cmd
- global ego_throttle_cmd
- cutin_flag = False
- cutout_flag = False
- flag_difference = False
- #print('ego_Steering_real=')
- #print(ego_Steering_real)
- #print('--------------------------------------------------------------')
-
- with obj_lock:
- # 先判断前车是否切入
- for obj_id, obj_value in obj_dic.items():
- if len(obj_value) <= 5:
- continue
- cutin_flag=if_cutin(obj_value)
- if cutin_flag :
- continue
-
- # 再判断前车是否切出
- for obj_id, obj_value in obj_dic.items():
- if len(obj_value) <= 5:
- continue
- cutout_flag=if_cutout(obj_value)
- if cutout_flag:
- continue
-
- with difference_lock:
- if cutin_flag:
- print('The cutting-in behavior of the front car was detected')
- flag_difference=if_difference(ego_Steering_real,ego_throttle_real,ego_Steering_cmd,ego_throttle_cmd)
- if flag_difference:
- event_label='cutin_difference'
- print('********cutin_difference********')
-
- if cutout_flag:
- print('The cutting-out behavior of the front car was detected')
- flag_difference=if_difference(ego_Steering_real,ego_throttle_real,ego_Steering_cmd,ego_throttle_cmd)
- if flag_difference:
- event_label='cutout_difference'
- print('********cutout_difference********')
- '''
- print('ego_Steering_real=')
- print(ego_Steering_real)
- print('--------------------------------------------------------------')
- print('ego_Steering_cmd=')
- print(ego_Steering_cmd)
- print('--------------------------------------------------------------')
- print('ego_throttle_real=')
- print(ego_throttle_real)
- print('--------------------------------------------------------------')
- print('ego_throttle_cmd=')
- print(ego_throttle_cmd)
- print('--------------------------------------------------------------')
- #print('----------------------------------------------------------------------------')
- '''
- #ego_Steering_real.clear()
- #ego_throttle_real.clear()
- #ego_Steering_cmd.clear()
- #ego_throttle_cmd.clear()
- ego_Steering_real=[1]
- ego_throttle_real=[1]
- ego_Steering_cmd=[1]
- ego_throttle_cmd=[1]
-
- obj_dic.clear()
-
- def listener():
- rospy.init_node('listener', anonymous=True)
- rospy.Subscriber("/cicv_location",PerceptionLocalization,callback_cicv_location)
- rospy.Subscriber("/tpperception",PerceptionObjects,callback_tpperception)
- rospy.Subscriber("/pj_control_pub",VehicleCmd,callback_pji_Control_Command)
- rospy.Subscriber("/data_read",Retrieval,callback_data_read)
- timer1 = rospy.Timer(rospy.Duration(3.5), final_callback)
- rospy.spin()
-
- if __name__ == '__main__':
- listener()
-
-
-
-
-
|