c_cloud.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package config
  2. import (
  3. "cicv-data-closedloop/common/config/c_log"
  4. "cicv-data-closedloop/common/util"
  5. "gopkg.in/yaml.v3"
  6. "os"
  7. "sync"
  8. "time"
  9. )
  10. type MonitorStruct struct {
  11. Url string `yaml:"url"`
  12. }
  13. type platform struct {
  14. UrlDeviceAuth string `yaml:"url-device-auth"`
  15. UrlTaskPoll string `yaml:"url-task-poll"`
  16. UrlTask string `yaml:"url-task"`
  17. }
  18. type rosbagStruct struct {
  19. Path string `yaml:"path"`
  20. Envs []string `yaml:"envs"`
  21. }
  22. type hostStruct struct {
  23. Name string `yaml:"name"`
  24. Ip string `yaml:"ip"`
  25. Topics []string `yaml:"topics"`
  26. Rosbag rosbagStruct `yaml:"rosbag"`
  27. }
  28. type ros struct {
  29. MasterAddress string `yaml:"master-address"`
  30. Nodes []string `yaml:"nodes"`
  31. }
  32. type disk struct {
  33. Name string `yaml:"name"`
  34. Used uint64 `yaml:"used"`
  35. }
  36. type trigger struct {
  37. Label string `yaml:"label"`
  38. Topics []string `yaml:"topics"`
  39. }
  40. type cloudConfig struct {
  41. CompressBag bool `yaml:"compress-bag"`
  42. CleanBeforeStart bool `yaml:"clean-before-start"`
  43. FullCollect bool `yaml:"full-collect"`
  44. ConfigRefreshInterval int `yaml:"config-refresh-interval"` // 配置刷新时间间隔
  45. BagNumber int `yaml:"bag-number"`
  46. TimeWindowSendGap int `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
  47. TimeToLabelJsonPath string `yaml:"time-to-label-json-path"`
  48. BagDataDir string `yaml:"bag-data-dir"`
  49. BagCopyDir string `yaml:"bag-copy-dir"`
  50. TriggersDir string `yaml:"triggers-dir"`
  51. TcpPort string `yaml:"tcp-port"`
  52. RpcPort string `yaml:"rpc-port"`
  53. Triggers []trigger `yaml:"triggers"`
  54. Hosts []hostStruct `yaml:"hosts"`
  55. Ros ros `yaml:"ros"`
  56. Platform platform `yaml:"platform"`
  57. Disk disk `yaml:"disk"`
  58. Monitor MonitorStruct `yaml:"monitor"`
  59. }
  60. var (
  61. CloudConfig cloudConfig
  62. CloudConfigMutex sync.RWMutex
  63. )
  64. // InitCloudConfig 初始化业务配置
  65. func InitCloudConfig() {
  66. c_log.GlobalLogger.Info("初始化OSS配置文件 - 开始。")
  67. // 获取文件的目录
  68. _ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
  69. // 3 ------- 获取 yaml 字符串 -------
  70. cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
  71. // todo 等待时间同步
  72. // 判断文件是否存在。如果不存在则使用默认的
  73. isExist, err := OssBucket.IsObjectExist(cloudConfigObjectKey)
  74. if err != nil {
  75. c_log.GlobalLogger.Errorf("判断配置文件是否存在失败,错误信息为:%v", err)
  76. }
  77. if isExist {
  78. c_log.GlobalLogger.Info("使用机器人自定义配置文件:", cloudConfigObjectKey)
  79. } else {
  80. cloudConfigObjectKey = LocalConfig.OssBasePrefix + LocalConfig.CloudConfigFilename // 默认配置文件路径
  81. c_log.GlobalLogger.Info("使用机器人默认配置文件:", cloudConfigObjectKey)
  82. }
  83. for {
  84. OssMutex.Lock()
  85. err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
  86. OssMutex.Unlock()
  87. if err != nil {
  88. c_log.GlobalLogger.Error("下载oss上的配置文件 "+cloudConfigObjectKey+" 失败。网络授时未完成或者未配置阿里云网络映射到/etc/hosts:", err)
  89. time.Sleep(time.Duration(2) * time.Second)
  90. continue
  91. }
  92. break
  93. }
  94. content, err := os.ReadFile(LocalConfig.CloudConfigLocalPath)
  95. if err != nil {
  96. c_log.GlobalLogger.Error("程序退出。配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
  97. os.Exit(-1)
  98. }
  99. // 4 ------- 解析YAML内容 -------
  100. var newCloudConfig cloudConfig
  101. err = yaml.Unmarshal(content, &newCloudConfig)
  102. if err != nil {
  103. c_log.GlobalLogger.Error("程序退出。配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
  104. os.Exit(-1)
  105. }
  106. // 5 ------- 校验 yaml -------
  107. if checkConfig(newCloudConfig) {
  108. CloudConfigMutex.RLock()
  109. CloudConfig = newCloudConfig
  110. CloudConfigMutex.RUnlock()
  111. } else {
  112. c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
  113. os.Exit(-1)
  114. }
  115. c_log.GlobalLogger.Info("初始化OSS配置文件 - 成功。")
  116. util.CreateDir(CloudConfig.BagDataDir)
  117. util.CreateDir(CloudConfig.BagCopyDir)
  118. _ = util.RemoveSubFiles(CloudConfig.BagDataDir)
  119. if CloudConfig.CleanBeforeStart { // 判断是否需要清空原数据缓存
  120. if LocalConfig.Node.Name == "node1" {
  121. // 清空 timeToLabel.json
  122. timeToLabelJson, _ := util.MapToJsonString(map[string]interface{}{"time": "label"})
  123. _ = util.WriteFile(timeToLabelJson, CloudConfig.TimeToLabelJsonPath)
  124. }
  125. _ = util.RemoveSubFiles(CloudConfig.BagCopyDir)
  126. }
  127. }
  128. // RefreshCloudConfig 初始化业务配置
  129. func refreshCloudConfig() {
  130. // 获取文件的目录
  131. _ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
  132. // 3 ------- 获取 yaml 字符串 -------
  133. var content []byte
  134. cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
  135. // 判断文件是否存在。如果不存在则使用默认的
  136. isExist, err := OssBucket.IsObjectExist(cloudConfigObjectKey)
  137. if err != nil {
  138. c_log.GlobalLogger.Errorf("判断配置文件是否存在失败,错误信息为:%v", err)
  139. }
  140. if !isExist {
  141. cloudConfigObjectKey = LocalConfig.OssBasePrefix + LocalConfig.CloudConfigFilename // 默认配置文件路径
  142. }
  143. OssMutex.Lock()
  144. err = OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
  145. OssMutex.Unlock()
  146. if err != nil {
  147. c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
  148. return
  149. }
  150. content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
  151. if err != nil {
  152. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
  153. return
  154. }
  155. // 4 ------- 解析YAML内容 -------
  156. var newCloudConfig cloudConfig
  157. err = yaml.Unmarshal(content, &newCloudConfig)
  158. if err != nil {
  159. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
  160. return
  161. }
  162. // 5 ------- 校验 yaml -------
  163. if checkConfig(newCloudConfig) {
  164. CloudConfigMutex.RLock()
  165. CloudConfig = newCloudConfig
  166. CloudConfigMutex.RUnlock()
  167. } else {
  168. c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
  169. return
  170. }
  171. util.CreateDir(CloudConfig.BagDataDir)
  172. util.CreateDir(CloudConfig.BagCopyDir)
  173. }
  174. // RefreshCloudConfig 轮询oss上的配置文件更新到本地
  175. func RefreshCloudConfig() {
  176. for {
  177. time.Sleep(time.Duration(CloudConfig.ConfigRefreshInterval) * time.Second)
  178. refreshCloudConfig()
  179. }
  180. }
  181. // CheckConfig 校验 cfg.yaml 文件
  182. func checkConfig(check cloudConfig) bool {
  183. if len(check.Hosts) != 2 {
  184. c_log.GlobalLogger.Error("cloud-config.yaml中配置的hosts必须为2。")
  185. os.Exit(-1)
  186. }
  187. return true
  188. }