package service import ( commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config" "cicv-data-closedloop/common/config/c_log" "cicv-data-closedloop/common/util" "cicv-data-closedloop/pjisuv_msgs" "github.com/bluenviron/goroslib/v2" "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs" "os" "strings" "sync" "time" ) var ( dir = "/userdata/competition/" dirBak = "/userdata/competition-bak/" commandArgs = []string{"record", "--split", "--duration=1", "/pji_gps", "/data_read"} topic = "/team_name" urlExamTick = "http://36.110.106.142:12341/web_server/exam/tick" urlExamBegin = "http://36.110.106.142:12341/web_server/exam/begin" urlExamEnd = "http://36.110.106.142:12341/web_server/exam/end" cacheMutex sync.Mutex cacheTeamName = make(map[string]time.Time) heartBeatTimeThreshold = 5 * time.Second // 心跳时间 bakSecondNumber = 3600 cachePositionX = -999.00 cachePositionY = -999.00 cacheAutoMode = -1 // 自动驾驶状态 0 人工 1 自动 ) // todo 实车比赛临时使用 // history record命令无法录制() func ForCompetition() { go dataCollection() go dataCollectionBak(bakSecondNumber) // 需要再车上备份的数据,防止网络差传不回来 go location() // 记录经纬度和速度 go mode() // 记录自动驾驶状态 go tick() } // 全量数据采集 func dataCollection() { c_log.GlobalLogger.Info("开始采集实车算法比赛全量数据。") util.CreateDir(dir) // 1 打包 c_log.GlobalLogger.Info("采集实车算法比赛全量数据的环境变量为:", commonConfig.RosbagEnvs) command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dir, commonConfig.RosbagPath, commandArgs...) if err != nil { c_log.GlobalLogger.Error("程序崩溃。执行record命令", command, "出错:", err) os.Exit(-1) } // 2 扫描目录文件 for { time.Sleep(time.Duration(2) * time.Second) files, _ := util.ListAbsolutePathAndSort(dir) if len(files) >= 2 { //c_log.GlobalLogger.Info("扫描实车比赛数据采集目录,", files) for i := range files { if i == len(files)-1 { // 最后一个包在录制中,不上传 break } c_log.GlobalLogger.Debug("上传实车算法比赛全量数据包", files[i]) bagSlice := strings.Split(files[i], "/") commonConfig.OssMutex.Lock() _ = commonConfig.OssBucket.PutObjectFromFile("competition/"+bagSlice[len(bagSlice)-1], files[i]) commonConfig.OssMutex.Unlock() _ = util.DeleteFile(files[i]) } } } } // 全量数据采集 func dataCollectionBak(bakSecondNumber int) { c_log.GlobalLogger.Info("开始备份实车算法比赛全量数据。") util.CreateDir(dirBak) // 1 打包 command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dirBak, commonConfig.RosbagPath, commandArgs...) if err != nil { c_log.GlobalLogger.Error("程序崩溃。备份目录执行record命令", command, "出错:", err) os.Exit(-1) } // 2 扫描目录文件 for { time.Sleep(time.Duration(2) * time.Second) files, _ := util.ListAbsolutePathAndSort(dirBak) if len(files) >= bakSecondNumber { // 超出阈值就删除心跳+1个文件 _ = util.DeleteFile(files[0]) _ = util.DeleteFile(files[1]) _ = util.DeleteFile(files[2]) } } } // data格式为队伍编号 // 保存单次考试时间区间 func location() { _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: "/cicv_location", Callback: func(data *pjisuv_msgs.PerceptionLocalization) { cachePositionX = data.PositionX cachePositionY = data.PositionY }, }) } func mode() { _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: "/pj_vehicle_fdb_pub", Callback: func(data *pjisuv_msgs.VehicleFdb) { cacheAutoMode = int(data.Automode) }, }) } // data格式为队伍编号 // 保存单次考试时间区间 func tick() { _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: topic, Callback: func(data *std_msgs.String) { if cacheAutoMode == 1 { // 只有在自动驾驶状态才发送心跳 teamName := data.Data _, _ = util.HttpPostJsonWithHeaders( urlExamTick, map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"}, map[string]string{ "teamName": teamName, "positionX": util.ToString(cachePositionX), "positionY": util.ToString(cachePositionY), }, ) c_log.GlobalLogger.Infof("队伍 %v 的心跳发送到云端成功。", teamName) } }, }) } // data格式为队伍编号 // 保存单次考试时间区间 func examBeginBak() { _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: topic, Callback: func(data *std_msgs.String) { teamName := data.Data cacheMutex.Lock() { if !util.ContainsKey(cacheTeamName, teamName) { // 1 如果缓存数组中没有此队名,代表考试开始,缓存此队名,和当前时间戳 examBeginTime := time.Now() cacheTeamName[teamName] = examBeginTime _, _ = util.HttpPostJsonWithHeaders( urlExamBegin, map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"}, map[string]string{"teamName": teamName}, ) c_log.GlobalLogger.Infof("队伍 %v 的考试开始。", teamName) } else { // 2 如果缓存数组中有此队名,代表考试进行中,刷新时间戳 cacheTeamName[teamName] = time.Now() } } cacheMutex.Unlock() }, }) } func examEndBak() { for { time.Sleep(time.Duration(1) * time.Second) cacheMutex.Lock() { var keysToDelete []string for teamName, heartBeatTime := range cacheTeamName { if time.Since(heartBeatTime) > heartBeatTimeThreshold { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名 keysToDelete = append(keysToDelete, teamName) } } for _, teamName := range keysToDelete { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名 delete(cacheTeamName, teamName) _, _ = util.HttpPostJsonWithHeaders( urlExamEnd, map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"}, map[string]string{"teamName": teamName}, ) c_log.GlobalLogger.Infof("队伍 %v 的考试结束。", teamName) } } cacheMutex.Unlock() } }