孟令鑫 1 年之前
父节点
当前提交
5099c4df44

+ 8 - 8
kinglong/common/svc/kill_self.go

@@ -10,11 +10,11 @@ import (
 )
 
 var (
-	ChannelKillRosRecord  = make(chan int)
-	ChannelKillDiskClean  = make(chan int)
-	ChannelKillSubscriber = make(chan int)
-	ChannelKillMove       = make(chan int)
-	ChannelKillConsume    = make(chan int)
+	ChannelKillRosRecord      = make(chan int)
+	ChannelKillDiskClean      = make(chan int)
+	ChannelKillWindowProducer = make(chan int)
+	ChannelKillMove           = make(chan int)
+	ChannelKillConsume        = make(chan int)
 
 	KillChannel = 5
 	KillTimes   = 0
@@ -35,7 +35,7 @@ type KillService struct{}
 func (m *KillService) Kill(args *KillSignal, reply *int) error {
 	log.GlobalLogger.Info("接收到自杀信号:", *args)
 	ChannelKillRosRecord <- 1
-	ChannelKillSubscriber <- 1
+	ChannelKillWindowProducer <- 1
 	if args.DropUploadData == true {
 		// 3-1 等待上传结束再杀死
 		ChannelKillMove <- 1
@@ -79,9 +79,9 @@ func AddKillTimes(info string) {
 		KillTimes++
 		close(ChannelKillDiskClean)
 	case "3":
-		log.GlobalLogger.Infof("已杀死rosnode和ros订阅者goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		log.GlobalLogger.Infof("已杀死时间窗口生产者,当前自杀进度 %v / %v", KillTimes, KillChannel)
 		KillTimes++
-		close(ChannelKillSubscriber)
+		close(ChannelKillWindowProducer)
 	case "4":
 		log.GlobalLogger.Infof("已杀死bag包移动goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 		KillTimes++

+ 1 - 0
kinglong/common/svc/rosbag_upload.go

@@ -29,6 +29,7 @@ outLoop:
 				AddKillTimes("5")
 				return
 			}
+		default:
 		}
 		if len(global.TimeWindowConsumerQueue) > 0 {
 			// 1 获取即将处理的窗口

+ 1 - 0
kinglong/master/pkg/svc/move_bag_and_send_window.go

@@ -29,6 +29,7 @@ func RunTimeWindowProducerQueue() {
 				commonService.AddKillTimes("4")
 				return
 			}
+		default:
 		}
 
 		time.Sleep(time.Duration(1) * time.Second)

+ 1 - 1
kinglong/master/pkg/svc/produce_window.go

@@ -226,7 +226,7 @@ func PrepareTimeWindowProducerQueue() {
 		os.Exit(-1)
 	}
 	select {
-	case signal := <-svc.ChannelKillSubscriber:
+	case signal := <-svc.ChannelKillWindowProducer:
 		if signal == 1 {
 			defer svc.AddKillTimes("3")
 			subscriber0.Close()

+ 13 - 0
kinglong/slave/pkg/svc/accept_window.go

@@ -2,7 +2,9 @@ package svc
 
 import (
 	"cicv-data-closedloop/kinglong/common/ent"
+	"cicv-data-closedloop/kinglong/common/global"
 	commonCfg "cicv-data-closedloop/kinglong/common/log"
+	"cicv-data-closedloop/kinglong/common/svc"
 	"cicv-data-closedloop/kinglong/common/util"
 	slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
 	"encoding/json"
@@ -16,6 +18,17 @@ var (
 func PrepareTimeWindowProducerQueue() {
 
 	for {
+
+		select {
+		case signal := <-svc.ChannelKillWindowProducer:
+			if signal == 1 {
+				if len(global.TimeWindowConsumerQueue) == 0 {
+					svc.AddKillTimes("3")
+					return
+				}
+			}
+		default:
+		}
 		// 等待新连接
 		conn, err := slaveConfig.TcpListener.Accept()
 		if err != nil {

+ 17 - 12
kinglong/slave/pkg/svc/move_bag.go

@@ -1,9 +1,10 @@
 package svc
 
 import (
-	commonCfg "cicv-data-closedloop/kinglong/common/cfg"
+	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
 	"cicv-data-closedloop/kinglong/common/global"
 	"cicv-data-closedloop/kinglong/common/log"
+	commonService "cicv-data-closedloop/kinglong/common/svc"
 	"cicv-data-closedloop/kinglong/common/util"
 	"time"
 )
@@ -12,16 +13,20 @@ import (
 func RunTimeWindowProducerQueue() {
 	log.GlobalLogger.Info("------- 生产者队列 - 启动 -------")
 	for { // 必须串行排队处理
-		//TODO 测试更新任务时放开
-		//select {
-		//case signal := <-commonSvc.ChannelKillTcp:
-		//	if signal == 1 {
-		//		defer commonSvc.AddKillTimes()
-		//		slaveCfg.TcpListener.Close()
-		//		return
-		//	}
-		//default: // 添加default语句块防止
-		//}
+		select {
+		case signal := <-commonService.ChannelKillMove:
+			if signal == 1 {
+				commonService.ChannelKillMove <- 1
+				if len(global.TimeWindowProducerQueue) == 0 {
+					commonService.AddKillTimes("4")
+					return
+				}
+			} else { //signal == 2
+				commonService.AddKillTimes("4")
+				return
+			}
+		default:
+		}
 		time.Sleep(time.Duration(1) * time.Second)
 		if len(global.TimeWindowProducerQueue) > 0 {
 			currentTimeWindow := global.TimeWindowProducerQueue[0]
@@ -35,7 +40,7 @@ func RunTimeWindowProducerQueue() {
 			}
 
 			// 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
-			bags := util.ListAbsolutePathWithSuffixAndSort(commonCfg.CloudConfig.BagDataDir, ".bag")
+			bags := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
 			for _, bag := range bags {
 				bagTime := util.GetBagTime(bag)
 				compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)

+ 1 - 0
pji/common/svc/rosbag_upload.go

@@ -29,6 +29,7 @@ outLoop:
 				AddKillTimes("5")
 				return
 			}
+		default:
 		}
 
 		if len(global.TimeWindowConsumerQueue) > 0 {

+ 1 - 0
pji/master/pkg/svc/move_bag_and_send_window.go

@@ -26,6 +26,7 @@ func RunTimeWindowProducerQueue() {
 				commonService.AddKillTimes("4")
 				return
 			}
+		default:
 		}
 
 		// 处理