孟令鑫 1 年之前
父节点
当前提交
57898ec282
共有 2 个文件被更改,包括 62 次插入59 次删除
  1. 7 0
      kinglong/common/global/global.go
  2. 55 59
      kinglong/master/pkg/svc/produce_window.go

+ 7 - 0
kinglong/common/global/global.go

@@ -15,10 +15,17 @@ var (
 
 	Subscriber0Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
 	Subscriber0TimeMutex sync.Mutex
+
+	Subscriber1Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber1TimeMutex sync.Mutex
 	Subscriber2Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
 	Subscriber2TimeMutex sync.Mutex
 	Subscriber3Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
 	Subscriber3TimeMutex sync.Mutex
+	Subscriber4Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber4TimeMutex sync.Mutex
+	Subscriber5Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber5TimeMutex sync.Mutex
 
 	TcpSendTime      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
 	TcpSendTimeMutex sync.Mutex

+ 55 - 59
kinglong/master/pkg/svc/produce_window.go

@@ -28,12 +28,11 @@ var (
 	yaw              float64
 )
 
-// TODO 参考pjisuv修改
 // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
 func PrepareTimeWindowProducerQueue() {
 	log.GlobalLogger.Info("订阅者 goroutine,启动。")
 	//创建订阅者0订阅主题 nodefault_info
-	log.GlobalLogger.Info("创建订阅者0订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
+	log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
 	subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
 		Topic: masterConfig.TopicOfNodeFaultInfo,
@@ -43,73 +42,69 @@ func PrepareTimeWindowProducerQueue() {
 				return
 			}
 			global.Subscriber0TimeMutex.Lock()
-			// 判断是否是连续故障码
-			gap := time.Since(global.Subscriber0Time).Seconds()
-			if gap < 2 {
-				global.Subscriber0Time = time.Now()
-				global.Subscriber0TimeMutex.Unlock()
-				return
-			} else {
-				// 2 不是连续故障码
-				global.Subscriber0Time = time.Now()
+			if time.Since(global.Subscriber0Time).Seconds() > 1 {
 				global.Subscriber0TimeMutex.Unlock()
 				subscriber0Mutex.Lock()
-				{
-					faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
-					lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
-					var faultLabel string
-					for _, f := range masterConfig.RuleOfNodefaultInfo {
-						faultLabel = f(data)
-						if faultLabel != "" {
-							saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-							break
-						}
+				faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
+				lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
+				var faultLabel string
+				for _, f := range masterConfig.RuleOfNodefaultInfo {
+					faultLabel = f(data)
+					if faultLabel != "" {
+						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+						global.Subscriber0Time = time.Now()
+						break
 					}
 				}
 				subscriber0Mutex.Unlock()
 			}
-
+			global.Subscriber0TimeMutex.Unlock()
 		}})
 	if err != nil {
-		log.GlobalLogger.Info("创建订阅者0发生故障:", err)
+		log.GlobalLogger.Info("创建订阅者发生故障:", err)
 		os.Exit(-1)
 	}
 	// 创建订阅者1订阅主题 cicv_location
-	log.GlobalLogger.Info("创建订阅者1订阅话题 ", masterConfig.TopicOfCicvLocation)
+	log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
 	subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
 		Topic: masterConfig.TopicOfCicvLocation,
 		Callback: func(data *kinglong_msgs.PerceptionLocalization) {
-			if len(masterConfig.RuleOfCicvLocation) == 0 {
-				log.GlobalLogger.Info("话题 cicv_location 没有触发器")
-				return
-			}
-			subscriber1Mutex.Lock()
-			faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
-			lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
-			// 更新共享变量
 			m.RLock()
 			velocityX = data.VelocityX
 			velocityY = data.VelocityY
 			yaw = data.Yaw
 			m.RUnlock()
-			var faultLabel string
-			for _, f := range masterConfig.RuleOfCicvLocation {
-				faultLabel = f(data)
-				if faultLabel != "" {
-					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-					break
+
+			if len(masterConfig.RuleOfCicvLocation) == 0 {
+				log.GlobalLogger.Info("话题 cicv_location 没有触发器")
+				return
+			}
+			global.Subscriber1TimeMutex.Lock()
+			if time.Since(global.Subscriber1Time).Seconds() > 1 {
+				subscriber1Mutex.Lock()
+				faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
+				lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
+				// 更新共享变量
+				var faultLabel string
+				for _, f := range masterConfig.RuleOfCicvLocation {
+					faultLabel = f(data)
+					if faultLabel != "" {
+						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+						global.Subscriber1Time = time.Now()
+						break
+					}
 				}
+				subscriber1Mutex.Unlock()
 			}
-			subscriber1Mutex.Unlock()
+			global.Subscriber1TimeMutex.Unlock()
 		},
 	})
 	if err != nil {
-		log.GlobalLogger.Info("创建订阅者1发生故障:", err)
+		log.GlobalLogger.Info("创建订阅者发生故障:", err)
 		os.Exit(-1)
 	}
-	// 创建订阅者2订阅主题 tpperception
-	log.GlobalLogger.Info("创建订阅者2订阅话题 ", masterConfig.TopicOfTpperception)
+	log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfTpperception)
 	subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
 		Topic: masterConfig.TopicOfTpperception,
@@ -120,8 +115,7 @@ func PrepareTimeWindowProducerQueue() {
 			}
 			global.Subscriber2TimeMutex.Lock()
 			// 判断是否是连续故障码
-			gap := time.Since(global.Subscriber2Time).Seconds()
-			if gap > 1 {
+			if time.Since(global.Subscriber2Time).Seconds() > 1 {
 				// 2 不是连续故障码
 				subscriber2Mutex.Lock()
 				faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
@@ -155,12 +149,8 @@ func PrepareTimeWindowProducerQueue() {
 				return
 			}
 			global.Subscriber3TimeMutex.Lock()
-			// 判断是否是连续故障码
-			gap := time.Since(global.Subscriber3Time).Seconds()
-			if gap > 2 {
+			if time.Since(global.Subscriber3Time).Seconds() > 1 {
 				// 2 不是连续故障码
-				global.Subscriber3Time = time.Now()
-				global.Subscriber3TimeMutex.Unlock()
 				subscriber3Mutex.Lock()
 				faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
 				lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
@@ -169,6 +159,7 @@ func PrepareTimeWindowProducerQueue() {
 					faultLabel = f(data)
 					if faultLabel != "" {
 						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+						global.Subscriber3Time = time.Now()
 						break
 					}
 				}
@@ -191,18 +182,23 @@ func PrepareTimeWindowProducerQueue() {
 				//log.GlobalLogger.Info("话题 data_read 没有触发器")
 				return
 			}
-			subscriber4Mutex.Lock()
-			faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
-			lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
-			var faultLabel string
-			for _, f := range masterConfig.RuleOfDataRead {
-				faultLabel = f(data)
-				if faultLabel != "" {
-					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-					break
+			global.Subscriber4TimeMutex.Lock()
+			if time.Since(global.Subscriber4Time).Seconds() > 1 {
+				subscriber4Mutex.Lock()
+				faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
+				lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
+				var faultLabel string
+				for _, f := range masterConfig.RuleOfDataRead {
+					faultLabel = f(data)
+					if faultLabel != "" {
+						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+						global.Subscriber4Time = time.Now()
+						break
+					}
 				}
+				subscriber4Mutex.Unlock()
 			}
-			subscriber4Mutex.Unlock()
+			global.Subscriber4TimeMutex.Unlock()
 		},
 	})
 	if err != nil {