control.go 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package main
  2. import (
  3. "cicv-data-closedloop/common/config/c_log"
  4. "cicv-data-closedloop/kinglong/common/cfg"
  5. "cicv-data-closedloop/kinglong/common/log"
  6. commonService "cicv-data-closedloop/kinglong/common/svc"
  7. controlConfig "cicv-data-closedloop/kinglong/control/pkg/cfg"
  8. commonConfig "cicv-data-closedloop/pji/common/cfg"
  9. "net/rpc"
  10. "time"
  11. )
  12. func init() {
  13. //runtime.GOMAXPROCS(1)
  14. // 初始化日志配置
  15. log.InitLogConfig()
  16. // 初始化本地配置文件(第1处配置,在本地文件)
  17. cfg.InitLocalConfig()
  18. // 初始化Oss连接信息
  19. cfg.InitOssConfig()
  20. // 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
  21. cfg.InitCloudConfig()
  22. // 首先初始化平台配置。
  23. cfg.InitPlatformConfig()
  24. // 初始化rpc客户端,用于杀死旧的采集程序
  25. controlConfig.InitRpcClient()
  26. }
  27. func main() {
  28. // 轮询任务接口判断是否有更新
  29. for {
  30. time.Sleep(time.Duration(1) * time.Second)
  31. // 1 获取当前设备的任务的 status
  32. status, err := cfg.GetStatus(cfg.PlatformConfig.TaskConfigId)
  33. if err != nil {
  34. log.GlobalLogger.Error("获取配置status失败:", err)
  35. continue
  36. }
  37. // 2 判断 status
  38. // UN_CHANGE 没有新的任务,无需更改
  39. // CHANGE 有新的任务,需要杀死旧的任务并重启
  40. // NONE 设备没有配置任务,需要杀死旧的任务
  41. if status == "UN_CHANGE" {
  42. continue
  43. } else if status == "CHANGE" || status == "NONE" {
  44. // 发送rpc信号杀死两个服务,并重启程序
  45. var masterArgs *commonService.KillSignal
  46. var slaveArgs *commonService.KillSignal
  47. if status == "CHANGE" {
  48. masterArgs = &commonService.KillSignal{NodeName: "master", DropUploadData: cfg.PlatformConfig.DropUploadData, Restart: true}
  49. c_log.GlobalLogger.Info("更新任务,发送rpc重启信号到主节点:", masterArgs)
  50. slaveArgs = &commonService.KillSignal{NodeName: "slave", DropUploadData: cfg.PlatformConfig.DropUploadData, Restart: true}
  51. c_log.GlobalLogger.Info("更新任务,发送rpc重启信号到从节点:", slaveArgs)
  52. }
  53. if status == "NONE" {
  54. masterArgs = &commonService.KillSignal{NodeName: "master", DropUploadData: cfg.PlatformConfig.DropUploadData, Restart: false}
  55. c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号到主节点:", masterArgs)
  56. slaveArgs = &commonService.KillSignal{NodeName: "slave", DropUploadData: cfg.PlatformConfig.DropUploadData, Restart: false}
  57. c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号到从节点:", slaveArgs)
  58. }
  59. KillRpcClientMaster, err := rpc.Dial("tcp", commonConfig.CloudConfig.Hosts[0].Ip+":"+commonConfig.CloudConfig.RpcPort)
  60. if err != nil {
  61. log.GlobalLogger.Error("创建rpc连接master失败:", err)
  62. KillRpcClientMaster.Close()
  63. continue
  64. }
  65. KillRpcClientSlave, err := rpc.Dial("tcp", commonConfig.CloudConfig.Hosts[1].Ip+":"+commonConfig.CloudConfig.RpcPort)
  66. if err != nil {
  67. log.GlobalLogger.Error("创建rpc连接slave失败:", err)
  68. KillRpcClientSlave.Close()
  69. continue
  70. }
  71. reply := 0
  72. if err = KillRpcClientMaster.Call("KillService.Kill", masterArgs, &reply); err != nil {
  73. log.GlobalLogger.Error("发送rpc请求到master失败:", err)
  74. KillRpcClientMaster.Close()
  75. continue
  76. }
  77. if err = KillRpcClientSlave.Call("KillService.Kill", slaveArgs, &reply); err != nil {
  78. log.GlobalLogger.Error("发送rpc请求到slave失败:", err)
  79. KillRpcClientSlave.Close()
  80. continue
  81. }
  82. // 获取一下最新配置
  83. cfg.InitPlatformConfig()
  84. KillRpcClientMaster.Close()
  85. KillRpcClientSlave.Close()
  86. } else {
  87. log.GlobalLogger.Error("未知的采集任务状态。status=", status)
  88. }
  89. }
  90. }