for_competition.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package svc
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/common/config/c_log"
  5. "cicv-data-closedloop/common/util"
  6. "github.com/bluenviron/goroslib/v2"
  7. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  8. "os"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. var (
  14. dir = "/userdata/competition/"
  15. commandArgs = []string{"record", "--split", "--duration=1", "/pji_gps", "/data_read"}
  16. topic = "/cicv_competition"
  17. urlExamBegin = "36.110.106.142/exam/begin"
  18. urlExamEnd = "36.110.106.142/exam/end"
  19. cacheMutex sync.Mutex
  20. cacheTeamName = make(map[string]time.Time)
  21. heartBeatTimeThreshold = 5 * time.Second // 心跳时间
  22. )
  23. // todo 实车比赛临时使用
  24. // history record命令无法录制()
  25. func ForCompetition() {
  26. go dataCollection()
  27. go examBegin()
  28. go examEnd()
  29. }
  30. // 全量数据采集
  31. func dataCollection() {
  32. c_log.GlobalLogger.Info("开始采集实车算法比赛全量数据。")
  33. util.CreateDir(dir)
  34. // 1 打包
  35. c_log.GlobalLogger.Info("采集实车算法比赛全量数据的环境变量为:", commonConfig.RosbagEnvs)
  36. command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dir, commonConfig.RosbagPath, commandArgs...)
  37. if err != nil {
  38. c_log.GlobalLogger.Error("程序崩溃。执行record命令", command, "出错:", err)
  39. os.Exit(-1)
  40. }
  41. // 2 扫描目录文件
  42. for {
  43. time.Sleep(time.Duration(2) * time.Second)
  44. files, _ := util.ListAbsolutePathAndSort(dir)
  45. if len(files) >= 2 {
  46. c_log.GlobalLogger.Info("扫描试车比赛数据采集目录,", files)
  47. for i := range files {
  48. if i == len(files)-1 { // 最后一个包在录制中,不上传
  49. break
  50. }
  51. c_log.GlobalLogger.Debug("上传实车算法比赛全量数据包", files[i])
  52. bagSlice := strings.Split(files[i], "/")
  53. commonConfig.OssMutex.Lock()
  54. _ = commonConfig.OssBucket.PutObjectFromFile("competition/"+bagSlice[len(bagSlice)-1], files[i])
  55. commonConfig.OssMutex.Unlock()
  56. _ = util.DeleteFile(files[i])
  57. }
  58. }
  59. }
  60. }
  61. // data格式为队伍编号
  62. // 保存单次考试时间区间
  63. func examBegin() {
  64. _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
  65. Node: commonConfig.RosNode,
  66. Topic: topic,
  67. Callback: func(data *std_msgs.String) {
  68. teamName := data.Data
  69. cacheMutex.Lock()
  70. {
  71. if !util.ContainsKey(cacheTeamName, teamName) { // 1 如果缓存数组中没有此队名,代表考试开始,缓存此队名,和当前时间戳
  72. examBeginTime := time.Now()
  73. cacheTeamName[teamName] = examBeginTime
  74. _, _ = util.PostJsonResponseString(urlExamBegin, map[string]string{"teamName": teamName})
  75. c_log.GlobalLogger.Infof("队伍 %v 的考试开始。", teamName)
  76. } else { // 2 如果缓存数组中有此队名,代表考试进行中,刷新时间戳
  77. cacheTeamName[teamName] = time.Now()
  78. }
  79. }
  80. cacheMutex.Unlock()
  81. },
  82. })
  83. }
  84. func examEnd() {
  85. for {
  86. time.Sleep(time.Duration(1) * time.Second)
  87. cacheMutex.Lock()
  88. {
  89. var keysToDelete []string
  90. for teamName, heartBeatTime := range cacheTeamName {
  91. if time.Since(heartBeatTime) > heartBeatTimeThreshold { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名
  92. keysToDelete = append(keysToDelete, teamName)
  93. }
  94. }
  95. for _, teamName := range keysToDelete { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名
  96. delete(cacheTeamName, teamName)
  97. _, _ = util.PostJsonResponseString(urlExamEnd, map[string]string{"teamName": teamName})
  98. c_log.GlobalLogger.Infof("队伍 %v 的考试结束。", teamName)
  99. }
  100. }
  101. cacheMutex.Unlock()
  102. }
  103. }