"""Per-case evaluation runners.

Each ``*_run`` helper wraps one evaluation model (traffic, comfort, safety),
``single_case_run`` executes the enabled runners in a thread pool, and
``single_case_statistic`` merges their dictionary results.
"""
import sys
import pandas as pd
import numpy as np
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from pathlib import Path

# The project root (two levels above this file) must be on sys.path before
# the project-local imports below can be resolved.
root_path = Path(__file__).resolve().parent.parent
sys.path.append(str(root_path))

from models.function import function
from models.comfort import comfort
from models.safety import safety
from models.traffic import traffic
import json
from config import config
from models.common import log

log_path = config.LOG_PATH
logger = log.get_logger(log_path)


def traffic_run(data):
    """Run the traffic evaluation for ``data``.

    Builds a ``traffic.SpeedingViolation`` from ``data``, calls its
    ``report_statistic()`` method (which takes no arguments besides ``self``),
    prints the result wrapped under the ``"traffic"`` key and returns the raw
    result.

    If anything inside the ``try`` raises, the error is printed and ``None``
    is returned. The constructor call sits outside the ``try``, so errors
    raised while building the instance propagate to the caller.
    """
    # NOTE: the original source carried a disabled alternative using
    # traffic.OvertakingViolation(data).overtake_statistic().
    speeding_check = traffic.SpeedingViolation(data)
    try:
        traffic_stats = speeding_check.report_statistic()
        print("------traffic--------")
        print({"traffic": traffic_stats})
        return traffic_stats
    except Exception as e:
        print(f"An error occurred in Traffic.report_statistic: {e}")
        return None


def comfort_run(data):
    """Run the comfort evaluation for ``data``.

    Builds a ``comfort.Comfort`` from ``data`` and returns whatever its
    ``report_statistic()`` method returns. If that call raises, the error is
    printed and ``None`` is returned. Errors raised by the constructor
    propagate to the caller.
    """
    comfort_model = comfort.Comfort(data)
    try:
        return comfort_model.report_statistic()
    except Exception as e:
        print(f"An error occurred in Comfort.report_statistic: {e}")
        return None


def safety_run(data):
    """Run the safety evaluation for ``data``.

    Builds a ``safety.Safe`` from ``data`` and returns whatever its
    ``report_statistic()`` method returns. If that call raises, the error is
    printed and ``None`` is returned. Errors raised by the constructor
    propagate to the caller.
    """
    safety_model = safety.Safe(data)
    try:
        return safety_model.report_statistic()
    except Exception as e:
        print(f"An error occurred in Safe.report_statistic: {e}")
        return None


def single_case_run(data, case_name):
    """Execute the enabled evaluation runners for one case in a thread pool.

    Args:
        data: Case data handed unchanged to every submitted runner.
        case_name: Label used only in the error message printed when a
            future raises.

    Returns:
        A list with the return value of every future that finished without
        raising, in completion order. Entries may be ``None`` because the
        runners report their own errors and return ``None``.
    """
    collected = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        # Only the traffic runner is currently enabled; the submissions for
        # `function` (data, config), `safety_run` and `comfort_run` were
        # commented out in the original source.
        pending = [
            pool.submit(traffic_run, data),
        ]

        # Drain the futures as they complete; a failing future is reported
        # and skipped so the remaining ones are still collected.
        for done in as_completed(pending):
            try:
                collected.append(done.result())
            except Exception as e:
                print(f"{case_name}: An error occurred in a thread: {e}")
    return collected


def single_case_statistic(result_list):
    """Merge the dictionary entries of ``result_list`` into a single dict.

    Entries that are not ``dict`` instances (for example ``None``) are
    ignored. When the same key appears in several entries, the value from
    the later entry wins.
    """
    merged = {}
    for entry in result_list:
        if not isinstance(entry, dict):
            continue
        merged.update(entry)
    return merged


if __name__ == "__main__":
    pass