# -*- coding: utf-8 -*-
"""Polling service that scores pending exam records.

For every row of the ``exam`` table that has no score yet, the script:

1. downloads the vehicle's rosbag files for the exam time window from OSS,
2. merges them into one bag (plus a second bag holding a single topic),
3. queries traffic-light phase data over HTTP and writes it to JSON files,
4. runs the bag parser and the scoring script as sub-processes,
5. writes the total score and the raw scoring output back to the database.

The file targets Python 2 (it relies on ``urllib2``).
"""
import json
import logging
import os
import subprocess
import time
import urllib2

import oss2
import pymysql
from rosbag import Bag, Compression

path1 = '/root/cicv_data_closedloop-data_processing/python2-algorithm-exam/'
path2 = '/root/cicv_data_closedloop-data_processing/python2-algorithm-exam/json/'
path3 = '/root/cicv_data_closedloop-data_processing/python2-algorithm-exam/bag/'
path4 = '/root/cicv_data_closedloop-data_processing/python2-algorithm-exam/bag/merged/'

logging.basicConfig(filename=path1 + 'log/algorithm-exam.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

key1 = 'pjisuv/'
key2 = 'data/'
key3 = 'data_merge/'
key4 = 'data_parse/'
# Scan interval in seconds. NOTE(review): nothing visible in this file reads
# this value; the polling loop below sleeps for a literal 1 second.
sleep_time = 5
compress_way = Compression.BZ2
score_script_path = '/root/competition/Final_Compet_619.py'

# Search endpoint and device queried for traffic-light phase data.
_LIGHT_SEARCH_URL = 'http://10.14.65.102:30087/event_rsu_rsu2cloud_spat_alias/_search'
_LIGHT_DEVICE_ID = 'R-HK0511'
# Topic copied into the secondary merged bag.
_VEHICLE_FEEDBACK_TOPIC = '/pj_vehicle_fdb_pub'
# Offset subtracted from the light timestamps to obtain the bag-name
# timestamps. NOTE(review): presumably a UTC+8 correction -- confirm.
_BAG_TIME_OFFSET_SECONDS = 8 * 60 * 60


def merge(source_bag_files, target_bag_file, target_bag_file2):
    """Merge several bags into two output bags.

    ``target_bag_file`` receives every message of every source bag, in the
    order the sources are listed. ``target_bag_file2`` receives only the
    messages published on ``/pj_vehicle_fdb_pub``.

    Each source bag is read once and fed to both outputs. Errors are logged
    and swallowed (best effort), so the outputs may be incomplete after a
    failure.

    :param source_bag_files: paths of the bags to merge
    :param target_bag_file: path of the full merged bag
    :param target_bag_file2: path of the filtered merged bag
    """
    try:
        with Bag(target_bag_file, 'w', compression=compress_way) as full_bag:
            with Bag(target_bag_file2, 'w', compression=compress_way) as filtered_bag:
                for source_bag_file in source_bag_files:
                    with Bag(source_bag_file, 'r') as source_bag:
                        for topic, msg, t in source_bag:
                            full_bag.write(topic, msg, t)
                            if topic == _VEHICLE_FEEDBACK_TOPIC:
                                filtered_bag.write(topic, msg, t)
    except Exception as e:
        logging.exception("bag包合并报错: %s", e)


def convert_unix_timestamp(timestamp):
    """Format a Unix timestamp; identical to :func:`format_timestamp`."""
    return format_timestamp(timestamp)


def format_timestamp(timestamp):
    """Return ``timestamp`` (seconds) as ``YYYY-mm-dd-HH-MM-SS`` in local time."""
    return time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(float(timestamp)))


def seconds_between_timestamps(time_begin, time_end):
    """Return one formatted timestamp per second in ``[time_begin, time_end]``.

    Both bounds are converted with ``int()`` and are inclusive. The result is
    empty when ``time_end`` is smaller than ``time_begin``.
    """
    return [format_timestamp(second)
            for second in range(int(time_begin), int(time_end) + 1)]


def _to_custom_light(light):
    """Map a raw light value to the custom encoding: 1 = red, 0 = green.

    Raw values 2 and 3 map to 1; every other value maps to 0.
    """
    return 1 if light in (2, 3) else 0


def _fetch_phase_samples(phase_id, time_begin1, time_end1, log_response):
    """Query the light service and return the samples of one phase.

    Returns a list of ``{'intersectionTimestamp': ..., 'light': ...}`` dicts,
    one for every phase state of ``phase_id`` whose ``startTime`` is 0, in the
    order the service returned them (sorted ascending by ``timestamp``).

    Raises whatever the HTTP call or the response parsing raises; the caller
    turns any failure into the default result.
    """
    begin_ms = int(time_begin1) * 1000
    end_ms = int(time_end1) * 1000
    logging.info('信号灯开始时间为: %s', str(begin_ms))
    logging.info('信号灯结束时间为: %s', str(end_ms))
    # NOTE(review): "size" caps the result at 1000 hits; longer windows may
    # be truncated -- confirm this is acceptable.
    query = {
        "query": {
            "bool": {
                "must": [
                    {"term": {"deviceId": {"value": _LIGHT_DEVICE_ID}}},
                    {"range": {"timestamp": {"gte": begin_ms, "lte": end_ms}}},
                ]
            }
        },
        "size": 1000,
        "sort": [{"timestamp": {"order": "asc"}}],
    }
    request = urllib2.Request(_LIGHT_SEARCH_URL, json.dumps(query))
    request.add_header('Content-Type', 'application/json; charset=UTF-8')
    response = urllib2.urlopen(request)
    try:
        response_data = response.read()
    finally:
        response.close()
    if log_response:
        logging.info("信号灯数据结果为:%s", response_data)

    samples = []
    for hit in json.loads(response_data).get('hits').get('hits'):
        for intersection in hit.get('_source').get('intersections'):
            intersection_timestamp = intersection.get('intersectionTimestamp')
            for phase in intersection.get('phases'):
                if phase.get('phaseId') != phase_id:
                    continue
                for phase_state in phase.get('phaseStates'):
                    if phase_state.get('startTime') == 0:
                        samples.append({
                            'intersectionTimestamp': intersection_timestamp,
                            'light': phase_state.get('light'),
                        })
    return samples


def _build_light_segments(samples):
    """Collapse per-sample light values into begin/end segments.

    A segment is closed whenever the custom light value changes; the last
    segment is extended 10000 (milliseconds, given the *1000 scaling used for
    the query) past its start.

    NOTE(review): a closed segment is labelled with the light value that
    *starts* at its end time, not the value that was active during it. This
    mirrors the original implementation; confirm against the scoring script
    before changing it.
    """
    if not samples:
        raise ValueError('no light samples returned for the requested phase')
    segment_start = samples[0].get('intersectionTimestamp')
    active_light = _to_custom_light(samples[0].get('light'))
    segments = []
    for sample in samples:
        sample_light = _to_custom_light(sample.get('light'))
        if sample_light == active_light:
            continue
        sample_timestamp = sample.get('intersectionTimestamp')
        segments.append({'beginTime': segment_start,
                         'endTime': sample_timestamp,
                         'light': sample_light})
        segment_start = sample_timestamp
        active_light = sample_light
    segments.append({'beginTime': segment_start,
                     'endTime': segment_start + 10000,
                     'light': active_light})
    return segments


def _parse_light(phase_id, time_begin1, time_end1, log_response=False):
    """Write the light segments of ``phase_id`` to a JSON file.

    On any failure (HTTP error, unexpected payload, no samples) the error is
    logged and a single all-green segment covering the whole window is written
    instead.

    :returns: path of the JSON file,
        ``path2 + str(time_begin1) + '_<phase_id>.json'``
    """
    try:
        result1 = _build_light_segments(
            _fetch_phase_samples(phase_id, time_begin1, time_end1, log_response))
    except Exception as e:
        logging.exception('无法获取信号灯数据,默认全程绿灯 %s' % str(e))
        result1 = [{'beginTime': int(time_begin1) * 1000,
                    'endTime': int(time_end1) * 1000 + 10000,
                    'light': 0}]
    logging.info("信号灯数据为 %s" % result1)
    file_path = path2 + str(time_begin1) + '_' + str(phase_id) + '.json'
    with open(file_path, "w") as json_file:
        json.dump(result1, json_file)
    # Log the path itself; the original formatted the (closed) file object.
    logging.info("信号灯数据文件路径为 %s" % file_path)
    return file_path


def parse_light_6(time_begin1, time_end1):
    """Write the light segments of phase 6 and return the JSON path.

    This variant also logs the raw service response.
    """
    return _parse_light(6, time_begin1, time_end1, log_response=True)


def parse_light_13(time_begin1, time_end1):
    """Write the light segments of phase 13 and return the JSON path."""
    return _parse_light(13, time_begin1, time_end1)


def parse_light_1(time_begin1, time_end1):
    """Write the light segments of phase 1 and return the JSON path."""
    return _parse_light(1, time_begin1, time_end1)


# Phase id reference:
#    1 - east approach, left turn      2 - east approach, straight
#    3 - east approach, right turn     4 - east approach, non-motorized
#    5 - south approach, left turn     6 - south approach, straight
#    7 - south approach, right turn    8 - south approach, non-motorized
#    9 - west approach, left turn     10 - west approach, straight
#   11 - west approach, right turn    12 - west approach, non-motorized
#   13 - north approach, left turn    14 - north approach, straight
#   15 - north approach, right turn   16 - north approach, non-motorized
#
# Phases used for scoring:
#   1. south approach, straight  -> 6
#   2. north approach, left turn -> 13
#   3. east approach, left turn  -> 1


def _connect_db():
    """Open a new connection to the ``dataclosedloop`` MySQL database.

    NOTE(review): credentials are hard-coded; consider moving them to the
    environment or a secret store.
    NOTE(review): the timeouts are documented as seconds in the original
    comments, which makes 60000 roughly 16.7 hours -- confirm this was not
    meant to be milliseconds.
    """
    return pymysql.connect(
        host='10.14.85.241',
        port=3306,
        user='root',
        password='1qaz2wsx!',
        database='dataclosedloop',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,  # rows come back as dicts
        connect_timeout=60000,
        read_timeout=60000,
        write_timeout=60000
    )


def _fetch_pending_exams():
    """Return the exam rows that still need a score.

    The connection is always closed, including when no rows are found or the
    query fails.
    """
    sql = """
        SELECT * FROM exam
        where score_online = 0
          and (details is null or details = '')
          and end_time != '2006-01-02 15:04:05'
    """
    conn = _connect_db()
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()
    finally:
        conn.close()


def _save_score(exam_id, total_score, details):
    """Store the total score and the raw scoring output for one exam."""
    sql = """
        UPDATE exam
        SET score_online = %s, details = %s
        WHERE id = %s
    """
    conn2 = _connect_db()
    try:
        with conn2.cursor() as cursor2:
            cursor2.execute(sql, (total_score, details, exam_id))
        conn2.commit()
    finally:
        conn2.close()


def _ensure_dir(directory):
    """Create ``directory`` (and parents) unless it already is a directory."""
    if not os.path.isdir(directory):
        os.makedirs(directory)


def _run_command(command):
    """Run ``command`` through the shell; return (returncode, stdout, stderr).

    The command strings built in this file consist only of fixed paths and
    formatted timestamps.
    """
    process = subprocess.Popen(command, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, error = process.communicate()
    return process.returncode, output, error


def _score_exam(bucket, row):
    """Download, merge, parse and score the data of one exam row.

    :param bucket: ``oss2.Bucket`` holding the recorded bags
    :param row: dict row of the ``exam`` table; ``equipment_no``,
        ``team_name``, ``begin_time``, ``end_time`` and ``id`` are read
    """
    equipment_no = str(row['equipment_no'])  # vehicle number
    logging.info("即将对队伍 %s 进行评分。", row['team_name'].encode('utf-8'))

    timestamp_begin = time.mktime(row['begin_time'].timetuple())
    timestamp_end = time.mktime(row['end_time'].timetuple())
    time_begin_for_light = str(int(timestamp_begin))
    time_end_for_light = str(int(timestamp_end))
    time_begin_for_bag = str(int(timestamp_begin) - _BAG_TIME_OFFSET_SECONDS)
    time_end_for_bag = str(int(time_end_for_light) - _BAG_TIME_OFFSET_SECONDS)
    interval = seconds_between_timestamps(time_begin_for_light, time_end_for_light)
    interval_for_bag = seconds_between_timestamps(time_begin_for_bag, time_end_for_bag)
    logging.info('时间戳 %s 到 %s 改为自定义格式 %s ',
                 time_begin_for_bag, time_end_for_bag, interval_for_bag)

    # 1 Collect the keys of all bags recorded during the window; bag keys are
    #   looked up by per-second timestamp prefix.
    oss_bags = []
    for time_prefix in interval_for_bag:
        prefix = 'competition/' + equipment_no + '/' + time_prefix
        for obj1 in oss2.ObjectIterator(bucket, prefix=prefix):
            oss_bags.append(str(obj1.key))
    logging.info("合并区间内的bag包:%s -> %s,共 %s 个",
                 interval_for_bag[0], interval_for_bag[-1], len(interval_for_bag))

    merged_bag_dir = path4 + str(interval[0]) + '/'
    merged_bag_path = merged_bag_dir + str(interval[0]) + '.bag'
    merged_bag_path2 = merged_bag_dir + str(interval[0]) + '_2.bag'

    # 2 Download and 3 merge. The downloaded bags are temporary and are
    #   removed even when a download fails half way through.
    local_bags = []
    try:
        equipment_dir = path3 + equipment_no
        _ensure_dir(equipment_dir)
        for oss_bag in oss_bags:
            local_bag_path = equipment_dir + '/' + oss_bag.split('/')[-1]
            bucket.get_object_to_file(oss_bag, local_bag_path)
            local_bags.append(local_bag_path)
        _ensure_dir(merged_bag_dir)
        merge(local_bags, merged_bag_path, merged_bag_path2)
    finally:
        # 4 Clean up the temporary downloads.
        for local_bag in local_bags:
            try:
                os.remove(local_bag)
            except Exception as e:
                logging.exception("删除本地临时文件报错: %s" % str(e))
    logging.info("合并后的数据包为 %s" % merged_bag_path)

    # 5 Query the traffic-light data for the three scored phases.
    try:
        json_path_6 = parse_light_6(time_begin_for_light, time_end_for_light)
        json_path_13 = parse_light_13(time_begin_for_light, time_end_for_light)
        json_path_1 = parse_light_1(time_begin_for_light, time_end_for_light)
    except Exception as e:
        logging.exception("未获取信号灯数据: %s" % str(e))
        return

    # 6 Run the bag parser. Its result is only logged; scoring is attempted
    #   regardless of the return code, as in the original flow.
    command1 = 'rosrun carcompetition jiexi_node ' + merged_bag_path
    logging.info("生成csv: %s" % str(command1))
    returncode, output, error = _run_command(command1)
    if returncode == 0:
        logging.info("Output: %s", output)
    else:
        logging.error("Error: %s", error)

    # 7 Run the scoring script.
    command2 = ' '.join([
        'python3', score_script_path,
        merged_bag_dir + 'candata.csv',
        merged_bag_dir + 'gpsdata.csv',
        json_path_6, json_path_13, json_path_1,
        merged_bag_path2,
    ])
    logging.info("执行打分命令 %s", command2)
    returncode, output, error = _run_command(command2)
    if returncode == 0:
        text = str(output)
        logging.info('评分结果为:%s' % text)
        data = json.loads(text)
        # 8 Sum TotalScore + DeductedScore over all scenarios.
        #   NOTE(review): presumably DeductedScore is reported as a negative
        #   number -- confirm against the scoring script.
        total_score = sum(item['TotalScore'] + item['DeductedScore'] for item in data)
        # 9 Write the result back to the database.
        _save_score(row['id'], total_score, text)
    else:
        logging.error("执行命令报错: %s", error)
    # The merged bags are left on disk; no cleanup is performed for them here.
    logging.info("队伍 %s 评分结束。", row['team_name'].encode('utf-8'))


def main():
    """Poll the database forever and score every pending exam."""
    # NOTE(review): the OSS access key is hard-coded; consider moving it to
    # the environment or a secret store.
    auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
    endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
    bucket = oss2.Bucket(auth, endpoint, 'open-bucket')

    while True:
        try:
            result = _fetch_pending_exams()
        except Exception as e:
            logging.exception("报错: %s" % str(e))
            time.sleep(1)  # do not hammer an unreachable database
            continue

        if len(result) == 0:
            logging.info("未查询到待评分的记录。")
            time.sleep(1)
            continue

        had_failure = False
        for row in result:
            # One broken record must not block the records after it.
            try:
                _score_exam(bucket, row)
            except Exception as e:
                logging.exception("报错: %s" % str(e))
                had_failure = True
        if had_failure:
            # A failed record stays pending and is retried on the next poll;
            # pause so a persistent failure does not become a busy loop.
            time.sleep(1)


if __name__ == '__main__':
    main()