kill_self.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package svc
  2. import (
  3. "cicv-data-closedloop/kinglong/common/cfg"
  4. "cicv-data-closedloop/kinglong/common/log"
  5. "cicv-data-closedloop/kinglong/common/util"
  6. "net/rpc"
  7. "os"
  8. "sync"
  9. )
  10. var (
  11. ChannelKillRosRecord = make(chan int)
  12. ChannelKillDiskClean = make(chan int)
  13. ChannelKillSubscriber = make(chan int)
  14. ChannelKillMove = make(chan int)
  15. ChannelKillConsume = make(chan int)
  16. KillChannel = 5
  17. KillTimes = 0
  18. MutexKill sync.Mutex
  19. )
  20. // KillSignal 停止信号,主从节点接收到数据后准备重启
  21. type KillSignal struct {
  22. NodeName string
  23. DropUploadData bool
  24. Restart bool
  25. }
  26. // KillService 定义要远程调用的类型和方法
  27. type KillService struct{}
  28. // Kill 杀死自身程序,通过通道实现 方法必须满足RPC规范:函数有两个参数,第一个参数是请求,第二个是响应
  29. func (m *KillService) Kill(args *KillSignal, reply *int) error {
  30. log.GlobalLogger.Info("接收到自杀信号:", *args)
  31. ChannelKillRosRecord <- 1
  32. ChannelKillSubscriber <- 1
  33. if args.DropUploadData == true {
  34. // 3-1 等待上传结束再杀死
  35. ChannelKillMove <- 1
  36. ChannelKillConsume <- 1
  37. } else {
  38. // 3-2 直接杀死
  39. ChannelKillMove <- 2
  40. ChannelKillConsume <- 2
  41. }
  42. go killDone()
  43. return nil
  44. }
  45. func WaitKillSelf() {
  46. killService := new(KillService)
  47. if err := rpc.Register(killService); err != nil {
  48. log.GlobalLogger.Error("注册rpc服务失败:", err)
  49. return
  50. }
  51. // 等待并处理远程调用请求
  52. for {
  53. conn, err := cfg.KillSignalListener.Accept()
  54. if err != nil {
  55. continue
  56. }
  57. go rpc.ServeConn(conn)
  58. }
  59. }
  60. func AddKillTimes(info string) {
  61. MutexKill.Lock()
  62. switch info {
  63. case "1":
  64. log.GlobalLogger.Infof("已杀死record打包goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  65. ChannelKillDiskClean <- 1
  66. KillTimes++
  67. close(ChannelKillRosRecord)
  68. case "2":
  69. log.GlobalLogger.Infof("已杀死bag包数量维护goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  70. KillTimes++
  71. close(ChannelKillDiskClean)
  72. case "3":
  73. log.GlobalLogger.Infof("已杀死rosnode和ros订阅者goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  74. KillTimes++
  75. close(ChannelKillSubscriber)
  76. case "4":
  77. log.GlobalLogger.Infof("已杀死bag包移动goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  78. KillTimes++
  79. close(ChannelKillMove)
  80. case "5":
  81. log.GlobalLogger.Infof("已杀死bag包消费goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
  82. KillTimes++
  83. close(ChannelKillConsume)
  84. }
  85. MutexKill.Unlock()
  86. }
  87. func killDone() {
  88. log.GlobalLogger.Infof("自杀完毕,启动新的程序。")
  89. if KillChannel == KillTimes {
  90. if _, err := util.ExecuteWithPath(cfg.LocalConfig.RestartDir, cfg.LocalConfig.RestartCmd); err != nil {
  91. log.GlobalLogger.Error("启动新的程序失败:", err)
  92. }
  93. os.Exit(0)
  94. }
  95. }