#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function import rospy import sys import math from perception_msgs.msg import PerceptionLocalization from perception_msgs.msg import PerceptionObjects from perception_msgs.msg import Object from control_msgs.msg import Jinlong_Control_Command from common_msgs.msg import Retrieval from std_msgs.msg import Bool import math import rosbag import threading import time import subprocess from datetime import datetime from subprocess import Popen, PIPE import signal import os import datetime from threading import Thread from fnmatch import fnmatch from fnmatch import fnmatch, fnmatchcase from rosbag import Bag from scipy import stats obj_lock = threading.Lock() difference_lock = threading.Lock() ego_Steering_real=[1] ego_throttle_real=[1] ego_brake_real=[1] ego_Steering_cmd=[1] ego_throttle_cmd=[1] ego_brake_cmd=[1] obj_dic={} #Strg_Angle_Real_Value=0 num_count_data_read=0 angular_velocity_z=0 num_count_jinlong_Control_Command=0 def callback_cicv_location(data): global angular_velocity_z angular_velocity_z=data.angular_velocity_z def callback_Jinlong_Control_Command(data): global ego_Steering_cmd global ego_throttle_cmd global ego_brake_cmd global num_count_jinlong_Control_Command num_count_jinlong_Control_Command+=1 if num_count_jinlong_Control_Command == 10: ego_Steering_cmd.append(data.AS_Strg_Angle_Req) ego_throttle_cmd.append(data.AS_AutoD_Accel_Pos_Req) ego_brake_cmd.append(data.AS_AutoD_BrkPelPos_Req) num_count_jinlong_Control_Command=0 def callback_data_read(data): #global Strg_Angle_Real_Value global ego_Steering_real global ego_throttle_real global ego_brake_real global num_count_data_read num_count_data_read+=1 if num_count_data_read == 10: ego_Steering_real.append(data.Strg_Angle_Real_Value) ego_throttle_real.append(data.VCU_Accel_Pos_Value) ego_brake_real.append(data.VCU_Brkpel_Pos_Value) num_count_data_read=0 #Strg_Angle_Real_Value=data.ActStrWhAng def callback_tpperception(data): global obj_dic obj_list=data.objs for obj in obj_list: if (obj.type !=1 and obj.type !=0 ) or obj.x<=5 or abs(obj.y)>=10: # (obj.type !=1 and obj.type !=0 ) continue #print(f"obj.type={obj.type},obj.id={obj.id},obj.x={obj.x},obj.y={obj.y}") if obj.id not in obj_dic: obj_dic[obj.id]=[] obj_dic[obj.id].append(obj.y) #print("----------------------------------------------------") def if_cutin(obj_y_list): global Strg_Angle_Real_Value global angular_velocity_z print(obj_y_list) flag_cutin=False for i, obj_y in enumerate(obj_y_list): if abs(obj_y)>=1.3 and abs(angular_velocity_z)<=0.6: for j in range(len(obj_y_list)-i-1): if abs(obj_y_list[1+i+j])<=0.7 and abs(angular_velocity_z)<=0.6: flag_cutin=True print('front car cut in') return flag_cutin return flag_cutin def if_cutout(obj_y_list): global Strg_Angle_Real_Value global angular_velocity_z #print(angular_velocity_z) #print(obj_y_list) #print('----------------------------------------------------------------------------') flag_cutout=False for i, obj_y in enumerate(obj_y_list): if abs(obj_y)<=0.6 and abs(angular_velocity_z)<=0.6: for j in range(len(obj_y_list)-i-1): if abs(obj_y_list[1+i+j])>=1.2 and abs(angular_velocity_z)<=0.6: flag_cutout=True print('front car cut out') return flag_cutout return flag_cutout def if_difference(ego_Steering_real,ego_throttle_real,ego_brake_real, ego_Steering_cmd,ego_throttle_cmd,ego_brake_cmd): # 利用曼惠特尼U检验的方式判断车端数据和驾驶员控制数据是否存在显著差异 difference_flag=False statistic_Steering, p_value_Steering = stats.mannwhitneyu(ego_Steering_real, ego_Steering_cmd) statistic_throttle, p_value_throttle = stats.mannwhitneyu(ego_throttle_real, ego_throttle_cmd) statistic_brake, p_value_brake = stats.mannwhitneyu(ego_brake_real, ego_brake_cmd) if p_value_Steering < 0.05 or p_value_throttle < 0.05 or p_value_brake < 0.05: difference_flag=True return difference_flag def callback_failure_lidar(data): if data.data: #为true,代表该传感器未匹配的目标过多 event_label='failurelidar' def callback_failure_radar(data): if data.data: #为true,代表该传感器未匹配的目标过多 event_label='failureradar' def callback_failure_camera(data): if data.data: #为true,代表该传感器未匹配的目标过多 event_label='failurecamera' def final_callback(event): global obj_dic global ego_Steering_real global ego_throttle_real global ego_brake_real global ego_Steering_cmd global ego_throttle_cmd global ego_brake_cmd cutin_flag = False cutout_flag = False flag_difference = False #print('ego_Steering_real=') #print(ego_Steering_real) #print('--------------------------------------------------------------') with obj_lock: # 先判断前车是否切入 for obj_id, obj_value in obj_dic.items(): if len(obj_value) <= 5: continue cutin_flag=if_cutin(obj_value) if cutin_flag : continue # 再判断前车是否切出 for obj_id, obj_value in obj_dic.items(): if len(obj_value) <= 5: continue cutout_flag=if_cutout(obj_value) if cutout_flag: continue with difference_lock: if cutin_flag: print('The cutting-in behavior of the front car was detected') flag_difference=if_difference(ego_Steering_real,ego_throttle_real,ego_brake_real, ego_Steering_cmd,ego_throttle_cmd,ego_brake_cmd) if flag_difference: event_label='cutin_difference' print('********cutin_difference********') if cutout_flag: print('The cutting-out behavior of the front car was detected') flag_difference=if_difference(ego_Steering_real,ego_throttle_real,ego_brake_real, ego_Steering_cmd,ego_throttle_cmd,ego_brake_cmd) if flag_difference: event_label='cutout_difference' print('********cutout_difference********') ''' print('ego_Steering_real=') print(ego_Steering_real) print('--------------------------------------------------------------') print('ego_Steering_cmd=') print(ego_Steering_cmd) print('--------------------------------------------------------------') print('ego_throttle_real=') print(ego_throttle_real) print('--------------------------------------------------------------') print('ego_throttle_cmd=') print(ego_throttle_cmd) print('--------------------------------------------------------------') #print('----------------------------------------------------------------------------') ''' #ego_Steering_real.clear() #ego_throttle_real.clear() #ego_Steering_cmd.clear() #ego_throttle_cmd.clear() ego_Steering_real=[1] ego_throttle_real=[1] ego_Steering_cmd=[1] ego_throttle_cmd=[1] obj_dic.clear() def listener(): rospy.init_node('listener', anonymous=True) rospy.Subscriber("/cicv_location",PerceptionLocalization,callback_cicv_location) rospy.Subscriber("/tpperception",PerceptionObjects,callback_tpperception) rospy.Subscriber("/jinlong_control_pub",Jinlong_Control_Command,callback_Jinlong_Control_Command) rospy.Subscriber("/data_read",Retrieval,callback_data_read) rospy.Subscriber("/failure/lidar",Bool,callback_failure_lidar) rospy.Subscriber("/failure/radar",Bool,callback_failure_radar) rospy.Subscriber("/failure/camera",Bool,callback_failure_camera) timer1 = rospy.Timer(rospy.Duration(4), final_callback) rospy.spin() if __name__ == '__main__': listener()