c_cloud.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package config
  2. import (
  3. "cicv-data-closedloop/common/config/c_log"
  4. "cicv-data-closedloop/common/util"
  5. "gopkg.in/yaml.v3"
  6. "os"
  7. "sync"
  8. "time"
  9. )
  10. type platform struct {
  11. UrlDeviceAuth string `yaml:"url-device-auth"`
  12. UrlTaskPoll string `yaml:"url-task-poll"`
  13. UrlTask string `yaml:"url-task"`
  14. }
  15. type rosbagStruct struct {
  16. Path string `yaml:"path"`
  17. Envs []string `yaml:"envs"`
  18. }
  19. type hostStruct struct {
  20. Name string `yaml:"name"`
  21. Ip string `yaml:"ip"`
  22. Topics []string `yaml:"topics"`
  23. Rosbag rosbagStruct `yaml:"rosbag"`
  24. }
  25. type ros struct {
  26. MasterAddress string `yaml:"master-address"`
  27. Nodes []string `yaml:"nodes"`
  28. }
  29. type disk struct {
  30. Name string `yaml:"name"`
  31. Used uint64 `yaml:"used"`
  32. }
  33. type trigger struct {
  34. Label string `yaml:"label"`
  35. Topics []string `yaml:"topics"`
  36. }
  37. type cloudConfig struct {
  38. FullCollect bool `yaml:"full-collect"`
  39. ConfigRefreshInterval int `yaml:"config-refresh-interval"` // 配置刷新时间间隔
  40. BagNumber int `yaml:"bag-number"`
  41. TimeWindowSendGap int `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
  42. TimeToLabelJsonPath string `yaml:"time-to-label-json-path"`
  43. BagDataDir string `yaml:"bag-data-dir"`
  44. BagCopyDir string `yaml:"bag-copy-dir"`
  45. TriggersDir string `yaml:"triggers-dir"`
  46. TcpPort string `yaml:"tcp-port"`
  47. RpcPort string `yaml:"rpc-port"`
  48. Triggers []trigger `yaml:"triggers"`
  49. Hosts []hostStruct `yaml:"hosts"`
  50. Ros ros `yaml:"ros"`
  51. Platform platform `yaml:"platform"`
  52. Disk disk `yaml:"disk"`
  53. }
  54. var (
  55. CloudConfig cloudConfig
  56. CloudConfigMutex sync.RWMutex
  57. )
  58. // InitCloudConfig 初始化业务配置
  59. func InitCloudConfig() {
  60. c_log.GlobalLogger.Info("初始化OSS配置文件 - 开始。")
  61. // 获取文件的目录
  62. _ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
  63. // 3 ------- 获取 yaml 字符串 -------
  64. var content []byte
  65. cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
  66. // 等待时间同步
  67. for {
  68. OssMutex.Lock()
  69. err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
  70. OssMutex.Unlock()
  71. if err != nil {
  72. c_log.GlobalLogger.Error("下载oss上的配置文件 "+cloudConfigObjectKey+" 失败。可能需要等待时间同步完成:", err)
  73. time.Sleep(time.Duration(2) * time.Second)
  74. continue
  75. }
  76. break
  77. }
  78. content, err := os.ReadFile(LocalConfig.CloudConfigLocalPath)
  79. if err != nil {
  80. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
  81. os.Exit(-1)
  82. }
  83. // 4 ------- 解析YAML内容 -------
  84. var newCloudConfig cloudConfig
  85. err = yaml.Unmarshal(content, &newCloudConfig)
  86. if err != nil {
  87. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
  88. os.Exit(-1)
  89. }
  90. // 5 ------- 校验 yaml -------
  91. if checkConfig(newCloudConfig) {
  92. CloudConfigMutex.RLock()
  93. CloudConfig = newCloudConfig
  94. CloudConfigMutex.RUnlock()
  95. } else {
  96. c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
  97. os.Exit(-1)
  98. }
  99. c_log.GlobalLogger.Info("初始化OSS配置文件 - 成功。")
  100. _ = util.CreateDir(CloudConfig.BagDataDir)
  101. _ = util.CreateDir(CloudConfig.BagCopyDir)
  102. timeToLabelJson, _ := util.MapToJsonString(map[string]interface{}{"time": "label"})
  103. _ = util.WriteFile(timeToLabelJson, CloudConfig.TimeToLabelJsonPath)
  104. }
  105. // RefreshCloudConfig 初始化业务配置
  106. func refreshCloudConfig() {
  107. // 获取文件的目录
  108. _ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
  109. // 3 ------- 获取 yaml 字符串 -------
  110. var content []byte
  111. cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
  112. OssMutex.Lock()
  113. err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
  114. OssMutex.Unlock()
  115. if err != nil {
  116. c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
  117. return
  118. }
  119. content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
  120. if err != nil {
  121. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
  122. return
  123. }
  124. // 4 ------- 解析YAML内容 -------
  125. var newCloudConfig cloudConfig
  126. err = yaml.Unmarshal(content, &newCloudConfig)
  127. if err != nil {
  128. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
  129. return
  130. }
  131. // 5 ------- 校验 yaml -------
  132. if checkConfig(newCloudConfig) {
  133. CloudConfigMutex.RLock()
  134. CloudConfig = newCloudConfig
  135. CloudConfigMutex.RUnlock()
  136. } else {
  137. c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
  138. return
  139. }
  140. _ = util.CreateDir(CloudConfig.BagDataDir)
  141. _ = util.CreateDir(CloudConfig.BagCopyDir)
  142. }
  143. // RefreshCloudConfig 轮询oss上的配置文件更新到本地
  144. func RefreshCloudConfig() {
  145. for {
  146. time.Sleep(time.Duration(CloudConfig.ConfigRefreshInterval) * time.Second)
  147. refreshCloudConfig()
  148. }
  149. }
  150. // CheckConfig 校验 cfg.yaml 文件
  151. func checkConfig(check cloudConfig) bool {
  152. if len(check.Hosts) != 2 {
  153. c_log.GlobalLogger.Error("cloud-config.yaml中配置的hosts必须为2。")
  154. os.Exit(-1)
  155. }
  156. return true
  157. }