123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628 |
- # -*- coding: utf-8 -*-
- import subprocess
- import time
- import oss2
- from rosbag import Bag, Compression
- import logging
- import pymysql
- import os
- import urllib2
- import json
- path1 = '/root/cicv_data_closedloop-data_processing/python2-algorithm-exam/'
- path2 = '/root/cicv_data_closedloop-data_processing/python2-algorithm-exam/json/'
- path3 = '/root/cicv_data_closedloop-data_processing/python2-algorithm-exam/bag/'
- path4 = '/root/cicv_data_closedloop-data_processing/python2-algorithm-exam/bag/merged/'
- logging.basicConfig(filename=path1 + 'log/algorithm-exam.log', level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s')
- key1 = 'pjisuv/'
- key2 = 'data/'
- key3 = 'data_merge/'
- key4 = 'data_parse/'
- sleep_time = 5 # 每多少秒扫描一次
- compress_way = Compression.BZ2
- score_script_path = '/root/competition/Final_Compet_619.py'
- def merge(source_bag_files, target_bag_file, target_bag_file2):
- try:
- with Bag(target_bag_file, 'w', compression=compress_way) as o:
- for i in range(len(source_bag_files)):
- with Bag(source_bag_files[i], 'r') as ib:
- for topic, msg, t in ib:
- o.write(topic, msg, t)
- with Bag(target_bag_file2, 'w', compression=compress_way) as o2:
- for i in range(len(source_bag_files)):
- with Bag(source_bag_files[i], 'r') as ib:
- for topic, msg, t in ib:
- if topic == '/pj_vehicle_fdb_pub':
- o2.write(topic, msg, t)
- except Exception as e:
- logging.exception("bag包合并报错: %s", e)
- def convert_unix_timestamp(timestamp):
- return time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(float(timestamp)))
- def format_timestamp(timestamp):
- return time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(float(timestamp)))
- def seconds_between_timestamps(time_begin, time_end):
- begin_seconds = int(time_begin)
- end_seconds = int(time_end)
- current_seconds = begin_seconds
- formatted_times = []
- while current_seconds <= end_seconds:
- formatted_time = format_timestamp(current_seconds)
- formatted_times.append(formatted_time)
- current_seconds += 1
- return formatted_times
- def parse_light_6(time_begin1, time_end1):
- a = 6
- result1 = []
- try:
- # 定义 API 地址
- url = 'http://10.14.65.102:30087/event_rsu_rsu2cloud_spat_alias/_search'
- logging.info('信号灯开始时间为: %s', str(int(time_begin1) * 1000))
- logging.info('信号灯结束时间为: %s', str(int(time_end1) * 1000))
- # 定义请求体
- data1 = {
- "query": {
- "bool": {
- "must": [
- {
- "term": {
- "deviceId": {
- "value": "R-HK0511"
- }
- }
- },
- {
- "range": {
- "timestamp": {
- "gte": int(time_begin1) * 1000,
- "lte": int(time_end1) * 1000
- }
- }
- }
- ]
- }
- },
- "size": 1000,
- "sort": [
- {
- "timestamp": {
- "order": "asc"
- }
- }
- ]
- }
- # 将请求体转换为 JSON 格式
- json_data = json.dumps(data1)
- # 发起 HTTP POST 请求
- request = urllib2.Request(url, json_data)
- request.add_header('Content-Type', 'application/json; charset=UTF-8')
- # 接收响应
- response = urllib2.urlopen(request)
- # 读取响应内容
- response_data = response.read()
- logging.info("信号灯数据结果为:%s", response_data)
- # 将响应内容转换为 JSON 对象
- json_response = json.loads(response_data)
- hits_array = list(json_response.get('hits').get('hits'))
- data_array = []
- for hits in hits_array:
- _source = hits.get('_source')
- intersections = list(_source.get('intersections'))
- for intersection in intersections:
- intersectionTimestamp = intersection.get('intersectionTimestamp')
- phases = list(intersection.get('phases'))
- for phase in phases:
- if phase.get('phaseId') == a:
- phaseStates = list(phase.get('phaseStates'))
- for phaseState in phaseStates:
- startTime = phaseState.get('startTime')
- if startTime == 0:
- data_array.append(
- {'intersectionTimestamp': intersectionTimestamp, 'light': phaseState.get('light')})
- currentIntersectionTimestamp = data_array[0].get('intersectionTimestamp')
- if data_array[0].get('light') == 2 or data_array[0].get('light') == 3:
- currentLightCustom = 1
- else:
- currentLightCustom = 0
- # logging.info("原始信号灯数据为 %s" % data_array)
- # 0 绿灯 1 红灯
- for e in data_array:
- intersectionTimestamp = e.get('intersectionTimestamp')
- tempLight = e.get('light')
- if tempLight == 2 or tempLight == 3: # 1红灯 0绿灯
- tempCurrentLightCustom = 1
- else:
- tempCurrentLightCustom = 0
- if tempCurrentLightCustom == currentLightCustom:
- continue
- else:
- result1.append({'beginTime': currentIntersectionTimestamp, 'endTime': intersectionTimestamp,
- 'light': tempCurrentLightCustom})
- currentIntersectionTimestamp = intersectionTimestamp
- currentLightCustom = tempCurrentLightCustom
- if currentLightCustom == 1:
- result1.append(
- {'beginTime': currentIntersectionTimestamp, 'endTime': currentIntersectionTimestamp + 10000,
- 'light': 1})
- else:
- result1.append(
- {'beginTime': currentIntersectionTimestamp, 'endTime': currentIntersectionTimestamp + 10000,
- 'light': 0})
- except Exception as e:
- logging.exception('无法获取信号灯数据,默认全程绿灯 %s' % str(e))
- result1.append({'beginTime': int(time_begin1) * 1000, 'endTime': int(time_end1) * 1000 + 10000, 'light': 0})
- # 打印JSON文件的内容
- logging.info("信号灯数据为 %s" % result1)
- # 定义要保存为JSON文件的列表
- # 指定要保存的JSON文件路径
- file_path = path2 + str(time_begin1) + '_6.json'
- # 将列表保存为JSON文件
- with open(file_path, "w") as json_file:
- json.dump(result1, json_file)
- logging.info("信号灯数据文件路径为 %s" % json_file)
- return file_path
- def parse_light_13(time_begin1, time_end1):
- a = 13
- result1 = []
- try:
- # 定义 API 地址
- url = 'http://10.14.65.102:30087/event_rsu_rsu2cloud_spat_alias/_search'
- logging.info('信号灯开始时间为: %s', str(int(time_begin1) * 1000))
- logging.info('信号灯结束时间为: %s', str(int(time_end1) * 1000))
- # 定义请求体
- data1 = {
- "query": {
- "bool": {
- "must": [
- {
- "term": {
- "deviceId": {
- "value": "R-HK0511"
- }
- }
- },
- {
- "range": {
- "timestamp": {
- "gte": int(time_begin1) * 1000,
- "lte": int(time_end1) * 1000
- }
- }
- }
- ]
- }
- },
- "size": 1000,
- "sort": [
- {
- "timestamp": {
- "order": "asc"
- }
- }
- ]
- }
- # 将请求体转换为 JSON 格式
- json_data = json.dumps(data1)
- # 发起 HTTP POST 请求
- request = urllib2.Request(url, json_data)
- request.add_header('Content-Type', 'application/json; charset=UTF-8')
- # 接收响应
- response = urllib2.urlopen(request)
- # 读取响应内容
- response_data = response.read()
- # 将响应内容转换为 JSON 对象
- json_response = json.loads(response_data)
- hits_array = list(json_response.get('hits').get('hits'))
- data_array = []
- for hits in hits_array:
- _source = hits.get('_source')
- intersections = list(_source.get('intersections'))
- for intersection in intersections:
- intersectionTimestamp = intersection.get('intersectionTimestamp')
- phases = list(intersection.get('phases'))
- for phase in phases:
- if phase.get('phaseId') == a:
- phaseStates = list(phase.get('phaseStates'))
- for phaseState in phaseStates:
- startTime = phaseState.get('startTime')
- if startTime == 0:
- data_array.append(
- {'intersectionTimestamp': intersectionTimestamp, 'light': phaseState.get('light')})
- currentIntersectionTimestamp = data_array[0].get('intersectionTimestamp')
- if data_array[0].get('light') == 2 or data_array[0].get('light') == 3:
- currentLightCustom = 1
- else:
- currentLightCustom = 0
- # logging.info("原始信号灯数据为 %s" % data_array)
- # 0 绿灯 1 红灯
- for e in data_array:
- intersectionTimestamp = e.get('intersectionTimestamp')
- tempLight = e.get('light')
- if tempLight == 2 or tempLight == 3: # 1红灯 0绿灯
- tempCurrentLightCustom = 1
- else:
- tempCurrentLightCustom = 0
- if tempCurrentLightCustom == currentLightCustom:
- continue
- else:
- result1.append({'beginTime': currentIntersectionTimestamp, 'endTime': intersectionTimestamp,
- 'light': tempCurrentLightCustom})
- currentIntersectionTimestamp = intersectionTimestamp
- currentLightCustom = tempCurrentLightCustom
- if currentLightCustom == 1:
- result1.append(
- {'beginTime': currentIntersectionTimestamp, 'endTime': currentIntersectionTimestamp + 10000,
- 'light': 1})
- else:
- result1.append(
- {'beginTime': currentIntersectionTimestamp, 'endTime': currentIntersectionTimestamp + 10000,
- 'light': 0})
- except Exception as e:
- logging.exception('无法获取信号灯数据,默认全程绿灯 %s' % str(e))
- result1.append({'beginTime': int(time_begin1) * 1000, 'endTime': int(time_end1) * 1000 + 10000, 'light': 0})
- # 打印JSON文件的内容
- logging.info("信号灯数据为 %s" % result1)
- # 定义要保存为JSON文件的列表
- # 指定要保存的JSON文件路径
- file_path = path2 + str(time_begin1) + '_13.json'
- # 将列表保存为JSON文件
- with open(file_path, "w") as json_file:
- json.dump(result1, json_file)
- logging.info("信号灯数据文件路径为 %s" % json_file)
- return file_path
- def parse_light_1(time_begin1, time_end1):
- a = 1
- result1 = []
- try:
- # 定义 API 地址
- url = 'http://10.14.65.102:30087/event_rsu_rsu2cloud_spat_alias/_search'
- logging.info('信号灯开始时间为: %s', str(int(time_begin1) * 1000))
- logging.info('信号灯结束时间为: %s', str(int(time_end1) * 1000))
- # 定义请求体
- data1 = {
- "query": {
- "bool": {
- "must": [
- {
- "term": {
- "deviceId": {
- "value": "R-HK0511"
- }
- }
- },
- {
- "range": {
- "timestamp": {
- "gte": int(time_begin1) * 1000,
- "lte": int(time_end1) * 1000
- }
- }
- }
- ]
- }
- },
- "size": 1000,
- "sort": [
- {
- "timestamp": {
- "order": "asc"
- }
- }
- ]
- }
- # 将请求体转换为 JSON 格式
- json_data = json.dumps(data1)
- # 发起 HTTP POST 请求
- request = urllib2.Request(url, json_data)
- request.add_header('Content-Type', 'application/json; charset=UTF-8')
- # 接收响应
- response = urllib2.urlopen(request)
- # 读取响应内容
- response_data = response.read()
- # 将响应内容转换为 JSON 对象
- json_response = json.loads(response_data)
- hits_array = list(json_response.get('hits').get('hits'))
- data_array = []
- for hits in hits_array:
- _source = hits.get('_source')
- intersections = list(_source.get('intersections'))
- for intersection in intersections:
- intersectionTimestamp = intersection.get('intersectionTimestamp')
- phases = list(intersection.get('phases'))
- for phase in phases:
- if phase.get('phaseId') == a:
- phaseStates = list(phase.get('phaseStates'))
- for phaseState in phaseStates:
- startTime = phaseState.get('startTime')
- if startTime == 0:
- data_array.append(
- {'intersectionTimestamp': intersectionTimestamp, 'light': phaseState.get('light')})
- currentIntersectionTimestamp = data_array[0].get('intersectionTimestamp')
- if data_array[0].get('light') == 2 or data_array[0].get('light') == 3:
- currentLightCustom = 1
- else:
- currentLightCustom = 0
- # logging.info("原始信号灯数据为 %s" % data_array)
- # 0 绿灯 1 红灯
- for e in data_array:
- intersectionTimestamp = e.get('intersectionTimestamp')
- tempLight = e.get('light')
- if tempLight == 2 or tempLight == 3: # 1红灯 0绿灯
- tempCurrentLightCustom = 1
- else:
- tempCurrentLightCustom = 0
- if tempCurrentLightCustom == currentLightCustom:
- continue
- else:
- result1.append({'beginTime': currentIntersectionTimestamp, 'endTime': intersectionTimestamp,
- 'light': tempCurrentLightCustom})
- currentIntersectionTimestamp = intersectionTimestamp
- currentLightCustom = tempCurrentLightCustom
- if currentLightCustom == 1:
- result1.append(
- {'beginTime': currentIntersectionTimestamp, 'endTime': currentIntersectionTimestamp + 10000,
- 'light': 1})
- else:
- result1.append(
- {'beginTime': currentIntersectionTimestamp, 'endTime': currentIntersectionTimestamp + 10000,
- 'light': 0})
- except Exception as e:
- logging.exception('无法获取信号灯数据,默认全程绿灯 %s' % str(e))
- result1.append({'beginTime': int(time_begin1) * 1000, 'endTime': int(time_end1) * 1000 + 10000, 'light': 0})
- # 打印JSON文件的内容
- logging.info("信号灯数据为 %s" % result1)
- # 定义要保存为JSON文件的列表
- # 指定要保存的JSON文件路径
- file_path = path2 + str(time_begin1) + '_1.json'
- # 将列表保存为JSON文件
- with open(file_path, "w") as json_file:
- json.dump(result1, json_file)
- logging.info("信号灯数据文件路径为 %s" % json_file)
- return file_path
- '''
- 1-东进口左转
- 2-东进口直行
- 3-东进口右转
- 4-东进口非机动车
- 5–南进口左转
- 6-南进口直行
- 7-南进口右转
- 8-南进口非机动车
- 9-西进口左转
- 10-西进口直行
- 11-西进口右转
- 12-西进口非机动车
- 13-北进口左转
- 14-北进口直行
- 15-北进口右转
- 16-北进口非机动车
- '''
- '''
- 1.南进口直行 6
- 2.北进口左转 13
- 3.东进口左转 1
- '''
- if __name__ == '__main__':
- # 1 创建阿里云对象
- auth = oss2.Auth('n8glvFGS25MrLY7j', 'xZ2Fozoarpfw0z28FUhtg8cu0yDc5d')
- # cname = 'http://open-bucket.oss.icvdc.com'
- # bucket = oss2.Bucket(auth, cname, 'open-bucket', is_cname=True)
- endpoint = 'oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com'
- bucket = oss2.Bucket(auth, endpoint, 'open-bucket')
- while True:
- try:
- # 连接到MySQL数据库
- conn = pymysql.connect(
- host='10.14.85.241',
- # host='36.110.106.156',
- port=3306, # 默认MySQL端口为3306
- user='root',
- password='1qaz2wsx!',
- database='dataclosedloop',
- charset='utf8mb4', # 设置字符集
- cursorclass=pymysql.cursors.DictCursor, # 设置游标类型为字典型
- connect_timeout=60000, # 连接超时,单位是秒
- read_timeout=60000, # 读取超时,单位是秒
- write_timeout=60000 # 写入超时,单位是秒
- )
- # 创建一个游标对象
- with conn.cursor() as cursor:
- # 编写你的SQL查询语句,这里以连接查询为例
- sql = """
- SELECT *
- FROM exam
- where score_online = 0
- and (details is null or details = '')
- and end_time != '2006-01-02 15:04:05'
- """
- # 执行查询
- cursor.execute(sql)
- # 获取查询结果
- result = cursor.fetchall()
- if len(result) == 0:
- logging.info("未查询到待评分的记录。")
- time.sleep(1)
- continue
- conn.close()
- # 输出结果
- for row in result:
- equipment_no = str(row['equipment_no']) # 车辆编号
- logging.info("即将对队伍 %s 进行评分。", row['team_name'].encode('utf-8'))
- timestamp_begin = time.mktime(row['begin_time'].timetuple())
- timestamp_end = time.mktime(row['end_time'].timetuple())
- time_begin_for_light = str(int(timestamp_begin))
- time_begin_for_bag = str(int(timestamp_begin) - (8 * 60 * 60))
- time_end_for_light = str(int(timestamp_end))
- time_end_for_bag = str(int(time_end_for_light) - (8 * 60 * 60))
- interval = seconds_between_timestamps(time_begin_for_light, time_end_for_light)
- interval_for_bag = seconds_between_timestamps(time_begin_for_bag, time_end_for_bag)
- logging.info('时间戳 %s 到 %s 改为自定义格式 %s ', time_begin_for_bag, time_end_for_bag,
- interval_for_bag)
- oss_bags = []
- local_bags = []
- local_delete_list = []
- for time_prefix in interval_for_bag:
- # 添加待下载的bag列表
- for obj1 in oss2.ObjectIterator(bucket,
- prefix='competition/' + equipment_no + '/' + time_prefix):
- oss_bags.append(str(obj1.key))
- # 下载准备merge
- logging.info("合并区间内的bag包:%s -> %s,共 %s 个", interval_for_bag[0],
- interval_for_bag[len(interval_for_bag) - 1],
- len(interval_for_bag))
- for oss_bag in oss_bags:
- bag_name = str(oss_bag).split('/')[-1]
- equipment_dir = path3 + equipment_no
- if not os.path.exists(equipment_dir) or not os.path.isdir(equipment_dir):
- os.makedirs(equipment_dir)
- local_bag_path = equipment_dir + '/' + bag_name
- bucket.get_object_to_file(oss_bag, local_bag_path)
- local_bags.append(local_bag_path)
- local_delete_list.append(local_bag_path)
- # 3 合并
- merged_bag_dir = path4 + str(interval[0]) + '/'
- if not os.path.exists(merged_bag_dir) or not os.path.isdir(merged_bag_dir):
- os.makedirs(merged_bag_dir)
- merged_bag_path = merged_bag_dir + str(interval[0]) + '.bag'
- merged_bag_path2 = merged_bag_dir + str(interval[0]) + '_2.bag'
- merge(local_bags, merged_bag_path, merged_bag_path2)
- # 4 清理临时文件
- if len(local_delete_list) > 0:
- for local_delete in local_delete_list:
- try:
- os.remove(local_delete)
- except Exception as e:
- logging.exception("删除本地临时文件报错: %s" % str(e))
- logging.info("合并后的数据包为 %s" % merged_bag_path)
- # 5 查询信号灯数据
- try:
- json_path_6 = parse_light_6(time_begin_for_light, time_end_for_light)
- json_path_13 = parse_light_13(time_begin_for_light, time_end_for_light)
- json_path_1 = parse_light_1(time_begin_for_light, time_end_for_light)
- except Exception as e:
- logging.exception("未获取信号灯数据: %s" % str(e))
- continue
- # 6 调用bag解析程序
- command1 = 'rosrun carcompetition jiexi_node ' + merged_bag_path
- logging.info("生成csv: %s" % str(command1))
- process1 = subprocess.Popen(command1, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- output, error = process1.communicate()
- if process1.returncode == 0:
- print("Output:", output)
- else:
- print("Error:", error)
- # 7 调用评分脚本获取评分结果
- # command2 = 'python /root/competition/demo.py /root/candata.csv /root/gpsdata.csv'
- command2 = 'python3 ' + score_script_path + ' ' + merged_bag_dir + 'candata.csv ' + merged_bag_dir + 'gpsdata.csv ' + json_path_6 + ' ' + json_path_13 + ' ' + json_path_1 + ' ' + merged_bag_path2
- logging.info("执行打分命令 %s", command2)
- process2 = subprocess.Popen(command2, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- output, error = process2.communicate()
- if process2.returncode == 0:
- text = str(output)
- logging.info('评分结果为:%s' % str(output))
- # 使用 json.loads() 方法将 JSON 字符串解析为 Python 字典
- data = json.loads(str(output))
- # 8 计算 TotalScore 和 DeductedScore 字段的总和
- total_score = 0 # 初始化总分数为 0
- for item in data: # 遍历每个场景
- # 计算当前场景的实际分数(总分数减去扣减分数)
- actual_score = item['TotalScore'] + item['DeductedScore']
- # 累加实际分数到总分数
- total_score += actual_score
- conn2 = pymysql.connect(
- host='10.14.85.241',
- # host='36.110.106.156',
- port=3306, # 默认MySQL端口为3306
- user='root',
- password='1qaz2wsx!',
- database='dataclosedloop',
- charset='utf8mb4', # 设置字符集
- cursorclass=pymysql.cursors.DictCursor, # 设置游标类型为字典型
- connect_timeout=60000, # 连接超时,单位是秒
- read_timeout=60000, # 读取超时,单位是秒
- write_timeout=60000 # 写入超时,单位是秒
- )
- # 9 将解析结果回传到数据库
- with conn2.cursor() as cursor2:
- # 编写UPDATE语句,更新指定字段
- sql = """
- UPDATE exam
- SET score_online = %s, details = %s
- WHERE id = %s
- """
- cursor2.execute(sql, (total_score, text, row['id']))
- conn2.commit()
- conn2.close()
- else:
- print("执行命令报错:", error)
- logging.info("队伍 %s 评分结束。", row['team_name'].encode('utf-8'))
- # try:
- # os.remove(merged_bag_path)
- # except Exception as e:
- # logging.exception("删除本地临时文件报错: %s" % str(e))
- except Exception as e:
- logging.exception("报错: %s" % str(e))
- continue
|