孟令鑫 1 年之前
父節點
當前提交
cee3ddfad0

+ 0 - 1
kinglong/master/pkg/svc/move_bag_and_send_window.go

@@ -17,7 +17,6 @@ func RunTimeWindowProducerQueue() {
 	log.GlobalLogger.Info("生产者队列goroutine - 启动")
 	for {
 		// 收到自杀信号
-		// TODO 此处也应该清理TCP
 		select {
 		case signal := <-commonService.ChannelKillMove:
 			if signal == 1 {

+ 41 - 35
kinglong/slave/pkg/svc/accept_window.go

@@ -2,56 +2,62 @@ package svc
 
 import (
 	"cicv-data-closedloop/kinglong/common/ent"
-	"cicv-data-closedloop/kinglong/common/global"
 	commonCfg "cicv-data-closedloop/kinglong/common/log"
 	"cicv-data-closedloop/kinglong/common/svc"
 	"cicv-data-closedloop/kinglong/common/util"
 	slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
+	"context"
 	"encoding/json"
 	"sync"
 )
 
-var (
-	prepareTimeWindowProducerQueueMutex sync.Mutex
-)
-
 func PrepareTimeWindowProducerQueue() {
-
-	for {
-
+	var prepareTimeWindowProducerQueueMutex sync.Mutex
+	ctx, cancel := context.WithCancel(context.Background())
+	// 处理退出信号
+	go func() {
 		select {
 		case signal := <-svc.ChannelKillWindowProducer:
 			if signal == 1 {
-				if len(global.TimeWindowConsumerQueue) == 0 {
-					svc.AddKillTimes("3")
+				cancel()
+				slaveConfig.TcpListener.Close()
+				svc.AddKillTimes("3")
+				return
+			}
+		}
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			conn, err := slaveConfig.TcpListener.Accept()
+			if err != nil {
+				select {
+				case <-ctx.Done():
 					return
+				default:
+					commonCfg.GlobalLogger.Error("接受连接错误:", err)
+					continue
 				}
 			}
-		default:
-		}
-		// 等待新连接
-		conn, err := slaveConfig.TcpListener.Accept()
-		if err != nil {
-			commonCfg.GlobalLogger.Error("接受连接错误:", err)
-			continue
-		}
-		prepareTimeWindowProducerQueueMutex.Lock()
-		// 接收数据
-		buffer := make([]byte, 2048)
-		total, err := conn.Read(buffer)
-		if err != nil {
-			commonCfg.GlobalLogger.Error("读取数据错误:", err)
-			continue
-		}
-		// 将JSON转换为结构体
-		var timeWindow ent.TimeWindow
-		err = json.Unmarshal(buffer[:total], &timeWindow)
-		if err != nil {
-			commonCfg.GlobalLogger.Error("解析Json时出错:", err)
-			continue
+			prepareTimeWindowProducerQueueMutex.Lock()
+			buffer := make([]byte, 2048)
+			total, err := conn.Read(buffer)
+			if err != nil {
+				commonCfg.GlobalLogger.Error("读取数据错误:", err)
+				continue
+			}
+			var timeWindow ent.TimeWindow
+			err = json.Unmarshal(buffer[:total], &timeWindow)
+			if err != nil {
+				commonCfg.GlobalLogger.Error("解析Json时出错:", err)
+				continue
+			}
+			util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
+			prepareTimeWindowProducerQueueMutex.Unlock()
 		}
-		//commonCfg.GlobalLogger.Info("接收到时间窗口:",timeWindow)
-		util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
-		prepareTimeWindowProducerQueueMutex.Unlock()
+
 	}
 }

+ 1 - 1
kinglong/slave/pkg/svc/move_bag.go

@@ -11,7 +11,7 @@ import (
 
 // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
 func RunTimeWindowProducerQueue() {
-	log.GlobalLogger.Info("------- 生产者队列 - 启动 -------")
+	log.GlobalLogger.Info("生产者队列 - 启动")
 	for { // 必须串行排队处理
 		select {
 		case signal := <-commonService.ChannelKillMove: