c_cloud.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package config
  2. import (
  3. "cicv-data-closedloop/common/config/c_log"
  4. "cicv-data-closedloop/common/util"
  5. "gopkg.in/yaml.v3"
  6. "os"
  7. "sync"
  8. "time"
  9. )
  10. type MonitorStruct struct {
  11. Url string `yaml:"url"`
  12. }
  13. type platform struct {
  14. UrlDeviceAuth string `yaml:"url-device-auth"`
  15. UrlTaskPoll string `yaml:"url-task-poll"`
  16. UrlTask string `yaml:"url-task"`
  17. Ak string `yaml:"ak"`
  18. Sk string `yaml:"sk"`
  19. }
  20. type rosbagStruct struct {
  21. Path string `yaml:"path"`
  22. Envs []string `yaml:"envs"`
  23. }
  24. type hostStruct struct {
  25. Name string `yaml:"name"`
  26. Ip string `yaml:"ip"`
  27. Topics []string `yaml:"topics"`
  28. Rosbag rosbagStruct `yaml:"rosbag"`
  29. }
  30. type ros struct {
  31. MasterAddress string `yaml:"master-address"`
  32. Nodes []string `yaml:"nodes"`
  33. }
  34. type disk struct {
  35. Name string `yaml:"name"`
  36. Used uint64 `yaml:"used"`
  37. Path []string `yaml:"path"`
  38. }
  39. type trigger struct {
  40. Label string `yaml:"label"`
  41. Topics []string `yaml:"topics"`
  42. }
  43. type CloudConfigStruct struct {
  44. RefreshCloudConfig bool `yaml:"refresh-cloud-config"`
  45. CompressBag bool `yaml:"compress-bag"`
  46. CleanBeforeStart bool `yaml:"clean-before-start"`
  47. FullCollect bool `yaml:"full-collect"`
  48. ConfigRefreshInterval int `yaml:"config-refresh-interval"` // 配置刷新时间间隔
  49. BagNumber int `yaml:"bag-number"`
  50. TimeWindowSendGap int `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
  51. TimeToLabelJsonPath string `yaml:"time-to-label-json-path"`
  52. BagDataDir string `yaml:"bag-data-dir"`
  53. BagCopyDir string `yaml:"bag-copy-dir"`
  54. TriggersDir string `yaml:"triggers-dir"`
  55. TcpPort string `yaml:"tcp-port"`
  56. RpcPort string `yaml:"rpc-port"`
  57. Triggers []trigger `yaml:"triggers"`
  58. Hosts []hostStruct `yaml:"hosts"`
  59. Ros ros `yaml:"ros"`
  60. Platform platform `yaml:"platform"`
  61. Disk disk `yaml:"disk"`
  62. Monitor MonitorStruct `yaml:"monitor"`
  63. }
  64. var (
  65. CloudConfig CloudConfigStruct
  66. CloudConfigMutex sync.RWMutex
  67. )
  68. // InitCloudConfig 初始化业务配置
  69. func InitCloudConfig() {
  70. c_log.GlobalLogger.Info("初始化OSS配置文件 - 开始。")
  71. var content []byte // cloud.yaml 内容
  72. var err error
  73. // todo 测试时不从网上拉取配置文件,编译时需修改好下面内容
  74. //if LocalConfig.Internet {
  75. // c_log.GlobalLogger.Info("车辆可以访问互联网。")
  76. // // 获取文件的目录
  77. // _ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
  78. // // 3 ------- 获取 yaml 字符串 -------
  79. // cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
  80. // // todo 等待时间同步
  81. // // 判断文件是否存在。如果不存在则使用默认的
  82. // isExist, err := OssBucket.IsObjectExist(cloudConfigObjectKey)
  83. // if err != nil {
  84. // c_log.GlobalLogger.Errorf("判断配置文件是否存在失败,错误信息为:%v", err)
  85. // }
  86. // if isExist {
  87. // c_log.GlobalLogger.Info("使用机器人自定义配置文件:", cloudConfigObjectKey)
  88. // } else {
  89. // cloudConfigObjectKey = LocalConfig.OssBasePrefix + LocalConfig.CloudConfigFilename // 默认配置文件路径
  90. // c_log.GlobalLogger.Info("使用默认配置文件:", cloudConfigObjectKey)
  91. // }
  92. // for {
  93. // OssMutex.Lock()
  94. // err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
  95. // OssMutex.Unlock()
  96. // if err != nil {
  97. // c_log.GlobalLogger.Error("下载oss上的配置文件 "+cloudConfigObjectKey+" 失败。网络授时未完成或者未配置阿里云网络映射到/etc/hosts:", err)
  98. // time.Sleep(time.Duration(2) * time.Second)
  99. // continue
  100. // }
  101. // break
  102. // }
  103. // if CloudConfig.RefreshCloudConfig {
  104. // go RefreshCloudConfig()
  105. // }
  106. //} else {
  107. // c_log.GlobalLogger.Infof("车辆不可以访问互联网,请提前将配置文件放到【%v】。", LocalConfig.CloudConfigLocalPath)
  108. // for {
  109. // time.Sleep(time.Duration(1) * time.Second)
  110. // if util.FileExists(LocalConfig.CloudConfigLocalPath) {
  111. // c_log.GlobalLogger.Infof("配置文件【%v】已准备好。", LocalConfig.CloudConfigLocalPath)
  112. // break
  113. // } else {
  114. // c_log.GlobalLogger.Infof("配置文件【%v】没有准备好。", LocalConfig.CloudConfigLocalPath)
  115. // continue
  116. // }
  117. // }
  118. //}
  119. for {
  120. time.Sleep(time.Duration(1) * time.Second)
  121. if util.FileExists(LocalConfig.CloudConfigLocalPath) {
  122. c_log.GlobalLogger.Infof("配置文件【%v】已准备好。", LocalConfig.CloudConfigLocalPath)
  123. break
  124. } else {
  125. c_log.GlobalLogger.Infof("配置文件【%v】没有准备好。", LocalConfig.CloudConfigLocalPath)
  126. continue
  127. }
  128. }
  129. content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
  130. if err != nil {
  131. c_log.GlobalLogger.Error("程序退出。配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
  132. os.Exit(-1)
  133. }
  134. // 4 ------- 解析YAML内容 -------
  135. var newCloudConfig CloudConfigStruct
  136. err = yaml.Unmarshal(content, &newCloudConfig)
  137. if err != nil {
  138. c_log.GlobalLogger.Error("程序退出。配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
  139. os.Exit(-1)
  140. }
  141. // 5 ------- 校验 yaml -------
  142. if checkConfig(newCloudConfig) {
  143. CloudConfigMutex.RLock()
  144. CloudConfig = newCloudConfig
  145. CloudConfigMutex.RUnlock()
  146. } else {
  147. c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
  148. os.Exit(-1)
  149. }
  150. c_log.GlobalLogger.Info("初始化OSS配置文件 - 成功。")
  151. util.CreateDir(CloudConfig.BagDataDir)
  152. util.CreateDir(CloudConfig.BagCopyDir)
  153. _ = util.RemoveSubFiles(CloudConfig.BagDataDir)
  154. if CloudConfig.CleanBeforeStart { // 判断是否需要清空原数据缓存
  155. if LocalConfig.Node.Name == "node1" { // 只有cloud配置了同时节点为node1才清除
  156. // 清空 timeToLabel.json
  157. timeToLabelJson, _ := util.MapToJsonString(map[string]interface{}{"time": "label"})
  158. _ = util.WriteFile(timeToLabelJson, CloudConfig.TimeToLabelJsonPath)
  159. c_log.GlobalLogger.Info("清空timeToLabel.json - 成功。")
  160. }
  161. _ = util.RemoveSubFiles(CloudConfig.BagCopyDir)
  162. }
  163. }
  164. // refreshCloudConfig 刷新配置文件
  165. func refreshCloudConfig() {
  166. // 获取文件的目录
  167. _ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
  168. // 3 ------- 获取 yaml 字符串 -------
  169. var content []byte
  170. cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
  171. // 判断文件是否存在。如果不存在则使用默认的
  172. isExist, err := OssBucket.IsObjectExist(cloudConfigObjectKey)
  173. if err != nil {
  174. c_log.GlobalLogger.Errorf("判断配置文件是否存在失败,错误信息为:%v", err)
  175. return
  176. }
  177. if !isExist {
  178. cloudConfigObjectKey = LocalConfig.OssBasePrefix + LocalConfig.CloudConfigFilename // 默认配置文件路径
  179. }
  180. OssMutex.Lock()
  181. err = OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
  182. OssMutex.Unlock()
  183. if err != nil {
  184. c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
  185. return
  186. }
  187. content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
  188. if err != nil {
  189. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
  190. return
  191. }
  192. // 4 ------- 解析YAML内容 -------
  193. var newCloudConfig CloudConfigStruct
  194. err = yaml.Unmarshal(content, &newCloudConfig)
  195. if err != nil {
  196. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
  197. return
  198. }
  199. // 5 ------- 校验 yaml -------
  200. if checkConfig(newCloudConfig) {
  201. CloudConfigMutex.RLock()
  202. CloudConfig = newCloudConfig
  203. CloudConfigMutex.RUnlock()
  204. } else {
  205. c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
  206. return
  207. }
  208. util.CreateDir(CloudConfig.BagDataDir)
  209. util.CreateDir(CloudConfig.BagCopyDir)
  210. }
  211. // RefreshCloudConfig 轮询oss上的配置文件更新到本地
  212. func RefreshCloudConfig() {
  213. for {
  214. time.Sleep(time.Duration(CloudConfig.ConfigRefreshInterval) * time.Second)
  215. refreshCloudConfig()
  216. }
  217. }
  218. // checkConfig 校验 cfg.yaml 文件
  219. func checkConfig(check CloudConfigStruct) bool {
  220. if len(check.Hosts) != 2 {
  221. c_log.GlobalLogger.Error("cloud-config.yaml中配置的hosts必须为2。")
  222. os.Exit(-1)
  223. }
  224. return true
  225. }