c_cloud.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package config
  2. import (
  3. "cicv-data-closedloop/common/config/c_log"
  4. "cicv-data-closedloop/common/util"
  5. "gopkg.in/yaml.v3"
  6. "os"
  7. "sync"
  8. "time"
  9. )
  10. type MonitorStruct struct {
  11. Url string `yaml:"url"`
  12. }
  13. type platform struct {
  14. UrlDeviceAuth string `yaml:"url-device-auth"`
  15. UrlTaskPoll string `yaml:"url-task-poll"`
  16. UrlTask string `yaml:"url-task"`
  17. }
  18. type rosbagStruct struct {
  19. Path string `yaml:"path"`
  20. Envs []string `yaml:"envs"`
  21. }
  22. type hostStruct struct {
  23. Name string `yaml:"name"`
  24. Ip string `yaml:"ip"`
  25. Topics []string `yaml:"topics"`
  26. Rosbag rosbagStruct `yaml:"rosbag"`
  27. }
  28. type ros struct {
  29. MasterAddress string `yaml:"master-address"`
  30. Nodes []string `yaml:"nodes"`
  31. }
  32. type disk struct {
  33. Name string `yaml:"name"`
  34. Used uint64 `yaml:"used"`
  35. }
  36. type trigger struct {
  37. Label string `yaml:"label"`
  38. Topics []string `yaml:"topics"`
  39. }
  40. type cloudConfig struct {
  41. FullCollect bool `yaml:"full-collect"`
  42. ConfigRefreshInterval int `yaml:"config-refresh-interval"` // 配置刷新时间间隔
  43. BagNumber int `yaml:"bag-number"`
  44. TimeWindowSendGap int `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
  45. BagDataDir string `yaml:"bag-data-dir"`
  46. BagCopyDir string `yaml:"bag-copy-dir"`
  47. TriggersDir string `yaml:"triggers-dir"`
  48. RpcPort string `yaml:"rpc-port"`
  49. Triggers []trigger `yaml:"triggers"`
  50. Hosts []hostStruct `yaml:"hosts"`
  51. Ros ros `yaml:"ros"`
  52. Platform platform `yaml:"platform"`
  53. Disk disk `yaml:"disk"`
  54. Monitor MonitorStruct `yaml:"monitor"`
  55. }
  56. var (
  57. CloudConfig cloudConfig
  58. CloudConfigMutex sync.RWMutex
  59. )
  60. // InitCloudConfig 初始化业务配置
  61. func InitCloudConfig() {
  62. c_log.GlobalLogger.Info("初始化OSS配置文件 - 开始。")
  63. // 获取文件的目录
  64. _ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
  65. // 3 ------- 获取 yaml 字符串 -------
  66. var content []byte
  67. cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
  68. OssMutex.Lock()
  69. err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
  70. OssMutex.Unlock()
  71. if err != nil {
  72. c_log.GlobalLogger.Error("程序崩溃,下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
  73. os.Exit(-1)
  74. }
  75. content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
  76. if err != nil {
  77. c_log.GlobalLogger.Error("程序崩溃,配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
  78. os.Exit(-1)
  79. }
  80. // 4 ------- 解析YAML内容 -------
  81. var newCloudConfig cloudConfig
  82. err = yaml.Unmarshal(content, &newCloudConfig)
  83. if err != nil {
  84. c_log.GlobalLogger.Error("程序崩溃,配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
  85. os.Exit(-1)
  86. }
  87. // 5 ------- 校验 yaml -------
  88. if checkCloudConfig(newCloudConfig) {
  89. CloudConfigMutex.RLock()
  90. CloudConfig = newCloudConfig
  91. CloudConfigMutex.RUnlock()
  92. } else {
  93. c_log.GlobalLogger.Error("程序崩溃,配置文件格式错误:", newCloudConfig)
  94. os.Exit(-1)
  95. }
  96. c_log.GlobalLogger.Info("初始化OSS配置文件 - 成功。")
  97. util.CreateDir(CloudConfig.BagDataDir)
  98. util.CreateDir(CloudConfig.BagCopyDir)
  99. }
  100. // refreshCloudConfig 更新业务配置
  101. func refreshCloudConfig() {
  102. // 获取文件的目录
  103. _ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
  104. // 3 ------- 获取 yaml 字符串 -------
  105. var content []byte
  106. cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
  107. OssMutex.Lock()
  108. err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
  109. OssMutex.Unlock()
  110. if err != nil {
  111. c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
  112. //os.Exit(-1)
  113. }
  114. content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
  115. if err != nil {
  116. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
  117. os.Exit(-1)
  118. }
  119. // 4 ------- 解析YAML内容 -------
  120. var newCloudConfig cloudConfig
  121. err = yaml.Unmarshal(content, &newCloudConfig)
  122. if err != nil {
  123. c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
  124. os.Exit(-1)
  125. }
  126. // 5 ------- 校验 yaml -------
  127. if checkCloudConfig(newCloudConfig) {
  128. CloudConfigMutex.RLock()
  129. CloudConfig = newCloudConfig
  130. CloudConfigMutex.RUnlock()
  131. } else {
  132. c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
  133. os.Exit(-1)
  134. }
  135. util.CreateDir(CloudConfig.BagDataDir)
  136. util.CreateDir(CloudConfig.BagCopyDir)
  137. }
  138. // RefreshCloudConfig 轮询oss上的配置文件更新到本地
  139. func RefreshCloudConfig() {
  140. for {
  141. time.Sleep(time.Duration(CloudConfig.ConfigRefreshInterval) * time.Second)
  142. refreshCloudConfig()
  143. }
  144. }
  145. // CheckConfig 校验 cfg.yaml 文件
  146. func checkCloudConfig(check cloudConfig) bool {
  147. if len(check.Hosts) != 1 {
  148. c_log.GlobalLogger.Error("cloud-config.yaml中配置的hosts必须为1。")
  149. os.Exit(-1)
  150. }
  151. return true
  152. }