123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- package service
- import (
- commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
- "cicv-data-closedloop/common/config/c_log"
- "cicv-data-closedloop/common/util"
- "cicv-data-closedloop/pjisuv_msgs"
- "github.com/bluenviron/goroslib/v2"
- "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
- "os"
- "strings"
- "sync"
- "time"
- )
- var (
- dir = "/userdata/competition/"
- dirBak = "/userdata/competition-bak/"
- commandArgs = []string{"record", "--split", "--duration=1", "/pji_gps", "/data_read"}
- topic = "/team_name"
- urlExamTick = "http://36.110.106.142:12341/web_server/exam/tick"
- urlExamBegin = "http://36.110.106.142:12341/web_server/exam/begin"
- urlExamEnd = "http://36.110.106.142:12341/web_server/exam/end"
- cacheMutex sync.Mutex
- cacheTeamName = make(map[string]time.Time)
- heartBeatTimeThreshold = 5 * time.Second // 心跳时间
- bakSecondNumber = 3600
- cachePositionX = -999.00
- cachePositionY = -999.00
- cacheAutoMode = -1 // 自动驾驶状态 0 人工 1 自动
- )
- // todo 实车比赛临时使用
- // history record命令无法录制()
- func ForCompetition() {
- go dataCollection()
- go dataCollectionBak(bakSecondNumber) // 需要再车上备份的数据,防止网络差传不回来
- go location() // 记录经纬度和速度
- go mode() // 记录自动驾驶状态
- go tick()
- }
- // 全量数据采集
- func dataCollection() {
- c_log.GlobalLogger.Info("开始采集实车算法比赛全量数据。")
- util.CreateDir(dir)
- // 1 打包
- c_log.GlobalLogger.Info("采集实车算法比赛全量数据的环境变量为:", commonConfig.RosbagEnvs)
- command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dir, commonConfig.RosbagPath, commandArgs...)
- if err != nil {
- c_log.GlobalLogger.Error("程序崩溃。执行record命令", command, "出错:", err)
- os.Exit(-1)
- }
- // 2 扫描目录文件
- for {
- time.Sleep(time.Duration(2) * time.Second)
- files, _ := util.ListAbsolutePathAndSort(dir)
- if len(files) >= 2 {
- //c_log.GlobalLogger.Info("扫描实车比赛数据采集目录,", files)
- for i := range files {
- if i == len(files)-1 { // 最后一个包在录制中,不上传
- break
- }
- c_log.GlobalLogger.Debug("上传实车算法比赛全量数据包", files[i])
- bagSlice := strings.Split(files[i], "/")
- commonConfig.OssMutex.Lock()
- _ = commonConfig.OssBucket.PutObjectFromFile("competition/"+bagSlice[len(bagSlice)-1], files[i])
- commonConfig.OssMutex.Unlock()
- _ = util.DeleteFile(files[i])
- }
- }
- }
- }
- // 全量数据采集
- func dataCollectionBak(bakSecondNumber int) {
- c_log.GlobalLogger.Info("开始备份实车算法比赛全量数据。")
- util.CreateDir(dirBak)
- // 1 打包
- command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dirBak, commonConfig.RosbagPath, commandArgs...)
- if err != nil {
- c_log.GlobalLogger.Error("程序崩溃。备份目录执行record命令", command, "出错:", err)
- os.Exit(-1)
- }
- // 2 扫描目录文件
- for {
- time.Sleep(time.Duration(2) * time.Second)
- files, _ := util.ListAbsolutePathAndSort(dirBak)
- if len(files) >= bakSecondNumber {
- // 超出阈值就删除心跳+1个文件
- _ = util.DeleteFile(files[0])
- _ = util.DeleteFile(files[1])
- _ = util.DeleteFile(files[2])
- }
- }
- }
- // data格式为队伍编号
- // 保存单次考试时间区间
- func location() {
- _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: commonConfig.RosNode,
- Topic: "/cicv_location",
- Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
- cachePositionX = data.PositionX
- cachePositionY = data.PositionY
- },
- })
- }
- func mode() {
- _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: commonConfig.RosNode,
- Topic: "/pj_vehicle_fdb_pub",
- Callback: func(data *pjisuv_msgs.VehicleFdb) {
- cacheAutoMode = int(data.Automode)
- },
- })
- }
- // data格式为队伍编号
- // 保存单次考试时间区间
- func tick() {
- _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: commonConfig.RosNode,
- Topic: topic,
- Callback: func(data *std_msgs.String) {
- if cacheAutoMode == 1 { // 只有在自动驾驶状态才发送心跳
- teamName := data.Data
- _, _ = util.HttpPostJsonWithHeaders(
- urlExamTick,
- map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
- map[string]string{
- "teamName": teamName,
- "positionX": util.ToString(cachePositionX),
- "positionY": util.ToString(cachePositionY),
- },
- )
- c_log.GlobalLogger.Infof("队伍 %v 的心跳发送到云端成功。", teamName)
- }
- },
- })
- }
- // data格式为队伍编号
- // 保存单次考试时间区间
- func examBeginBak() {
- _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
- Node: commonConfig.RosNode,
- Topic: topic,
- Callback: func(data *std_msgs.String) {
- teamName := data.Data
- cacheMutex.Lock()
- {
- if !util.ContainsKey(cacheTeamName, teamName) { // 1 如果缓存数组中没有此队名,代表考试开始,缓存此队名,和当前时间戳
- examBeginTime := time.Now()
- cacheTeamName[teamName] = examBeginTime
- _, _ = util.HttpPostJsonWithHeaders(
- urlExamBegin,
- map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
- map[string]string{"teamName": teamName},
- )
- c_log.GlobalLogger.Infof("队伍 %v 的考试开始。", teamName)
- } else { // 2 如果缓存数组中有此队名,代表考试进行中,刷新时间戳
- cacheTeamName[teamName] = time.Now()
- }
- }
- cacheMutex.Unlock()
- },
- })
- }
- func examEndBak() {
- for {
- time.Sleep(time.Duration(1) * time.Second)
- cacheMutex.Lock()
- {
- var keysToDelete []string
- for teamName, heartBeatTime := range cacheTeamName {
- if time.Since(heartBeatTime) > heartBeatTimeThreshold { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名
- keysToDelete = append(keysToDelete, teamName)
- }
- }
- for _, teamName := range keysToDelete { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名
- delete(cacheTeamName, teamName)
- _, _ = util.HttpPostJsonWithHeaders(
- urlExamEnd,
- map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
- map[string]string{"teamName": teamName},
- )
- c_log.GlobalLogger.Infof("队伍 %v 的考试结束。", teamName)
- }
- }
- cacheMutex.Unlock()
- }
- }
|