for_competition.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. "cicv-data-closedloop/common/config/c_log"
  5. "cicv-data-closedloop/common/util"
  6. "cicv-data-closedloop/pjisuv_msgs"
  7. "github.com/bluenviron/goroslib/v2"
  8. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  9. "os"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. var (
  15. dir = "/userdata/competition/"
  16. dirBak = "/userdata/competition-bak/"
  17. commandArgs = []string{"record", "--split", "--duration=1", "/pji_gps", "/data_read"}
  18. topic = "/team_name"
  19. urlExamTick = "http://36.110.106.142:12341/web_server/exam/tick"
  20. urlExamBegin = "http://36.110.106.142:12341/web_server/exam/begin"
  21. urlExamEnd = "http://36.110.106.142:12341/web_server/exam/end"
  22. cacheMutex sync.Mutex
  23. cacheTeamName = make(map[string]time.Time)
  24. heartBeatTimeThreshold = 5 * time.Second // 心跳时间
  25. bakSecondNumber = 3600
  26. cachePositionX = -999.00
  27. cachePositionY = -999.00
  28. cacheAutoMode = -1 // 自动驾驶状态 0 人工 1 自动
  29. )
  30. // todo 实车比赛临时使用
  31. // history record命令无法录制()
  32. func ForCompetition() {
  33. go dataCollection()
  34. go dataCollectionBak(bakSecondNumber) // 需要再车上备份的数据,防止网络差传不回来
  35. go location() // 记录经纬度和速度
  36. go mode() // 记录自动驾驶状态
  37. go tick()
  38. }
  39. // 全量数据采集
  40. func dataCollection() {
  41. c_log.GlobalLogger.Info("开始采集实车算法比赛全量数据。")
  42. util.CreateDir(dir)
  43. // 1 打包
  44. c_log.GlobalLogger.Info("采集实车算法比赛全量数据的环境变量为:", commonConfig.RosbagEnvs)
  45. command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dir, commonConfig.RosbagPath, commandArgs...)
  46. if err != nil {
  47. c_log.GlobalLogger.Error("程序崩溃。执行record命令", command, "出错:", err)
  48. os.Exit(-1)
  49. }
  50. // 2 扫描目录文件
  51. for {
  52. time.Sleep(time.Duration(2) * time.Second)
  53. files, _ := util.ListAbsolutePathAndSort(dir)
  54. if len(files) >= 2 {
  55. //c_log.GlobalLogger.Info("扫描实车比赛数据采集目录,", files)
  56. for i := range files {
  57. if i == len(files)-1 { // 最后一个包在录制中,不上传
  58. break
  59. }
  60. c_log.GlobalLogger.Debug("上传实车算法比赛全量数据包", files[i])
  61. bagSlice := strings.Split(files[i], "/")
  62. commonConfig.OssMutex.Lock()
  63. _ = commonConfig.OssBucket.PutObjectFromFile("competition/"+bagSlice[len(bagSlice)-1], files[i])
  64. commonConfig.OssMutex.Unlock()
  65. _ = util.DeleteFile(files[i])
  66. }
  67. }
  68. }
  69. }
  70. // 全量数据采集
  71. func dataCollectionBak(bakSecondNumber int) {
  72. c_log.GlobalLogger.Info("开始备份实车算法比赛全量数据。")
  73. util.CreateDir(dirBak)
  74. // 1 打包
  75. command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dirBak, commonConfig.RosbagPath, commandArgs...)
  76. if err != nil {
  77. c_log.GlobalLogger.Error("程序崩溃。备份目录执行record命令", command, "出错:", err)
  78. os.Exit(-1)
  79. }
  80. // 2 扫描目录文件
  81. for {
  82. time.Sleep(time.Duration(2) * time.Second)
  83. files, _ := util.ListAbsolutePathAndSort(dirBak)
  84. if len(files) >= bakSecondNumber {
  85. // 超出阈值就删除心跳+1个文件
  86. _ = util.DeleteFile(files[0])
  87. _ = util.DeleteFile(files[1])
  88. _ = util.DeleteFile(files[2])
  89. }
  90. }
  91. }
  92. // data格式为队伍编号
  93. // 保存单次考试时间区间
  94. func location() {
  95. _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
  96. Node: commonConfig.RosNode,
  97. Topic: "/cicv_location",
  98. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  99. cachePositionX = data.PositionX
  100. cachePositionY = data.PositionY
  101. },
  102. })
  103. }
  104. func mode() {
  105. _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
  106. Node: commonConfig.RosNode,
  107. Topic: "/pj_vehicle_fdb_pub",
  108. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  109. cacheAutoMode = int(data.Automode)
  110. },
  111. })
  112. }
  113. // data格式为队伍编号
  114. // 保存单次考试时间区间
  115. func tick() {
  116. _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
  117. Node: commonConfig.RosNode,
  118. Topic: topic,
  119. Callback: func(data *std_msgs.String) {
  120. if cacheAutoMode == 1 { // 只有在自动驾驶状态才发送心跳
  121. teamName := data.Data
  122. _, _ = util.HttpPostJsonWithHeaders(
  123. urlExamTick,
  124. map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
  125. map[string]string{
  126. "teamName": teamName,
  127. "positionX": util.ToString(cachePositionX),
  128. "positionY": util.ToString(cachePositionY),
  129. },
  130. )
  131. c_log.GlobalLogger.Infof("队伍 %v 的心跳发送到云端成功。", teamName)
  132. }
  133. },
  134. })
  135. }
  136. // data格式为队伍编号
  137. // 保存单次考试时间区间
  138. func examBeginBak() {
  139. _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
  140. Node: commonConfig.RosNode,
  141. Topic: topic,
  142. Callback: func(data *std_msgs.String) {
  143. teamName := data.Data
  144. cacheMutex.Lock()
  145. {
  146. if !util.ContainsKey(cacheTeamName, teamName) { // 1 如果缓存数组中没有此队名,代表考试开始,缓存此队名,和当前时间戳
  147. examBeginTime := time.Now()
  148. cacheTeamName[teamName] = examBeginTime
  149. _, _ = util.HttpPostJsonWithHeaders(
  150. urlExamBegin,
  151. map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
  152. map[string]string{"teamName": teamName},
  153. )
  154. c_log.GlobalLogger.Infof("队伍 %v 的考试开始。", teamName)
  155. } else { // 2 如果缓存数组中有此队名,代表考试进行中,刷新时间戳
  156. cacheTeamName[teamName] = time.Now()
  157. }
  158. }
  159. cacheMutex.Unlock()
  160. },
  161. })
  162. }
  163. func examEndBak() {
  164. for {
  165. time.Sleep(time.Duration(1) * time.Second)
  166. cacheMutex.Lock()
  167. {
  168. var keysToDelete []string
  169. for teamName, heartBeatTime := range cacheTeamName {
  170. if time.Since(heartBeatTime) > heartBeatTimeThreshold { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名
  171. keysToDelete = append(keysToDelete, teamName)
  172. }
  173. }
  174. for _, teamName := range keysToDelete { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名
  175. delete(cacheTeamName, teamName)
  176. _, _ = util.HttpPostJsonWithHeaders(
  177. urlExamEnd,
  178. map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
  179. map[string]string{"teamName": teamName},
  180. )
  181. c_log.GlobalLogger.Infof("队伍 %v 的考试结束。", teamName)
  182. }
  183. }
  184. cacheMutex.Unlock()
  185. }
  186. }