kill_self.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package svc
  2. import (
  3. "cicv-data-closedloop/kinglong/common/cfg"
  4. "cicv-data-closedloop/kinglong/common/log"
  5. "cicv-data-closedloop/kinglong/common/util"
  6. "net/rpc"
  7. "os"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. ChannelKillRosRecord = make(chan int)
  13. ChannelKillDiskClean = make(chan int)
  14. ChannelKillWindowProducer = make(chan int)
  15. ChannelKillMove = make(chan int)
  16. ChannelKillConsume = make(chan int)
  17. KillChannel = 5
  18. KillTimes = 0
  19. MutexKill sync.Mutex
  20. )
  21. // KillSignal 停止信号,主从节点接收到数据后准备重启
  22. type KillSignal struct {
  23. NodeName string
  24. DropUploadData bool
  25. Restart bool
  26. }
  27. // KillService 定义要远程调用的类型和方法
  28. type KillService struct{}
  29. // Kill 杀死自身程序,通过通道实现 方法必须满足RPC规范:函数有两个参数,第一个参数是请求,第二个是响应
  30. func (m *KillService) Kill(args *KillSignal, reply *int) error {
  31. log.GlobalLogger.Info("接收到自杀信号:", *args)
  32. // 1 杀死 rosbag record 命令
  33. ChannelKillRosRecord <- 1
  34. // 2 杀死所有 ros 订阅者
  35. ChannelKillWindowProducer <- 1
  36. if args.DropUploadData == true {
  37. // 3-1 等待上传结束再杀死
  38. ChannelKillMove <- 1
  39. ChannelKillConsume <- 1
  40. } else {
  41. // 3-2 直接杀死
  42. ChannelKillMove <- 2
  43. ChannelKillConsume <- 2
  44. }
  45. go killDone(args.Restart)
  46. return nil
  47. }
  48. func WaitKillSelf() {
  49. killService := new(KillService)
  50. if err := rpc.Register(killService); err != nil {
  51. log.GlobalLogger.Error("注册rpc服务失败:", err)
  52. return
  53. }
  54. // 等待并处理远程调用请求
  55. for {
  56. conn, err := cfg.KillSignalListener.Accept()
  57. if err != nil {
  58. continue
  59. }
  60. go rpc.ServeConn(conn)
  61. }
  62. }
  63. func AddKillTimes(info string) {
  64. MutexKill.Lock()
  65. switch info {
  66. case "1":
  67. close(ChannelKillRosRecord)
  68. KillTimes++
  69. log.GlobalLogger.Infof("已杀死record打包goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  70. ChannelKillDiskClean <- 1
  71. case "2":
  72. close(ChannelKillDiskClean)
  73. KillTimes++
  74. log.GlobalLogger.Infof("已杀死bag包数量维护goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  75. case "3":
  76. close(ChannelKillWindowProducer)
  77. KillTimes++
  78. log.GlobalLogger.Infof("已杀死时间窗口生产者,当前自杀进度 %v / %v", KillTimes, KillChannel)
  79. case "4":
  80. close(ChannelKillMove)
  81. KillTimes++
  82. log.GlobalLogger.Infof("已杀死bag包移动goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  83. case "5":
  84. close(ChannelKillConsume)
  85. KillTimes++
  86. log.GlobalLogger.Infof("已杀死bag包消费goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  87. }
  88. MutexKill.Unlock()
  89. }
  90. func killDone(restart bool) {
  91. for {
  92. time.Sleep(time.Duration(1) * time.Second)
  93. if KillChannel == KillTimes {
  94. if restart {
  95. _, err := util.ExecuteWithPath(cfg.LocalConfig.RestartCmd.Dir, cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args...)
  96. if err != nil {
  97. log.GlobalLogger.Info("启动新程序失败,【path】=", cfg.LocalConfig.RestartCmd.Dir, "【cmd】=", cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args, ":", err)
  98. os.Exit(-1)
  99. }
  100. log.GlobalLogger.Info("数据采集任务更新,正常退出当前程序。")
  101. } else {
  102. log.GlobalLogger.Info("数据采集任务终止,正常退出当前程序。")
  103. }
  104. os.Exit(0)
  105. }
  106. }
  107. }