package service import ( "cicv-data-closedloop/aarch64/pjisuv/common/config" commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config" "cicv-data-closedloop/common/config/c_log" "cicv-data-closedloop/common/util" "cicv-data-closedloop/pjisuv_msgs" "github.com/bluenviron/goroslib/v2" "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs" "os" "strings" "sync" "time" ) var ( dir = "/userdata/competition/" dirBak = "/userdata/competition-bak/" ossPrefix = "competition/" commandArgs = []string{"record", "--split", "--duration=1", "/pji_gps", "/data_read"} topic = "/team_name" urlExamTick = "http://36.110.106.142:12341/web_server/exam/tick" urlExamBegin = "http://36.110.106.142:12341/web_server/exam/begin" urlExamEnd = "http://36.110.106.142:12341/web_server/exam/end" cacheMutex sync.Mutex cacheTeamName = make(map[string]time.Time) heartBeatTimeThreshold = 5 * time.Second // 心跳时间 bakSecondNumber = 3600 cachePositionX = -999.00 cachePositionY = -999.00 cacheAutoMode = -1 // 自动驾驶状态 0 人工 1 自动 ) // todo 实车比赛临时使用 // history record命令无法录制() func ForCompetition() { go dataCollection() go dataCollectionBak(bakSecondNumber) // 需要再车上备份的数据,防止网络差传不回来 go location() // 记录经纬度和速度 go mode() // 记录自动驾驶状态 go tick() } // 全量数据采集 func dataCollection() { c_log.GlobalLogger.Info("开始采集实车算法比赛全量数据。") util.CreateDir(dir) // 1 打包 c_log.GlobalLogger.Info("采集实车算法比赛全量数据的环境变量为:", commonConfig.RosbagEnvs) command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dir, commonConfig.RosbagPath, commandArgs...) if err != nil { c_log.GlobalLogger.Error("程序崩溃。执行record命令", command, "出错:", err) os.Exit(-1) } // 2 扫描目录文件 for { time.Sleep(time.Duration(2) * time.Second) files, _ := util.ListAbsolutePathAndSort(dir) if len(files) >= 2 { //c_log.GlobalLogger.Info("扫描实车比赛数据采集目录,", files) for i := range files { if i == len(files)-1 { // 最后一个包在录制中,不上传 break } c_log.GlobalLogger.Debug("上传实车算法比赛全量数据包", files[i]) bagSlice := strings.Split(files[i], "/") commonConfig.OssMutex.Lock() _ = commonConfig.OssBucket.PutObjectFromFile(ossPrefix+config.LocalConfig.EquipmentNo+"/"+bagSlice[len(bagSlice)-1], files[i]) commonConfig.OssMutex.Unlock() _ = util.DeleteFile(files[i]) } } } } // 全量数据采集 func dataCollectionBak(bakSecondNumber int) { c_log.GlobalLogger.Info("开始备份实车算法比赛全量数据。") util.CreateDir(dirBak) // 1 打包 command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dirBak, commonConfig.RosbagPath, commandArgs...) if err != nil { c_log.GlobalLogger.Error("程序崩溃。备份目录执行record命令", command, "出错:", err) os.Exit(-1) } // 2 扫描目录文件 for { time.Sleep(time.Duration(2) * time.Second) files, _ := util.ListAbsolutePathAndSort(dirBak) if len(files) >= bakSecondNumber { // 超出阈值就删除心跳+1个文件 _ = util.DeleteFile(files[0]) _ = util.DeleteFile(files[1]) _ = util.DeleteFile(files[2]) } } } // data格式为队伍编号 // 保存单次考试时间区间 func location() { _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: "/cicv_location", Callback: func(data *pjisuv_msgs.PerceptionLocalization) { cachePositionX = data.PositionX cachePositionY = data.PositionY }, }) } func mode() { _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: "/pj_vehicle_fdb_pub", Callback: func(data *pjisuv_msgs.VehicleFdb) { cacheAutoMode = int(data.Automode) }, }) } // data格式为队伍编号 // 保存单次考试时间区间 func tick() { _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{ Node: commonConfig.RosNode, Topic: topic, Callback: func(data *std_msgs.String) { if cacheAutoMode == 1 { // 只有在自动驾驶状态才发送心跳 teamName := data.Data response, _ := util.HttpPostJsonWithHeaders( urlExamTick, map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"}, map[string]string{ "teamName": teamName, "positionX": util.ToString(cachePositionX), "positionY": util.ToString(cachePositionY), "equipmentNo": config.LocalConfig.EquipmentNo, // 设备编号 }, ) c_log.GlobalLogger.Infof("队伍 %v 的心跳发送到云端,响应结果为:%v。", teamName, response) } }, }) } //// data格式为队伍编号 //// 保存单次考试时间区间 //func examBeginBak() { // _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{ // Node: commonConfig.RosNode, // Topic: topic, // Callback: func(data *std_msgs.String) { // teamName := data.Data // cacheMutex.Lock() // { // if !util.ContainsKey(cacheTeamName, teamName) { // 1 如果缓存数组中没有此队名,代表考试开始,缓存此队名,和当前时间戳 // examBeginTime := time.Now() // cacheTeamName[teamName] = examBeginTime // _, _ = util.HttpPostJsonWithHeaders( // urlExamBegin, // map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"}, // map[string]string{"teamName": teamName}, // ) // c_log.GlobalLogger.Infof("队伍 %v 的考试开始。", teamName) // } else { // 2 如果缓存数组中有此队名,代表考试进行中,刷新时间戳 // cacheTeamName[teamName] = time.Now() // } // } // cacheMutex.Unlock() // }, // }) //} // //func examEndBak() { // for { // time.Sleep(time.Duration(1) * time.Second) // cacheMutex.Lock() // { // var keysToDelete []string // for teamName, heartBeatTime := range cacheTeamName { // if time.Since(heartBeatTime) > heartBeatTimeThreshold { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名 // keysToDelete = append(keysToDelete, teamName) // } // } // for _, teamName := range keysToDelete { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名 // delete(cacheTeamName, teamName) // _, _ = util.HttpPostJsonWithHeaders( // urlExamEnd, // map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"}, // map[string]string{"teamName": teamName}, // ) // c_log.GlobalLogger.Infof("队伍 %v 的考试结束。", teamName) // } // } // cacheMutex.Unlock() // } //}