#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import print_function import rospy import sys import math from perception_msgs.msg import PerceptionLocalization from perception_msgs.msg import PerceptionObjects from perception_msgs.msg import Object from common_msgs.msg import VehicleCmd from common_msgs.msg import Retrieval import math import rosbag import threading import time import subprocess from datetime import datetime from subprocess import Popen, PIPE import signal import os import datetime from threading import Thread from fnmatch import fnmatch from fnmatch import fnmatch, fnmatchcase from rosbag import Bag from scipy import stats obj_lock = threading.Lock() difference_lock = threading.Lock() ego_Steering_real=[] ego_throttle_real=[] ego_Brake_real=[] ego_Steering_cmd=[] ego_throttle_cmd=[] ego_Brake_cmd=[] obj_dic={} #Strg_Angle_Real_Value=0 num_count_data_read=0 angular_velocity_z=0 num_count_pji_Control_Command=0 def callback_cicv_location(data): global angular_velocity_z angular_velocity_z=data.angular_velocity_z def callback_pji_Control_Command(data): global ego_Steering_cmd global ego_throttle_cmd global num_count_pji_Control_Command num_count_pji_Control_Command+=1 if num_count_pji_Control_Command == 10: ego_Steering_cmd.append(data.ICPV_Cmd_StrAngle) ego_throttle_cmd.append(data.ICPV_Cmd_AccPelPosAct) num_count_pji_Control_Command=0 def callback_data_read(data): global Strg_Angle_Real_Value global ego_Steering_real global ego_throttle_real global num_count_data_read num_count_data_read+=1 if num_count_data_read == 10: ego_Steering_real.append(data.ActStrWhAng) ego_throttle_real.append(data.AccPed2) num_count_data_read=0 Strg_Angle_Real_Value=data.ActStrWhAng def callback_tpperception(data): global obj_dic obj_list=data.objs for obj in obj_list: if obj.x<=5 or abs(obj.y)>=10: # (obj.type !=1 and obj.type !=0 ) continue #print(f"obj.type={obj.type},obj.id={obj.id},obj.x={obj.x},obj.y={obj.y}") if obj.id not in obj_dic: obj_dic[obj.id]=[] obj_dic[obj.id].append(obj.y) #print("----------------------------------------------------") def if_cutin(obj_y_list): global Strg_Angle_Real_Value global angular_velocity_z #print(obj_y_list) flag_cutin=False for i, obj_y in enumerate(obj_y_list): if abs(obj_y)>=1.1 and abs(angular_velocity_z)<=0.6: for j in range(len(obj_y_list)-i-1): if abs(obj_y_list[1+i+j])<=0.6 and abs(angular_velocity_z)<=0.6: flag_cutin=True #print('front car cut in') return flag_cutin return flag_cutin def if_cutout(obj_y_list): global Strg_Angle_Real_Value global angular_velocity_z #print(angular_velocity_z) #print(obj_y_list) #print('----------------------------------------------------------------------------') flag_cutout=False for i, obj_y in enumerate(obj_y_list): if abs(obj_y)<=0.6 and abs(angular_velocity_z)<=0.6: for j in range(len(obj_y_list)-i-1): if abs(obj_y_list[1+i+j])>=1.2 and abs(angular_velocity_z)<=0.6: flag_cutout=True #print('front car cut out') return flag_cutout return flag_cutout def if_difference(ego_Steering_real,ego_throttle_real,ego_Steering_cmd,ego_throttle_cmd): # 利用曼惠特尼U检验的方式判断车端数据和驾驶员控制数据是否存在显著差异 difference_flag=False statistic_Steering, p_value_Steering = stats.mannwhitneyu(ego_Steering_real, ego_Steering_cmd) statistic_throttle, p_value_throttle = stats.mannwhitneyu(ego_throttle_real, ego_throttle_cmd) if p_value_Steering < 0.05 or p_value_throttle < 0.05: difference_flag=True return difference_flag def final_callback(event): global obj_dic global ego_Steering_real global ego_throttle_real global ego_Steering_cmd global ego_throttle_cmd cutin_flag = False cutout_flag = False flag_difference = False #print('ego_Steering_real=') #print(ego_Steering_real) #print('--------------------------------------------------------------') with obj_lock: # 先判断前车是否切入 for obj_id, obj_value in obj_dic.items(): if len(obj_value) <= 5: continue cutin_flag=if_cutin(obj_value) # 再判断前车是否切出 for obj_id, obj_value in obj_dic.items(): if len(obj_value) <= 5: continue cutout_flag=if_cutout(obj_value) with difference_lock: if cutin_flag: print('The cutting-in behavior of the front car was detected') flag_difference=if_difference(ego_Steering_real,ego_throttle_real,ego_Steering_cmd,ego_throttle_cmd) if flag_difference: event_label='cutin_difference' print('********cutin_difference********') if cutout_flag: print('The cutting-out behavior of the front car was detected') flag_difference=if_difference(ego_Steering_real,ego_throttle_real,ego_Steering_cmd,ego_throttle_cmd) if flag_difference: event_label='cutout_difference' print('********cutout_difference********') ''' print('ego_Steering_real=') print(ego_Steering_real) print('--------------------------------------------------------------') print('ego_Steering_cmd=') print(ego_Steering_cmd) print('--------------------------------------------------------------') print('ego_throttle_real=') print(ego_throttle_real) print('--------------------------------------------------------------') print('ego_throttle_cmd=') print(ego_throttle_cmd) print('--------------------------------------------------------------') #print('----------------------------------------------------------------------------') ''' #ego_Steering_real.clear() #ego_throttle_real.clear() #ego_Steering_cmd.clear() #ego_throttle_cmd.clear() ego_Steering_real=[1] ego_throttle_real=[1] ego_Steering_cmd=[1] ego_throttle_cmd=[1] obj_dic.clear() def listener(): rospy.init_node('listener', anonymous=True) rospy.Subscriber("/cicv_location",PerceptionLocalization,callback_cicv_location) # done rospy.Subscriber("/tpperception",PerceptionObjects,callback_tpperception) rospy.Subscriber("/pj_control_pub",VehicleCmd,callback_pji_Control_Command) rospy.Subscriber("/data_read",Retrieval,callback_data_read) timer1 = rospy.Timer(rospy.Duration(3.5), final_callback) rospy.spin() if __name__ == '__main__': listener()