孟令鑫 1 年之前
父节点
当前提交
77589ac097
共有 2 个文件被更改,包括 48 次插入108 次删除
  1. 44 104
      aarch64/pjisuv/master/package/service/produce_window.go
  2. 4 4
      common/entity/time_window.go

+ 44 - 104
aarch64/pjisuv/master/package/service/produce_window.go

@@ -29,75 +29,44 @@ var (
 // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
 func PrepareTimeWindowProducerQueue() {
 	c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
-
-	//创建订阅者0订阅主题 nodefault_info
-	c_log.GlobalLogger.Info("创建订阅者0订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
-	subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfNodeFaultInfo,
-		Callback: func(data *pjisuv_msgs.FaultInfo) {
-			if len(masterConfig.RuleOfNodefaultInfo) == 0 {
-				//c_log.GlobalLogger.Info("话题 nodefault_info没有触发器")
-				return
-			}
-			entity.Subscriber0TimeMutex.Lock()
-			// 判断是否是连续故障码
-			gap := time.Since(entity.Subscriber0Time).Seconds()
-			if gap < 2 {
-				entity.Subscriber0Time = time.Now()
-				entity.Subscriber0TimeMutex.Unlock()
-				return
-			} else {
-				// 2 不是连续故障码
-				entity.Subscriber0Time = time.Now()
-				entity.Subscriber0TimeMutex.Unlock()
-				subscriber0Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfNodefaultInfo {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						break
-					}
-				}
-				subscriber0Mutex.Unlock()
-			}
-
-		}})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者0发生故障:", err)
-		os.Exit(-1)
-	}
 	// 创建订阅者1订阅主题 cicv_location
 	c_log.GlobalLogger.Info("创建订阅者1订阅话题 ", masterConfig.TopicOfCicvLocation)
 	subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
 		Topic: masterConfig.TopicOfCicvLocation,
 		Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
-			if len(masterConfig.RuleOfCicvLocation) == 0 {
-				c_log.GlobalLogger.Info("话题 cicv_location 没有触发器")
-				return
-			}
-			subscriber1Mutex.Lock()
-			faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-			lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
 			// 更新共享变量
 			m.RLock()
 			velocityX = data.VelocityX
 			velocityY = data.VelocityY
 			yaw = data.Yaw
 			m.RUnlock()
-			var faultLabel string
-			for _, f := range masterConfig.RuleOfCicvLocation {
-				faultLabel = f(data)
-				if faultLabel != "" {
-					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-					break
+
+			if len(masterConfig.RuleOfCicvLocation) == 0 {
+				c_log.GlobalLogger.Info("话题 cicv_location 没有触发器")
+				return
+			}
+			// 判断是否是连续故障码
+			entity.Subscriber1TimeMutex.Lock()
+			gap := time.Since(entity.Subscriber1Time).Seconds()
+			if gap > 1 {
+				subscriber1Mutex.Lock()
+				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+
+				// 规则判断
+				var faultLabel string
+				for _, f := range masterConfig.RuleOfCicvLocation {
+					faultLabel = f(data)
+					if faultLabel != "" {
+						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+						entity.Subscriber1Time = time.Now()
+						break
+					}
 				}
+				subscriber1Mutex.Unlock()
 			}
-			subscriber1Mutex.Unlock()
+			entity.Subscriber1TimeMutex.Unlock()
 		},
 	})
 	if err != nil {
@@ -114,16 +83,17 @@ func PrepareTimeWindowProducerQueue() {
 				c_log.GlobalLogger.Info("话题 tpperception 没有触发器")
 				return
 			}
-			entity.Subscriber2TimeMutex.Lock()
 			// 判断是否是连续故障码
+			entity.Subscriber2TimeMutex.Lock()
 			gap := time.Since(entity.Subscriber2Time).Seconds()
 			if gap > 1 {
 				// 2 不是连续故障码
 				subscriber2Mutex.Lock()
 				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
 				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
-				var faultLabel string
 				//c_log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY)
+				// 规则判断
+				var faultLabel string
 				for _, f := range masterConfig.RuleOfTpperception {
 					faultLabel = f(data, velocityX, velocityY, yaw)
 					if faultLabel != "" {
@@ -140,65 +110,35 @@ func PrepareTimeWindowProducerQueue() {
 		c_log.GlobalLogger.Info("创建订阅者2发生故障:", err)
 		os.Exit(-1)
 	}
-	// 创建订阅者3订阅主题 fault_info
-	c_log.GlobalLogger.Info("创建订阅者3订阅话题 ", masterConfig.TopicOfFaultInfo)
-	subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
+	// 创建订阅者4订阅主题 data_read
+	c_log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
+	subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfFaultInfo,
-		Callback: func(data *pjisuv_msgs.FaultVec) {
-			if len(masterConfig.RuleOfFaultInfo) == 0 {
-				c_log.GlobalLogger.Info("话题 fault_info 没有触发器")
+		Topic: masterConfig.TopicOfDataRead,
+		Callback: func(data *pjisuv_msgs.Retrieval) {
+			if len(masterConfig.RuleOfDataRead) == 0 {
+				//c_log.GlobalLogger.Info("话题 data_read 没有触发器")
 				return
 			}
-			entity.Subscriber3TimeMutex.Lock()
 			// 判断是否是连续故障码
-			gap := time.Since(entity.Subscriber3Time).Seconds()
-			if gap > 2 {
-				// 2 不是连续故障码
-				entity.Subscriber3Time = time.Now()
-				entity.Subscriber3TimeMutex.Unlock()
-				subscriber3Mutex.Lock()
+			entity.Subscriber4TimeMutex.Lock()
+			gap := time.Since(entity.Subscriber4Time).Seconds()
+			if gap > 1 {
+				subscriber4Mutex.Lock()
 				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
 				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
 				var faultLabel string
-				for _, f := range masterConfig.RuleOfFaultInfo {
+				for _, f := range masterConfig.RuleOfDataRead {
 					faultLabel = f(data)
 					if faultLabel != "" {
 						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+						entity.Subscriber4Time = time.Now()
 						break
 					}
 				}
-				subscriber3Mutex.Unlock()
-			}
-			entity.Subscriber3TimeMutex.Unlock()
-		}})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
-		os.Exit(-1)
-	}
-	// 创建订阅者4订阅主题 data_read
-	// TODO 高频率触发
-	c_log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
-	subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfDataRead,
-		Callback: func(data *pjisuv_msgs.Retrieval) {
-			if len(masterConfig.RuleOfDataRead) == 0 {
-				//c_log.GlobalLogger.Info("话题 data_read 没有触发器")
-				return
-			}
-			subscriber4Mutex.Lock()
-			faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-			lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
-			var faultLabel string
-			for _, f := range masterConfig.RuleOfDataRead {
-				faultLabel = f(data)
-				if faultLabel != "" {
-					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-					break
-				}
+				subscriber4Mutex.Unlock()
 			}
-			subscriber4Mutex.Unlock()
+			entity.Subscriber4TimeMutex.Unlock()
 		},
 	})
 	if err != nil {
@@ -209,10 +149,10 @@ func PrepareTimeWindowProducerQueue() {
 	case signal := <-service.ChannelKillWindowProducer:
 		if signal == 1 {
 			defer service.AddKillTimes("3")
-			subscriber0.Close()
+			//subscriber0.Close()
 			subscriber1.Close()
 			subscriber2.Close()
-			subscriber3.Close()
+			//subscriber3.Close()
 			subscriber4.Close()
 			commonConfig.RosNode.Close()
 			return

+ 4 - 4
common/entity/time_window.go

@@ -13,12 +13,12 @@ var (
 	TimeWindowConsumerQueue      []TimeWindow
 	TimeWindowConsumerQueueMutex sync.RWMutex
 
-	Subscriber0Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber0TimeMutex sync.Mutex
+	Subscriber1Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber1TimeMutex sync.Mutex
 	Subscriber2Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
 	Subscriber2TimeMutex sync.Mutex
-	Subscriber3Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber3TimeMutex sync.Mutex
+	Subscriber4Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber4TimeMutex sync.Mutex
 
 	TcpSendTime      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
 	TcpSendTimeMutex sync.Mutex