孟令鑫 1 год назад
Родитель
Сommit
7d0df251d0

+ 21 - 2
aarch64/kinglong/common/config/c_platform.go

@@ -4,6 +4,7 @@ import (
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
 	"encoding/json"
+	"strings"
 	"time"
 )
 
@@ -22,6 +23,7 @@ type platformConfig struct {
 	TaskBeforeTime  int           `json:"taskBeforeTime"`
 	TaskAfterTime   int           `json:"taskAfterTime"`
 	TaskCachePolicy string        `json:"taskCachePolicy"`
+	EquipmentTopic  string        `json:"equipmentTopic"` // topic序列
 	Lru             []string      `json:"LRU"`
 	TaskTriggers    []taskTrigger `json:"taskTriggers"`
 }
@@ -34,7 +36,10 @@ type response struct {
 	NowTime string         `json:"nowTime"`
 }
 
-var PlatformConfig platformConfig
+var (
+	PlatformConfig  platformConfig
+	SubscribeTopics []string
+)
 
 // InitPlatformConfig 初始化数据闭环平台的配置
 func InitPlatformConfig() {
@@ -49,7 +54,14 @@ func InitPlatformConfig() {
 			c_log.GlobalLogger.Error("获取配置status失败:", err)
 			continue
 		}
-		break
+		if checkPlatformConfig() {
+			SubscribeTopics = strings.Split(PlatformConfig.EquipmentTopic, ",")
+			// 去掉首尾空格
+			for i, topic := range SubscribeTopics {
+				SubscribeTopics[i] = strings.TrimSpace(topic)
+			}
+			break
+		}
 	}
 	c_log.GlobalLogger.Info("获取数据闭环平台配置 - 成功。")
 }
@@ -167,3 +179,10 @@ func getConfig() (platformConfig, error) {
 	}
 	return result.Data, nil
 }
+func checkPlatformConfig() bool {
+	if PlatformConfig.EquipmentTopic == "" {
+		c_log.GlobalLogger.Error("数据闭环平台没有配置topic序列。")
+		return false
+	}
+	return true
+}

+ 278 - 174
aarch64/kinglong/master/package/service/produce_window.go

@@ -9,7 +9,6 @@ import (
 	"cicv-data-closedloop/common/util"
 	"cicv-data-closedloop/kinglong_msgs"
 	"github.com/bluenviron/goroslib/v2"
-	"os"
 	"sync"
 	"time"
 )
@@ -29,190 +28,295 @@ var (
 // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
 func PrepareTimeWindowProducerQueue() {
 	c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
-	//创建订阅者0订阅主题 nodefault_info
-	c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
-	subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfNodeFaultInfo,
-		Callback: func(data *kinglong_msgs.FaultInfo) {
-			if len(masterConfig.RuleOfNodefaultInfo) == 0 {
-				//c_log.GlobalLogger.Info("话题 nodefault_info没有触发器")
-				return
-			}
-			entity.Subscriber0TimeMutex.Lock()
-			if time.Since(entity.Subscriber0Time).Seconds() > 1 {
-				entity.Subscriber0TimeMutex.Unlock()
-				subscriber0Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfNodefaultInfo {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						entity.Subscriber0Time = time.Now()
-						break
+
+	var err error
+	subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
+	subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
+	subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
+	subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
+	for i, topic := range commonConfig.SubscribeTopics {
+		c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
+		if topic == masterConfig.TopicOfCicvLocation {
+			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+				Node:  commonConfig.RosNode,
+				Topic: topic,
+				Callback: func(data *kinglong_msgs.PerceptionLocalization) {
+					// 更新共享变量
+					m.RLock()
+					velocityX = data.VelocityX
+					velocityY = data.VelocityY
+					yaw = data.Yaw
+					m.RUnlock()
+
+					if len(masterConfig.TopicOfCicvLocation) == 0 {
+						c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
+						return
 					}
-				}
-				subscriber0Mutex.Unlock()
-			}
-			entity.Subscriber0TimeMutex.Unlock()
-		}})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
-		os.Exit(-1)
-	}
-	// 创建订阅者1订阅主题 cicv_location
-	c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
-	subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfCicvLocation,
-		Callback: func(data *kinglong_msgs.PerceptionLocalization) {
-			m.RLock()
-			velocityX = data.VelocityX
-			velocityY = data.VelocityY
-			yaw = data.Yaw
-			m.RUnlock()
+					subscribersTimeMutexes[i].Lock()
+					if time.Since(subscribersTimes[i]).Seconds() > 1 {
+						subscribersMutexes[i].Lock()
+						faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+						lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+						var faultLabel string
+						for _, f := range masterConfig.RuleOfCicvLocation {
+							faultLabel = f(data)
+							if faultLabel != "" {
+								saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+								break
+							}
+						}
+						subscribersMutexes[i].Unlock()
+					}
+					subscribersTimeMutexes[i].Unlock()
+				},
+			})
+		}
 
-			if len(masterConfig.RuleOfCicvLocation) == 0 {
-				c_log.GlobalLogger.Info("话题 cicv_location 没有触发器")
-				return
-			}
-			entity.Subscriber1TimeMutex.Lock()
-			if time.Since(entity.Subscriber1Time).Seconds() > 1 {
-				subscriber1Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
-				// 更新共享变量
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfCicvLocation {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						entity.Subscriber1Time = time.Now()
-						break
+		if topic == masterConfig.TopicOfTpperception {
+			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+				Node:  commonConfig.RosNode,
+				Topic: topic,
+				Callback: func(data *kinglong_msgs.PerceptionObjects) {
+					if len(masterConfig.TopicOfTpperception) == 0 {
+						c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
+						return
 					}
-				}
-				subscriber1Mutex.Unlock()
-			}
-			entity.Subscriber1TimeMutex.Unlock()
-		},
-	})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
-		os.Exit(-1)
-	}
-	c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfTpperception)
-	subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfTpperception,
-		Callback: func(data *kinglong_msgs.PerceptionObjects) {
-			if len(masterConfig.RuleOfTpperception) == 0 {
-				c_log.GlobalLogger.Info("话题 tpperception 没有触发器")
-				return
-			}
-			entity.Subscriber2TimeMutex.Lock()
-			// 判断是否是连续故障码
-			if time.Since(entity.Subscriber2Time).Seconds() > 1 {
-				// 2 不是连续故障码
-				subscriber2Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
-				var faultLabel string
-				//c_log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY)
-				for _, f := range masterConfig.RuleOfTpperception {
-					faultLabel = f(data, velocityX, velocityY, yaw)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						entity.Subscriber2Time = time.Now()
-						break
+					subscribersTimeMutexes[i].Lock()
+					if time.Since(subscribersTimes[i]).Seconds() > 1 {
+						subscribersMutexes[i].Lock()
+						faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+						lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+						var faultLabel string
+						for _, f := range masterConfig.RuleOfTpperception {
+							faultLabel = f(data, velocityX, velocityY, yaw)
+							if faultLabel != "" {
+								saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+								break
+							}
+						}
+						subscribersMutexes[i].Unlock()
 					}
-				}
-				subscriber2Mutex.Unlock()
-			}
-			entity.Subscriber2TimeMutex.Unlock()
-		}})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者2发生故障:", err)
-		os.Exit(-1)
-	}
-	// 创建订阅者3订阅主题 fault_info
-	c_log.GlobalLogger.Info("创建订阅者3订阅话题 ", masterConfig.TopicOfFaultInfo)
-	subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfFaultInfo,
-		Callback: func(data *kinglong_msgs.FaultVec) {
-			if len(masterConfig.RuleOfFaultInfo) == 0 {
-				c_log.GlobalLogger.Info("话题 fault_info 没有触发器")
-				return
-			}
-			entity.Subscriber3TimeMutex.Lock()
-			if time.Since(entity.Subscriber3Time).Seconds() > 1 {
-				// 2 不是连续故障码
-				subscriber3Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfFaultInfo {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						entity.Subscriber3Time = time.Now()
-						break
+					subscribersTimeMutexes[i].Unlock()
+				},
+			})
+		}
+
+		if topic == masterConfig.TopicOfDataRead {
+			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+				Node:  commonConfig.RosNode,
+				Topic: topic,
+				Callback: func(data *kinglong_msgs.Retrieval) {
+					if len(masterConfig.TopicOfDataRead) == 0 {
+						c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
+						return
 					}
-				}
-				subscriber3Mutex.Unlock()
-			}
-			entity.Subscriber3TimeMutex.Unlock()
-		}})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
-		os.Exit(-1)
-	}
-	// 创建订阅者4订阅主题 data_read
-	// TODO 高频率触发
-	c_log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
-	subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfDataRead,
-		Callback: func(data *kinglong_msgs.Retrieval) {
-			if len(masterConfig.RuleOfDataRead) == 0 {
-				//c_log.GlobalLogger.Info("话题 data_read 没有触发器")
-				return
-			}
-			entity.Subscriber4TimeMutex.Lock()
-			if time.Since(entity.Subscriber4Time).Seconds() > 1 {
-				subscriber4Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfDataRead {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						entity.Subscriber4Time = time.Now()
-						break
+					subscribersTimeMutexes[i].Lock()
+					if time.Since(subscribersTimes[i]).Seconds() > 1 {
+						subscribersMutexes[i].Lock()
+						faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+						lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+						var faultLabel string
+						for _, f := range masterConfig.RuleOfDataRead {
+							faultLabel = f(data)
+							if faultLabel != "" {
+								saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+								subscribersTimes[i] = time.Now()
+								break
+							}
+						}
+						subscribersMutexes[i].Unlock()
 					}
-				}
-				subscriber4Mutex.Unlock()
-			}
-			entity.Subscriber4TimeMutex.Unlock()
-		},
-	})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
-		os.Exit(-1)
+					subscribersTimeMutexes[i].Unlock()
+				},
+			})
+		}
+
+		if err != nil {
+			c_log.GlobalLogger.Info("创建订阅者报错:", err)
+			//TODO 如何回传日志
+			continue
+		}
 	}
+
+	////创建订阅者0订阅主题 nodefault_info
+	//c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
+	//subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
+	//	Node:  commonConfig.RosNode,
+	//	Topic: masterConfig.TopicOfNodeFaultInfo,
+	//	Callback: func(data *kinglong_msgs.FaultInfo) {
+	//		if len(masterConfig.RuleOfNodefaultInfo) == 0 {
+	//			//c_log.GlobalLogger.Info("话题 nodefault_info没有触发器")
+	//			return
+	//		}
+	//		entity.Subscriber0TimeMutex.Lock()
+	//		if time.Since(entity.Subscriber0Time).Seconds() > 1 {
+	//			entity.Subscriber0TimeMutex.Unlock()
+	//			subscriber0Mutex.Lock()
+	//			faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+	//			lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+	//			var faultLabel string
+	//			for _, f := range masterConfig.RuleOfNodefaultInfo {
+	//				faultLabel = f(data)
+	//				if faultLabel != "" {
+	//					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+	//					entity.Subscriber0Time = time.Now()
+	//					break
+	//				}
+	//			}
+	//			subscriber0Mutex.Unlock()
+	//		}
+	//		entity.Subscriber0TimeMutex.Unlock()
+	//	}})
+	//if err != nil {
+	//	c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
+	//	os.Exit(-1)
+	//}
+	//// 创建订阅者1订阅主题 cicv_location
+	//c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
+	//subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
+	//	Node:  commonConfig.RosNode,
+	//	Topic: masterConfig.TopicOfCicvLocation,
+	//	Callback: func(data *kinglong_msgs.PerceptionLocalization) {
+	//		m.RLock()
+	//		velocityX = data.VelocityX
+	//		velocityY = data.VelocityY
+	//		yaw = data.Yaw
+	//		m.RUnlock()
+	//
+	//		if len(masterConfig.RuleOfCicvLocation) == 0 {
+	//			c_log.GlobalLogger.Info("话题 cicv_location 没有触发器")
+	//			return
+	//		}
+	//		entity.Subscriber1TimeMutex.Lock()
+	//		if time.Since(entity.Subscriber1Time).Seconds() > 1 {
+	//			subscriber1Mutex.Lock()
+	//			faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+	//			lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+	//			// 更新共享变量
+	//			var faultLabel string
+	//			for _, f := range masterConfig.RuleOfCicvLocation {
+	//				faultLabel = f(data)
+	//				if faultLabel != "" {
+	//					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+	//					entity.Subscriber1Time = time.Now()
+	//					break
+	//				}
+	//			}
+	//			subscriber1Mutex.Unlock()
+	//		}
+	//		entity.Subscriber1TimeMutex.Unlock()
+	//	},
+	//})
+	//if err != nil {
+	//	c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
+	//	os.Exit(-1)
+	//}
+	//c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfTpperception)
+	//subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
+	//	Node:  commonConfig.RosNode,
+	//	Topic: masterConfig.TopicOfTpperception,
+	//	Callback: func(data *kinglong_msgs.PerceptionObjects) {
+	//		if len(masterConfig.RuleOfTpperception) == 0 {
+	//			c_log.GlobalLogger.Info("话题 tpperception 没有触发器")
+	//			return
+	//		}
+	//		entity.Subscriber2TimeMutex.Lock()
+	//		// 判断是否是连续故障码
+	//		if time.Since(entity.Subscriber2Time).Seconds() > 1 {
+	//			// 2 不是连续故障码
+	//			subscriber2Mutex.Lock()
+	//			faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+	//			lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+	//			var faultLabel string
+	//			//c_log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY)
+	//			for _, f := range masterConfig.RuleOfTpperception {
+	//				faultLabel = f(data, velocityX, velocityY, yaw)
+	//				if faultLabel != "" {
+	//					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+	//					entity.Subscriber2Time = time.Now()
+	//					break
+	//				}
+	//			}
+	//			subscriber2Mutex.Unlock()
+	//		}
+	//		entity.Subscriber2TimeMutex.Unlock()
+	//	}})
+	//if err != nil {
+	//	c_log.GlobalLogger.Info("创建订阅者2发生故障:", err)
+	//	os.Exit(-1)
+	//}
+	//// 创建订阅者3订阅主题 fault_info
+	//c_log.GlobalLogger.Info("创建订阅者3订阅话题 ", masterConfig.TopicOfFaultInfo)
+	//subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
+	//	Node:  commonConfig.RosNode,
+	//	Topic: masterConfig.TopicOfFaultInfo,
+	//	Callback: func(data *kinglong_msgs.FaultVec) {
+	//		if len(masterConfig.RuleOfFaultInfo) == 0 {
+	//			c_log.GlobalLogger.Info("话题 fault_info 没有触发器")
+	//			return
+	//		}
+	//		entity.Subscriber3TimeMutex.Lock()
+	//		if time.Since(entity.Subscriber3Time).Seconds() > 1 {
+	//			// 2 不是连续故障码
+	//			subscriber3Mutex.Lock()
+	//			faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+	//			lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+	//			var faultLabel string
+	//			for _, f := range masterConfig.RuleOfFaultInfo {
+	//				faultLabel = f(data)
+	//				if faultLabel != "" {
+	//					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+	//					entity.Subscriber3Time = time.Now()
+	//					break
+	//				}
+	//			}
+	//			subscriber3Mutex.Unlock()
+	//		}
+	//		entity.Subscriber3TimeMutex.Unlock()
+	//	}})
+	//if err != nil {
+	//	c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
+	//	os.Exit(-1)
+	//}
+	//// 创建订阅者4订阅主题 data_read
+	//// TODO 高频率触发
+	//c_log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
+	//subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
+	//	Node:  commonConfig.RosNode,
+	//	Topic: masterConfig.TopicOfDataRead,
+	//	Callback: func(data *kinglong_msgs.Retrieval) {
+	//		if len(masterConfig.RuleOfDataRead) == 0 {
+	//			//c_log.GlobalLogger.Info("话题 data_read 没有触发器")
+	//			return
+	//		}
+	//		entity.Subscriber4TimeMutex.Lock()
+	//		if time.Since(entity.Subscriber4Time).Seconds() > 1 {
+	//			subscriber4Mutex.Lock()
+	//			faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+	//			lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+	//			var faultLabel string
+	//			for _, f := range masterConfig.RuleOfDataRead {
+	//				faultLabel = f(data)
+	//				if faultLabel != "" {
+	//					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+	//					entity.Subscriber4Time = time.Now()
+	//					break
+	//				}
+	//			}
+	//			subscriber4Mutex.Unlock()
+	//		}
+	//		entity.Subscriber4TimeMutex.Unlock()
+	//	},
+	//})
+	//if err != nil {
+	//	c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
+	//	os.Exit(-1)
+	//}
 	select {
 	case signal := <-service.ChannelKillWindowProducer:
 		if signal == 1 {
-			defer service.AddKillTimes("3")
-			subscriber0.Close()
-			subscriber1.Close()
-			subscriber2.Close()
-			subscriber3.Close()
-			subscriber4.Close()
 			commonConfig.RosNode.Close()
+			service.AddKillTimes("3")
 			return
 		}
 	}

+ 1 - 0
aarch64/pji/master/package/service/produce_window.go

@@ -42,6 +42,7 @@ func PrepareTimeWindowProducerQueue() {
 							faultLabel = f(data)
 							if faultLabel != "" {
 								saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+								subscribersTimes[i] = time.Now()
 								break
 							}
 						}

+ 22 - 2
aarch64/pjisuv/common/config/c_platform.go

@@ -4,6 +4,7 @@ import (
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
 	"encoding/json"
+	"strings"
 	"time"
 )
 
@@ -22,6 +23,7 @@ type platformConfig struct {
 	TaskBeforeTime  int           `json:"taskBeforeTime"`
 	TaskAfterTime   int           `json:"taskAfterTime"`
 	TaskCachePolicy string        `json:"taskCachePolicy"`
+	EquipmentTopic  string        `json:"equipmentTopic"` // topic序列
 	Lru             []string      `json:"LRU"`
 	TaskTriggers    []taskTrigger `json:"taskTriggers"`
 }
@@ -34,7 +36,10 @@ type response struct {
 	NowTime string         `json:"nowTime"`
 }
 
-var PlatformConfig platformConfig
+var (
+	PlatformConfig  platformConfig
+	SubscribeTopics []string
+)
 
 // InitPlatformConfig 初始化数据闭环平台的配置
 func InitPlatformConfig() {
@@ -49,7 +54,14 @@ func InitPlatformConfig() {
 			c_log.GlobalLogger.Error("获取配置status失败:", err)
 			continue
 		}
-		break
+		if checkPlatformConfig() {
+			SubscribeTopics = strings.Split(PlatformConfig.EquipmentTopic, ",")
+			// 去掉首尾空格
+			for i, topic := range SubscribeTopics {
+				SubscribeTopics[i] = strings.TrimSpace(topic)
+			}
+			break
+		}
 	}
 	c_log.GlobalLogger.Info("获取数据闭环平台配置 - 成功。")
 }
@@ -167,3 +179,11 @@ func getConfig() (platformConfig, error) {
 	}
 	return result.Data, nil
 }
+
+func checkPlatformConfig() bool {
+	if PlatformConfig.EquipmentTopic == "" {
+		c_log.GlobalLogger.Error("数据闭环平台没有配置topic序列。")
+		return false
+	}
+	return true
+}

+ 109 - 115
aarch64/pjisuv/master/package/service/produce_window.go

@@ -9,142 +9,136 @@ import (
 	"cicv-data-closedloop/common/util"
 	"cicv-data-closedloop/pjisuv_msgs"
 	"github.com/bluenviron/goroslib/v2"
-	"os"
 	"sync"
 	"time"
 )
 
 var (
-	subscriber1Mutex sync.Mutex
-	subscriber2Mutex sync.Mutex
-	subscriber4Mutex sync.Mutex
-	m                sync.RWMutex
-	velocityX        float64
-	velocityY        float64
-	yaw              float64
+	m         sync.RWMutex
+	velocityX float64
+	velocityY float64
+	yaw       float64
 )
 
 // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
 func PrepareTimeWindowProducerQueue() {
 	c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
 	c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
-	subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfCicvLocation,
-		Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
-			// 更新共享变量
-			m.RLock()
-			velocityX = data.VelocityX
-			velocityY = data.VelocityY
-			yaw = data.Yaw
-			m.RUnlock()
 
-			if len(masterConfig.RuleOfCicvLocation) == 0 {
-				c_log.GlobalLogger.Info("话题 cicv_location 没有触发器")
-				return
-			}
-			// 判断是否是连续故障码
-			entity.Subscriber1TimeMutex.Lock()
-			if time.Since(entity.Subscriber1Time).Seconds() > 1 {
-				subscriber1Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+	var err error
+	subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
+	subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
+	subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
+	subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
+	for i, topic := range commonConfig.SubscribeTopics {
+		c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
+		if topic == masterConfig.TopicOfCicvLocation {
+			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+				Node:  commonConfig.RosNode,
+				Topic: topic,
+				Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
+					// 更新共享变量
+					m.RLock()
+					velocityX = data.VelocityX
+					velocityY = data.VelocityY
+					yaw = data.Yaw
+					m.RUnlock()
 
-				// 规则判断
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfCicvLocation {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						entity.Subscriber1Time = time.Now()
-						break
+					if len(masterConfig.TopicOfCicvLocation) == 0 {
+						c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
+						return
 					}
-				}
-				subscriber1Mutex.Unlock()
-			}
-			entity.Subscriber1TimeMutex.Unlock()
-		},
-	})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
-		os.Exit(-1)
-	}
-	c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfTpperception)
-	subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfTpperception,
-		Callback: func(data *pjisuv_msgs.PerceptionObjects) {
-			if len(masterConfig.RuleOfTpperception) == 0 {
-				c_log.GlobalLogger.Info("话题 tpperception 没有触发器")
-				return
-			}
-			// 判断是否是连续故障码
-			entity.Subscriber2TimeMutex.Lock()
-			if time.Since(entity.Subscriber2Time).Seconds() > 1 {
-				// 2 不是连续故障码
-				subscriber2Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
-				//c_log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY)
-				// 规则判断
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfTpperception {
-					faultLabel = f(data, velocityX, velocityY, yaw)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						entity.Subscriber2Time = time.Now()
-						break
+					subscribersTimeMutexes[i].Lock()
+					if time.Since(subscribersTimes[i]).Seconds() > 1 {
+						subscribersMutexes[i].Lock()
+						faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+						lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+						var faultLabel string
+						for _, f := range masterConfig.RuleOfCicvLocation {
+							faultLabel = f(data)
+							if faultLabel != "" {
+								saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+								break
+							}
+						}
+						subscribersMutexes[i].Unlock()
 					}
-				}
-				subscriber2Mutex.Unlock()
-			}
-			entity.Subscriber2TimeMutex.Unlock()
-		}})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
-		os.Exit(-1)
-	}
-	c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfDataRead)
-	subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  commonConfig.RosNode,
-		Topic: masterConfig.TopicOfDataRead,
-		Callback: func(data *pjisuv_msgs.Retrieval) {
-			if len(masterConfig.RuleOfDataRead) == 0 {
-				//c_log.GlobalLogger.Info("话题 data_read 没有触发器")
-				return
-			}
-			// 判断是否是连续故障码
-			entity.Subscriber4TimeMutex.Lock()
-			if time.Since(entity.Subscriber4Time).Seconds() > 1 {
-				subscriber4Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
-				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfDataRead {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						entity.Subscriber4Time = time.Now()
-						break
+					subscribersTimeMutexes[i].Unlock()
+				},
+			})
+		}
+
+		if topic == masterConfig.TopicOfTpperception {
+			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+				Node:  commonConfig.RosNode,
+				Topic: topic,
+				Callback: func(data *pjisuv_msgs.PerceptionObjects) {
+					if len(masterConfig.TopicOfTpperception) == 0 {
+						c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
+						return
 					}
-				}
-				subscriber4Mutex.Unlock()
-			}
-			entity.Subscriber4TimeMutex.Unlock()
-		},
-	})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
-		os.Exit(-1)
+					subscribersTimeMutexes[i].Lock()
+					if time.Since(subscribersTimes[i]).Seconds() > 1 {
+						subscribersMutexes[i].Lock()
+						faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+						lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+						var faultLabel string
+						for _, f := range masterConfig.RuleOfTpperception {
+							faultLabel = f(data, velocityX, velocityY, yaw)
+							if faultLabel != "" {
+								saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+								break
+							}
+						}
+						subscribersMutexes[i].Unlock()
+					}
+					subscribersTimeMutexes[i].Unlock()
+				},
+			})
+		}
+
+		if topic == masterConfig.TopicOfDataRead {
+			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+				Node:  commonConfig.RosNode,
+				Topic: topic,
+				Callback: func(data *pjisuv_msgs.Retrieval) {
+					if len(masterConfig.TopicOfDataRead) == 0 {
+						c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
+						return
+					}
+					subscribersTimeMutexes[i].Lock()
+					if time.Since(subscribersTimes[i]).Seconds() > 1 {
+						subscribersMutexes[i].Lock()
+						faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+						lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
+						var faultLabel string
+						for _, f := range masterConfig.RuleOfDataRead {
+							faultLabel = f(data)
+							if faultLabel != "" {
+								saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+								subscribersTimes[i] = time.Now()
+								break
+							}
+						}
+						subscribersMutexes[i].Unlock()
+					}
+					subscribersTimeMutexes[i].Unlock()
+				},
+			})
+		}
+
+		if err != nil {
+			c_log.GlobalLogger.Info("创建订阅者报错:", err)
+			//TODO 如何回传日志
+			continue
+		}
 	}
+
 	select {
 	case signal := <-service.ChannelKillWindowProducer:
 		if signal == 1 {
-			defer service.AddKillTimes("3")
-			subscriber1.Close()
-			subscriber2.Close()
-			subscriber4.Close()
 			commonConfig.RosNode.Close()
+			service.AddKillTimes("3")
 			return
 		}
 	}