for_competition.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package service
  2. import (
  3. "cicv-data-closedloop/aarch64/pjisuv/common/config"
  4. commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
  5. "cicv-data-closedloop/common/config/c_log"
  6. "cicv-data-closedloop/common/util"
  7. "cicv-data-closedloop/pjisuv_msgs"
  8. "github.com/bluenviron/goroslib/v2"
  9. "github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
  10. "os"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. var (
  16. dir = "/userdata/competition/"
  17. dirBak = "/userdata/competition-bak/"
  18. ossPrefix = "competition/"
  19. // v20240604 添加 /pj_vehicle_fdb_pub 判断人工接管场景
  20. commandArgs = []string{"record", "--split", "--duration=1", "/pji_gps", "/data_read", "/pj_vehicle_fdb_pub"}
  21. topic = "/team_name"
  22. urlExamTick = "http://36.110.106.142:12341/web_server/exam/tick"
  23. urlExamBegin = "http://36.110.106.142:12341/web_server/exam/begin"
  24. urlExamEnd = "http://36.110.106.142:12341/web_server/exam/end"
  25. cacheMutex sync.Mutex
  26. cacheTeamName = make(map[string]time.Time)
  27. heartBeatTimeThreshold = 5 * time.Second // 心跳时间
  28. bakSecondNumber = 3600
  29. cachePositionX = -999.00
  30. cachePositionY = -999.00
  31. cacheAutoMode = -1 // 自动驾驶状态 0 人工 1 自动
  32. InitialPositionX = 565266.0482367771 // todo 需要比赛确认起点
  33. InitialPositionY = 4329773.48728322 // todo 需要比赛确认起点
  34. )
  35. // todo 实车比赛临时使用
  36. // history record命令无法录制()
  37. func ForCompetition() {
  38. go dataCollection()
  39. go dataCollectionBak(bakSecondNumber) // 需要再车上备份的数据,防止网络差传不回来
  40. go location() // 记录经纬度和速度
  41. go mode() // 记录自动驾驶状态
  42. go tick()
  43. }
  44. // 全量数据采集
  45. func dataCollection() {
  46. c_log.GlobalLogger.Info("开始采集实车算法比赛全量数据。")
  47. util.CreateDir(dir)
  48. // 1 打包
  49. c_log.GlobalLogger.Info("采集实车算法比赛全量数据的环境变量为:", commonConfig.RosbagEnvs)
  50. command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dir, commonConfig.RosbagPath, commandArgs...)
  51. if err != nil {
  52. c_log.GlobalLogger.Error("程序崩溃。执行record命令", command, "出错:", err)
  53. os.Exit(-1)
  54. }
  55. // 2 扫描目录文件
  56. for {
  57. time.Sleep(time.Duration(2) * time.Second)
  58. files, _ := util.ListAbsolutePathAndSort(dir)
  59. if len(files) >= 2 {
  60. //c_log.GlobalLogger.Info("扫描实车比赛数据采集目录,", files)
  61. for i := range files {
  62. if i == len(files)-1 { // 最后一个包在录制中,不上传
  63. break
  64. }
  65. c_log.GlobalLogger.Debug("上传实车算法比赛全量数据包", files[i])
  66. bagSlice := strings.Split(files[i], "/")
  67. commonConfig.OssMutex.Lock()
  68. _ = commonConfig.OssBucket.PutObjectFromFile(ossPrefix+config.LocalConfig.EquipmentNo+"/"+bagSlice[len(bagSlice)-1], files[i])
  69. commonConfig.OssMutex.Unlock()
  70. _ = util.DeleteFile(files[i])
  71. }
  72. }
  73. }
  74. }
  75. // 全量数据采集
  76. func dataCollectionBak(bakSecondNumber int) {
  77. c_log.GlobalLogger.Info("开始备份实车算法比赛全量数据。")
  78. util.CreateDir(dirBak)
  79. // 1 打包
  80. command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dirBak, commonConfig.RosbagPath, commandArgs...)
  81. if err != nil {
  82. c_log.GlobalLogger.Error("程序崩溃。备份目录执行record命令", command, "出错:", err)
  83. os.Exit(-1)
  84. }
  85. // 2 扫描目录文件
  86. for {
  87. time.Sleep(time.Duration(2) * time.Second)
  88. files, _ := util.ListAbsolutePathAndSort(dirBak)
  89. if len(files) >= bakSecondNumber {
  90. // 超出阈值就删除心跳+1个文件
  91. _ = util.DeleteFile(files[0])
  92. _ = util.DeleteFile(files[1])
  93. _ = util.DeleteFile(files[2])
  94. }
  95. }
  96. }
  97. // data格式为队伍编号
  98. // 保存单次考试时间区间
  99. func location() {
  100. _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
  101. Node: commonConfig.RosNode,
  102. Topic: "/cicv_location",
  103. Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
  104. cachePositionX = data.PositionX
  105. cachePositionY = data.PositionY
  106. },
  107. })
  108. }
  109. func mode() {
  110. _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
  111. Node: commonConfig.RosNode,
  112. Topic: "/pj_vehicle_fdb_pub",
  113. Callback: func(data *pjisuv_msgs.VehicleFdb) {
  114. cacheAutoMode = int(data.Automode)
  115. },
  116. })
  117. }
  118. // data格式为队伍编号
  119. // 保存单次考试时间区间
  120. func tick() {
  121. _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
  122. Node: commonConfig.RosNode,
  123. Topic: topic,
  124. Callback: func(data *std_msgs.String) {
  125. if cacheAutoMode == 1 { // 只有在自动驾驶状态才发送心跳
  126. //if math.Abs(positionX-InitialPositionX) > 5.00 || math.Abs(positionY-InitialPositionY) > 5.00 { // todo 测试临时使用起点判断
  127. teamName := data.Data
  128. response, err := util.HttpPostJsonWithHeaders(
  129. urlExamTick,
  130. map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
  131. map[string]string{
  132. "teamName": teamName,
  133. "positionX": util.ToString(cachePositionX),
  134. "positionY": util.ToString(cachePositionY),
  135. "equipmentNo": config.LocalConfig.EquipmentNo, // 设备编号
  136. },
  137. )
  138. if err != nil {
  139. c_log.GlobalLogger.Infof("队伍 %v 的心跳发送到云端报错,错误信息为:%v。", teamName, err)
  140. }
  141. c_log.GlobalLogger.Infof("队伍 %v 的心跳发送到云端,响应结果为:%v。", teamName, response)
  142. }
  143. },
  144. })
  145. }
  146. //// data格式为队伍编号
  147. //// 保存单次考试时间区间
  148. //func examBeginBak() {
  149. // _, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
  150. // Node: commonConfig.RosNode,
  151. // Topic: topic,
  152. // Callback: func(data *std_msgs.String) {
  153. // teamName := data.Data
  154. // cacheMutex.Lock()
  155. // {
  156. // if !util.ContainsKey(cacheTeamName, teamName) { // 1 如果缓存数组中没有此队名,代表考试开始,缓存此队名,和当前时间戳
  157. // examBeginTime := time.Now()
  158. // cacheTeamName[teamName] = examBeginTime
  159. // _, _ = util.HttpPostJsonWithHeaders(
  160. // urlExamBegin,
  161. // map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
  162. // map[string]string{"teamName": teamName},
  163. // )
  164. // c_log.GlobalLogger.Infof("队伍 %v 的考试开始。", teamName)
  165. // } else { // 2 如果缓存数组中有此队名,代表考试进行中,刷新时间戳
  166. // cacheTeamName[teamName] = time.Now()
  167. // }
  168. // }
  169. // cacheMutex.Unlock()
  170. // },
  171. // })
  172. //}
  173. //
  174. //func examEndBak() {
  175. // for {
  176. // time.Sleep(time.Duration(1) * time.Second)
  177. // cacheMutex.Lock()
  178. // {
  179. // var keysToDelete []string
  180. // for teamName, heartBeatTime := range cacheTeamName {
  181. // if time.Since(heartBeatTime) > heartBeatTimeThreshold { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名
  182. // keysToDelete = append(keysToDelete, teamName)
  183. // }
  184. // }
  185. // for _, teamName := range keysToDelete { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名
  186. // delete(cacheTeamName, teamName)
  187. // _, _ = util.HttpPostJsonWithHeaders(
  188. // urlExamEnd,
  189. // map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
  190. // map[string]string{"teamName": teamName},
  191. // )
  192. // c_log.GlobalLogger.Infof("队伍 %v 的考试结束。", teamName)
  193. // }
  194. // }
  195. // cacheMutex.Unlock()
  196. // }
  197. //}