c_cloud.go 7.7 KB


  1. package config
  2. import (
  3. "cicv-data-closedloop/common/config/c_log"
  4. "cicv-data-closedloop/common/util"
  5. "gopkg.in/yaml.v3"
  6. "os"
  7. "sync"
  8. "time"
  9. )
  10. type MonitorStruct struct {
  11. Url string `yaml:"url"`
  12. }
  13. type platform struct {
  14. UrlDeviceAuth string `yaml:"url-device-auth"`
  15. UrlTaskPoll string `yaml:"url-task-poll"`
  16. UrlTask string `yaml:"url-task"`
  17. }
  18. type rosbagStruct struct {
  19. Path string `yaml:"path"`
  20. Envs []string `yaml:"envs"`
  21. }
  22. type hostStruct struct {
  23. Name string `yaml:"name"`
  24. Ip string `yaml:"ip"`
  25. Topics []string `yaml:"topics"`
  26. Rosbag rosbagStruct `yaml:"rosbag"`
  27. }
  28. type ros struct {
  29. MasterAddress string `yaml:"master-address"`
  30. Nodes []string `yaml:"nodes"`
  31. }
  32. type disk struct {
  33. Name string `yaml:"name"`
  34. Used uint64 `yaml:"used"`
  35. }
  36. type trigger struct {
  37. Label string `yaml:"label"`
  38. Topics []string `yaml:"topics"`
  39. }
  40. type CloudConfigStruct struct {
  41. RefreshCloudConfig bool `yaml:"refresh-cloud-config"`
  42. CompressBag bool `yaml:"compress-bag"`
  43. CleanBeforeStart bool `yaml:"clean-before-start"`
  44. FullCollect bool `yaml:"full-collect"`
  45. ConfigRefreshInterval int `yaml:"config-refresh-interval"` // 配置刷新时间间隔
  46. BagNumber int `yaml:"bag-number"`
  47. TimeWindowSendGap int `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
  48. TimeToLabelJsonPath string `yaml:"time-to-label-json-path"`
  49. BagDataDir string `yaml:"bag-data-dir"`
  50. BagCopyDir string `yaml:"bag-copy-dir"`
  51. TriggersDir string `yaml:"triggers-dir"`
  52. TcpPort string `yaml:"tcp-port"`
  53. RpcPort string `yaml:"rpc-port"`
  54. Triggers []trigger `yaml:"triggers"`
  55. Hosts []hostStruct `yaml:"hosts"`
  56. Ros ros `yaml:"ros"`
  57. Platform platform `yaml:"platform"`
  58. Disk disk `yaml:"disk"`
  59. Monitor MonitorStruct `yaml:"monitor"`
  60. }
  61. var (
  62. CloudConfig CloudConfigStruct
  63. CloudConfigMutex sync.RWMutex
  64. )
  65. // 初始化业务配置
  66. func InitCloudConfig() {
  67. c_log.GlobalLogger.Info("初始化OSS配置文件 - 开始。")
  68. var content []byte // cloud.yaml 内容
  69. var err error
  70. if LocalConfig.Internet {
  71. c_log.GlobalLogger.Info("车辆可以访问互联网。")
  72. // 获取文件的目录
  73. _ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
  74. // 3 ------- 获取 yaml 字符串 -------
  75. cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
  76. // todo 等待时间同步
  77. // 判断文件是否存在。如果不存在则使用默认的
  78. isExist, err := OssBucket.IsObjectExist(cloudConfigObjectKey)
  79. if err != nil {
  80. c_log.GlobalLogger.Errorf("判断配置文件是否存在失败,错误信息为:%v", err)
  81. }
  82. if isExist {
  83. c_log.GlobalLogger.Info("使用机器人自定义配置文件:", cloudConfigObjectKey)
  84. } else {
  85. cloudConfigObjectKey = LocalConfig.OssBasePrefix + LocalConfig.CloudConfigFilename // 默认配置文件路径
  86. c_log.GlobalLogger.Info("使用默认配置文件:", cloudConfigObjectKey)
  87. }
  88. for {
  89. OssMutex.Lock()
  90. err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
  91. OssMutex.Unlock()
  92. if err != nil {
  93. c_log.GlobalLogger.Error("下载oss上的配置文件 "+cloudConfigObjectKey+" 失败。网络授时未完成或者未配置阿里云网络映射到/etc/hosts:", err)
  94. time.Sleep(time.Duration(2) * time.Second)
  95. continue
  96. }
  97. break
  98. }
  99. if CloudConfig.RefreshCloudConfig {
  100. go RefreshCloudConfig()
  101. }
  102. } else {
  103. c_log.GlobalLogger.Infof("车辆不可以访问互联网,请提前将配置文件放到【%v】。", LocalConfig.CloudConfigLocalPath)
  104. for {
  105. time.Sleep(time.Duration(1) * time.Second)
  106. if util.FileExists(LocalConfig.CloudConfigLocalPath) {
  107. c_log.GlobalLogger.Infof("配置文件【%v】已准备好。", LocalConfig.CloudConfigLocalPath)
  108. break
  109. } else {
  110. c_log.GlobalLogger.Infof("配置文件【%v】没有准备好。", LocalConfig.CloudConfigLocalPath)
  111. continue
  112. }
  113. }
  114. }
  115. content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
  116. if err != nil {
  117. c_log.GlobalLogger.Error("程序退出。配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
  118. os.Exit(-1)
  119. }
  120. // 4 ------- 解析YAML内容 -------
  121. var newCloudConfig CloudConfigStruct
  122. err = yaml.Unmarshal(content, &newCloudConfig)
  123. if err != nil {
  124. c_log.GlobalLogger.Error("程序退出。配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
  125. os.Exit(-1)
  126. }
  127. // 5 ------- 校验 yaml -------
  128. if checkConfig(newCloudConfig) {
  129. CloudConfigMutex.RLock()
  130. CloudConfig = newCloudConfig
  131. CloudConfigMutex.RUnlock()
  132. } else {
  133. c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
  134. os.Exit(-1)
  135. }
  136. c_log.GlobalLogger.Info("初始化OSS配置文件 - 成功。")
  137. util.CreateDir(CloudConfig.BagDataDir)
  138. util.CreateDir(CloudConfig.BagCopyDir)
  139. _ = util.RemoveSubFiles(CloudConfig.BagDataDir)
  140. if CloudConfig.CleanBeforeStart { // 判断是否需要清空原数据缓存
  141. if LocalConfig.Node.Name == "node1" { // 只有cloud配置了同时节点为node1才清除
  142. // 清空 timeToLabel.json
  143. timeToLabelJson, _ := util.MapToJsonString(map[string]interface{}{"time": "label"})
  144. _ = util.WriteFile(timeToLabelJson, CloudConfig.TimeToLabelJsonPath)
  145. c_log.GlobalLogger.Info("清空timeToLabel.json - 成功。")
  146. }
  147. _ = util.RemoveSubFiles(CloudConfig.BagCopyDir)
  148. }
  149. }
  150. // 刷新配置文件
  151. func refreshCloudConfig() {
  152. // 获取文件的目录
  153. _ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
  154. // 3 ------- 获取 yaml 字符串 -------
  155. var content []byte
  156. cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
  157. // 判断文件是否存在。如果不存在则使用默认的
  158. isExist, err := OssBucket.IsObjectExist(cloudConfigObjectKey)
  159. if err != nil {
  160. c_log.GlobalLogger.Errorf("判断配置文件是否存在失败,错误信息为:%v", err)
  161. return
  162. }
  163. if !isExist {
  164. cloudConfigObjectKey = LocalConfig.OssBasePrefix + LocalConfig.CloudConfigFilename // 默认配置文件路径
  165. }
  166. OssMutex.Lock()
  167. err = OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
  168. OssMutex.Unlock()
  169. if err != nil {
  170. c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
  171. return
  172. }
  173. content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
  174. if err != nil {
  175. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
  176. return
  177. }
  178. // 4 ------- 解析YAML内容 -------
  179. var newCloudConfig CloudConfigStruct
  180. err = yaml.Unmarshal(content, &newCloudConfig)
  181. if err != nil {
  182. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
  183. return
  184. }
  185. // 5 ------- 校验 yaml -------
  186. if checkConfig(newCloudConfig) {
  187. CloudConfigMutex.RLock()
  188. CloudConfig = newCloudConfig
  189. CloudConfigMutex.RUnlock()
  190. } else {
  191. c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
  192. return
  193. }
  194. util.CreateDir(CloudConfig.BagDataDir)
  195. util.CreateDir(CloudConfig.BagCopyDir)
  196. }
  197. // RefreshCloudConfig 轮询oss上的配置文件更新到本地
  198. func RefreshCloudConfig() {
  199. for {
  200. time.Sleep(time.Duration(CloudConfig.ConfigRefreshInterval) * time.Second)
  201. refreshCloudConfig()
  202. }
  203. }
  204. // CheckConfig 校验 cfg.yaml 文件
  205. func checkConfig(check CloudConfigStruct) bool {
  206. if len(check.Hosts) != 2 {
  207. c_log.GlobalLogger.Error("cloud-config.yaml中配置的hosts必须为2。")
  208. os.Exit(-1)
  209. }
  210. return true
  211. }