kill_self.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package svc
  2. import (
  3. "cicv-data-closedloop/kinglong/common/cfg"
  4. "cicv-data-closedloop/kinglong/common/log"
  5. "cicv-data-closedloop/kinglong/common/util"
  6. commonConfig "cicv-data-closedloop/pji/common/cfg"
  7. "net/rpc"
  8. "os"
  9. "sync"
  10. "time"
  11. )
  12. var (
  13. ChannelKillRosRecord = make(chan int)
  14. ChannelKillDiskClean = make(chan int)
  15. ChannelKillWindowProducer = make(chan int)
  16. ChannelKillMove = make(chan int)
  17. ChannelKillConsume = make(chan int)
  18. KillChannel = 5
  19. KillTimes = 0
  20. MutexKill sync.Mutex
  21. )
  22. // KillSignal 停止信号,主从节点接收到数据后准备重启
  23. type KillSignal struct {
  24. NodeName string
  25. DropUploadData bool
  26. Restart bool
  27. }
  28. // KillService 定义要远程调用的类型和方法
  29. type KillService struct{}
  30. // Kill 杀死自身程序,通过通道实现 方法必须满足RPC规范:函数有两个参数,第一个参数是请求,第二个是响应
  31. func (m *KillService) Kill(args *KillSignal, reply *int) error {
  32. log.GlobalLogger.Info("接收到自杀信号:", *args)
  33. // 1 杀死 rosbag record 命令
  34. ChannelKillRosRecord <- 1
  35. // 2 杀死所有 ros 订阅者
  36. ChannelKillWindowProducer <- 1
  37. if args.DropUploadData == true {
  38. // 3-1 等待上传结束再杀死
  39. ChannelKillMove <- 1
  40. ChannelKillConsume <- 1
  41. } else {
  42. // 3-2 直接杀死
  43. ChannelKillMove <- 2
  44. ChannelKillConsume <- 2
  45. }
  46. go killDone(args.Restart)
  47. return nil
  48. }
  49. func WaitKillSelf() {
  50. killService := new(KillService)
  51. if err := rpc.Register(killService); err != nil {
  52. log.GlobalLogger.Error("注册rpc服务失败:", err)
  53. return
  54. }
  55. // 等待并处理远程调用请求
  56. for {
  57. conn, err := cfg.KillSignalListener.Accept()
  58. if err != nil {
  59. continue
  60. }
  61. go rpc.ServeConn(conn)
  62. }
  63. }
  64. func AddKillTimes(info string) {
  65. MutexKill.Lock()
  66. switch info {
  67. case "1":
  68. close(ChannelKillRosRecord)
  69. KillTimes++
  70. log.GlobalLogger.Infof("已杀死record打包goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  71. ChannelKillDiskClean <- 1
  72. case "2":
  73. close(ChannelKillDiskClean)
  74. KillTimes++
  75. log.GlobalLogger.Infof("已杀死bag包数量维护goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  76. case "3":
  77. close(ChannelKillWindowProducer)
  78. KillTimes++
  79. log.GlobalLogger.Infof("已杀死时间窗口生产者,当前自杀进度 %v / %v", KillTimes, KillChannel)
  80. case "4":
  81. close(ChannelKillMove)
  82. KillTimes++
  83. log.GlobalLogger.Infof("已杀死bag包移动goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  84. case "5":
  85. close(ChannelKillConsume)
  86. KillTimes++
  87. log.GlobalLogger.Infof("已杀死bag包消费goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  88. }
  89. MutexKill.Unlock()
  90. }
  91. func killDone(restart bool) {
  92. for {
  93. time.Sleep(time.Duration(1) * time.Second)
  94. if KillChannel == KillTimes {
  95. if restart {
  96. _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...)
  97. if err != nil {
  98. log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
  99. os.Exit(-1)
  100. }
  101. log.GlobalLogger.Info("数据采集任务更新,正常退出当前程序。")
  102. } else {
  103. log.GlobalLogger.Info("数据采集任务终止,正常退出当前程序。")
  104. }
  105. os.Exit(0)
  106. }
  107. }
  108. }