孟令鑫 1 年之前
父節點
當前提交
90303eab4c

+ 3 - 3
pji/common/cfg/cloud_cfg.go

@@ -80,7 +80,7 @@ func InitCloudConfig() {
 	}
 
 	// 5 ------- 校验 yaml -------
-	if checkConfig(newCloudConfig) {
+	if checkCloudConfig(newCloudConfig) {
 		CloudConfigMutex.RLock()
 		CloudConfig = newCloudConfig
 		CloudConfigMutex.RUnlock()
@@ -121,7 +121,7 @@ func refreshCloudConfig() {
 	}
 
 	// 5 ------- 校验 yaml -------
-	if checkConfig(newCloudConfig) {
+	if checkCloudConfig(newCloudConfig) {
 		CloudConfigMutex.RLock()
 		CloudConfig = newCloudConfig
 		CloudConfigMutex.RUnlock()
@@ -142,7 +142,7 @@ func RefreshCloudConfig() {
 }
 
 // CheckConfig 校验 cfg.yaml 文件
-func checkConfig(check cloudConfig) bool {
+func checkCloudConfig(check cloudConfig) bool {
 	if len(check.Hosts) != 1 {
 		c_log.GlobalLogger.Error("cloud-config.yaml中配置的hosts必须为1。")
 		os.Exit(-1)

+ 22 - 2
pji/common/cfg/platform_cfg.go

@@ -5,6 +5,7 @@ import (
 	"cicv-data-closedloop/kinglong/common/log"
 	"cicv-data-closedloop/pji/common/cutil"
 	"encoding/json"
+	"strings"
 	"time"
 )
 
@@ -23,6 +24,7 @@ type platformConfig struct {
 	TaskBeforeTime  int           `json:"taskBeforeTime"`
 	TaskAfterTime   int           `json:"taskAfterTime"`
 	TaskCachePolicy string        `json:"taskCachePolicy"`
+	EquipmentTopic  string        `json:"equipmentTopic"` // topic序列
 	Lru             []string      `json:"LRU"`
 	TaskTriggers    []taskTrigger `json:"taskTriggers"`
 }
@@ -35,7 +37,10 @@ type response struct {
 	NowTime string         `json:"nowTime"`
 }
 
-var PlatformConfig platformConfig
+var (
+	PlatformConfig  platformConfig
+	SubscribeTopics []string
+)
 
 // InitPlatformConfig 初始化数据闭环平台的配置
 func InitPlatformConfig() {
@@ -51,7 +56,14 @@ func InitPlatformConfig() {
 			c_log.GlobalLogger.Error("获取配置status失败:", err)
 			continue
 		}
-		break
+		if checkPlatformConfig() {
+			SubscribeTopics = strings.Split(PlatformConfig.EquipmentTopic, ",")
+			// 去掉首尾空格
+			for i, topic := range SubscribeTopics {
+				SubscribeTopics[i] = strings.TrimSpace(topic)
+			}
+			break
+		}
 	}
 	c_log.GlobalLogger.Info("获取数据闭环平台配置 - 成功。")
 }
@@ -169,3 +181,11 @@ func getConfig() (platformConfig, error) {
 	}
 	return result.Data, nil
 }
+
+func checkPlatformConfig() bool {
+	if PlatformConfig.EquipmentTopic == "" {
+		c_log.GlobalLogger.Error("数据闭环平台没有配置topic序列。")
+		return false
+	}
+	return true
+}

+ 122 - 52
pji/master/package/cfg/master_trigger_cfg.go

@@ -2,12 +2,8 @@ package cfg
 
 import (
 	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	"cicv-data-closedloop/pji/common/cfg"
-	"cicv-data-closedloop/pji/common/cutil"
-	"cicv-data-closedloop/pji_msgs"
-	"github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
-	"github.com/bluenviron/goroslib/v2/pkg/msgs/nav_msgs"
-	"github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
 	"github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
 	"plugin"
 	"strconv"
@@ -15,18 +11,8 @@ import (
 )
 
 var (
-	TopicOfOdom              = "/odom"
-	RuleOfOdom               []func(data *nav_msgs.Odometry) string
 	TopicOfObstacleDetection = "/obstacle_detection"
 	RuleOfObstacleDetection  []func(data *std_msgs.UInt8) string
-	TopicOfSysInfo           = "/sys_info"
-	RuleOfSysInfo            []func(data *pji_msgs.SysInfo) string
-	TopicOfLocateInfo        = "/locate_info"
-	RuleOfLocateInfo         []func(data *pji_msgs.LocateInfo) string
-	TopicOfImu               = "/imu"
-	RuleOfImu                []func(data *sensor_msgs.Imu) string
-	TopicOfDiagnostics       = "/diagnostics"
-	RuleOfDiagnostics        []func(data *diagnostic_msgs.DiagnosticArray) string
 	LabelMapTriggerId        = make(map[string]string)
 )
 
@@ -38,7 +24,7 @@ func InitTriggerConfig() {
 		fileName := slice[len(slice)-1]
 		// 下载
 		triggerLocalPath := cfg.CloudConfig.TriggersDir + fileName
-		cutil.CreateParentDir(triggerLocalPath)
+		_ = util.CreateParentDir(triggerLocalPath)
 		c_log.GlobalLogger.Info("下载触发器插件从", trigger.TriggerScriptPath, "到", triggerLocalPath)
 		err := cfg.OssBucket.GetObjectToFile(trigger.TriggerScriptPath, triggerLocalPath)
 		if err != nil {
@@ -66,48 +52,14 @@ func InitTriggerConfig() {
 			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Rule方法失败。", err)
 			continue
 		}
-		if TopicOfOdom == topic2 {
-			f, ok := rule.(func(data *nav_msgs.Odometry) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *nav_msgs.Odometry) string):", err)
-				continue
-			}
-			RuleOfOdom = append(RuleOfOdom, f)
-		} else if TopicOfObstacleDetection == topic2 {
+		// 判断规则类型
+		if TopicOfObstacleDetection == topic2 {
 			f, ok := rule.(func(data *std_msgs.UInt8) string)
 			if ok != true {
 				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *std_msgs.UInt8) string):", err)
 				continue
 			}
 			RuleOfObstacleDetection = append(RuleOfObstacleDetection, f)
-		} else if TopicOfSysInfo == topic2 {
-			f, ok := rule.(func(data *pji_msgs.SysInfo) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *pji_msgs.SysInfo) string):", err)
-				continue
-			}
-			RuleOfSysInfo = append(RuleOfSysInfo, f)
-		} else if TopicOfLocateInfo == topic2 {
-			f, ok := rule.(func(data *pji_msgs.LocateInfo) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *pji_msgs.LocateInfo) string):", err)
-				continue
-			}
-			RuleOfLocateInfo = append(RuleOfLocateInfo, f)
-		} else if TopicOfImu == topic2 {
-			f, ok := rule.(func(data *sensor_msgs.Imu) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *sensor_msgs.Imu) string):", err)
-				continue
-			}
-			RuleOfImu = append(RuleOfImu, f)
-		} else if TopicOfDiagnostics == topic2 {
-			f, ok := rule.(func(data *diagnostic_msgs.DiagnosticArray) string)
-			if ok != true {
-				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *diagnostic_msgs.DiagnosticArray) string):", err)
-				continue
-			}
-			RuleOfDiagnostics = append(RuleOfDiagnostics, f)
 		} else {
 			c_log.GlobalLogger.Error("未知的topic:", topic2)
 			continue
@@ -129,3 +81,121 @@ func InitTriggerConfig() {
 	}
 	c_log.GlobalLogger.Info("主节点加载触发器插件 - 成功。")
 }
+
+//
+//var (
+//	TopicOfOdom              = "/odom"
+//	RuleOfOdom               []func(data *nav_msgs.Odometry) string
+//	TopicOfObstacleDetection = "/obstacle_detection"
+//	RuleOfObstacleDetection  []func(data *std_msgs.UInt8) string
+//	TopicOfSysInfo           = "/sys_info"
+//	RuleOfSysInfo            []func(data *pji_msgs.SysInfo) string
+//	TopicOfLocateInfo        = "/locate_info"
+//	RuleOfLocateInfo         []func(data *pji_msgs.LocateInfo) string
+//	TopicOfImu               = "/imu"
+//	RuleOfImu                []func(data *sensor_msgs.Imu) string
+//	TopicOfDiagnostics       = "/diagnostics"
+//	RuleOfDiagnostics        []func(data *diagnostic_msgs.DiagnosticArray) string
+//	LabelMapTriggerId        = make(map[string]string)
+//
+//)
+//
+//func InitTriggerConfig() {
+//	// 下载所有触发器的文件
+//	for _, trigger := range cfg.PlatformConfig.TaskTriggers {
+//		// 获取文件名
+//		slice := strings.Split(trigger.TriggerScriptPath, "/")
+//		fileName := slice[len(slice)-1]
+//		// 下载
+//		triggerLocalPath := cfg.CloudConfig.TriggersDir + fileName
+//		cutil.CreateParentDir(triggerLocalPath)
+//		c_log.GlobalLogger.Info("下载触发器插件从", trigger.TriggerScriptPath, "到", triggerLocalPath)
+//		err := cfg.OssBucket.GetObjectToFile(trigger.TriggerScriptPath, triggerLocalPath)
+//		if err != nil {
+//			c_log.GlobalLogger.Error("下载oss上的触发器插件失败:", err)
+//			continue
+//		}
+//		// 载入插件到数组
+//		open, err := plugin.Open(triggerLocalPath)
+//		if err != nil {
+//			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "失败。", err)
+//		}
+//		topic0, err := open.Lookup("Topic")
+//		if err != nil {
+//			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Topic方法失败。", err)
+//			continue
+//		}
+//		topic1, ok := topic0.(func() string)
+//		if ok != true {
+//			c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func() string):", err)
+//			continue
+//		}
+//		topic2 := topic1()
+//		rule, err := open.Lookup("Rule")
+//		if err != nil {
+//			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Rule方法失败。", err)
+//			continue
+//		}
+//		if TopicOfOdom == topic2 {
+//			f, ok := rule.(func(data *nav_msgs.Odometry) string)
+//			if ok != true {
+//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *nav_msgs.Odometry) string):", err)
+//				continue
+//			}
+//			RuleOfOdom = append(RuleOfOdom, f)
+//		} else if TopicOfObstacleDetection == topic2 {
+//			f, ok := rule.(func(data *std_msgs.UInt8) string)
+//			if ok != true {
+//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *std_msgs.UInt8) string):", err)
+//				continue
+//			}
+//			RuleOfObstacleDetection = append(RuleOfObstacleDetection, f)
+//		} else if TopicOfSysInfo == topic2 {
+//			f, ok := rule.(func(data *pji_msgs.SysInfo) string)
+//			if ok != true {
+//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *pji_msgs.SysInfo) string):", err)
+//				continue
+//			}
+//			RuleOfSysInfo = append(RuleOfSysInfo, f)
+//		} else if TopicOfLocateInfo == topic2 {
+//			f, ok := rule.(func(data *pji_msgs.LocateInfo) string)
+//			if ok != true {
+//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *pji_msgs.LocateInfo) string):", err)
+//				continue
+//			}
+//			RuleOfLocateInfo = append(RuleOfLocateInfo, f)
+//		} else if TopicOfImu == topic2 {
+//			f, ok := rule.(func(data *sensor_msgs.Imu) string)
+//			if ok != true {
+//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *sensor_msgs.Imu) string):", err)
+//				continue
+//			}
+//			RuleOfImu = append(RuleOfImu, f)
+//		} else if TopicOfDiagnostics == topic2 {
+//			f, ok := rule.(func(data *diagnostic_msgs.DiagnosticArray) string)
+//			if ok != true {
+//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *diagnostic_msgs.DiagnosticArray) string):", err)
+//				continue
+//			}
+//			RuleOfDiagnostics = append(RuleOfDiagnostics, f)
+//		} else {
+//			c_log.GlobalLogger.Error("未知的topic:", topic2)
+//			continue
+//		}
+//
+//		label, err := open.Lookup("Label")
+//		if err != nil {
+//			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的TriggerName方法失败。", err)
+//			continue
+//		}
+//		labelFunc, ok := label.(func() string)
+//		if ok != true {
+//			c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Label方法必须是(func() string):", err)
+//			continue
+//		}
+//		labelString := labelFunc()
+//		LabelMapTriggerId[labelString] = strconv.Itoa(trigger.TriggerId)
+//		c_log.GlobalLogger.Info("主节点加载触发器插件:【ros topic】=", topic2, ",【触发器label】=", labelString, "【触发器ID】=", trigger.TriggerId, "【label和id映射关系】=", LabelMapTriggerId)
+//	}
+//	c_log.GlobalLogger.Info("主节点加载触发器插件 - 成功。")
+//}

+ 53 - 192
pji/master/package/svc/produce_window.go

@@ -3,199 +3,67 @@ package svc
 import (
 	"cicv-data-closedloop/common/config/c_log"
 	commonUtil "cicv-data-closedloop/common/util"
-	"cicv-data-closedloop/pji/common/cfg"
+	commonConfig "cicv-data-closedloop/pji/common/cfg"
 	commonEntity "cicv-data-closedloop/pji/common/ent"
 	"cicv-data-closedloop/pji/common/global"
 	commonService "cicv-data-closedloop/pji/common/svc"
 	"cicv-data-closedloop/pji/common/util"
 	masterConfig "cicv-data-closedloop/pji/master/package/cfg"
-	"cicv-data-closedloop/pji_msgs"
 	"github.com/bluenviron/goroslib/v2"
-	"github.com/bluenviron/goroslib/v2/pkg/msgs/diagnostic_msgs"
-	"github.com/bluenviron/goroslib/v2/pkg/msgs/sensor_msgs"
 	"github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
-	"os"
 	"sync"
-)
-
-var (
-	subscriber0Mutex sync.Mutex
-	subscriber1Mutex sync.Mutex
-	subscriber2Mutex sync.Mutex
-	subscriber3Mutex sync.Mutex
-	subscriber4Mutex sync.Mutex
-	subscriber5Mutex sync.Mutex
+	"time"
 )
 
 // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
 func PrepareTimeWindowProducerQueue() {
-	//log.GlobalLogger.Info("创建订阅者订阅主题", masterConfig.TopicOfOdom)
-	//subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-	//	Node:  cfg.RosNode,
-	//	Topic: masterConfig.TopicOfOdom,
-	//	Callback: func(data *nav_msgs.Odometry) {
-	//		//log.GlobalLogger.Info("话题", masterConfig.TopicOfOdom, "接收到数据", data)
-	//		subscriber0Mutex.Lock()
-	//		{
-	//			faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
-	//			lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
-	//			var faultLabel string
-	//			for _, f := range masterConfig.RuleOfOdom {
-	//				faultLabel = f(data)
-	//				if faultLabel != "" {
-	//					saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-	//					break
-	//				}
-	//			}
-	//		}
-	//		subscriber0Mutex.Unlock()
-	//	},
-	//})
-	//if err != nil {
-	//	log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfOdom, "发生故障:", err)
-	//	os.Exit(-1)
-	//}
-	c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfObstacleDetection)
-	subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  cfg.RosNode,
-		Topic: masterConfig.TopicOfObstacleDetection,
-		Callback: func(data *std_msgs.UInt8) {
-			//c_log.GlobalLogger.Info("话题", masterConfig.TopicOfObstacleDetection, "接收到数据", data)
-			subscriber1Mutex.Lock()
-			{
-				faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-				lastTimeWindow := util.GetLastTimeWindow()       // 获取最后一个时间窗口
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfObstacleDetection {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						break
-					}
-				}
-			}
-			subscriber1Mutex.Unlock()
-		},
-	})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
-		os.Exit(-1)
-	}
-	c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfSysInfo)
-	subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  cfg.RosNode,
-		Topic: masterConfig.TopicOfSysInfo,
-		Callback: func(data *pji_msgs.SysInfo) {
-			//c_log.GlobalLogger.Info("话题", masterConfig.TopicOfSysInfo, "接收到数据", data)
-			subscriber2Mutex.Lock()
-			{
-				faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-				lastTimeWindow := util.GetLastTimeWindow()       // 获取最后一个时间窗口
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfSysInfo {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						break
-					}
-				}
-			}
-			subscriber2Mutex.Unlock()
-		},
-	})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
-		os.Exit(-1)
-	}
-	c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfLocateInfo)
-	subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  cfg.RosNode,
-		Topic: masterConfig.TopicOfLocateInfo,
-		Callback: func(data *pji_msgs.LocateInfo) {
-			//c_log.GlobalLogger.Info("话题", masterConfig.TopicOfLocateInfo, "接收到数据", data)
-			subscriber3Mutex.Lock()
-			{
-				faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-				lastTimeWindow := util.GetLastTimeWindow()       // 获取最后一个时间窗口
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfLocateInfo {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						break
-					}
-				}
-			}
-			subscriber3Mutex.Unlock()
-		},
-	})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
-		os.Exit(-1)
-	}
-	c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfImu)
-	subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  cfg.RosNode,
-		Topic: masterConfig.TopicOfImu,
-		Callback: func(data *sensor_msgs.Imu) {
-			//c_log.GlobalLogger.Info("话题", masterConfig.TopicOfImu, "接收到数据", data)
-			subscriber4Mutex.Lock()
-			{
-				faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-				lastTimeWindow := util.GetLastTimeWindow()       // 获取最后一个时间窗口
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfImu {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						break
+
+	var err error
+	subscribers := make([]*goroslib.Subscriber, len(commonConfig.SubscribeTopics))
+	subscribersTimes := make([]time.Time, len(commonConfig.SubscribeTopics))
+	subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
+	subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
+	for i, topic := range commonConfig.SubscribeTopics {
+		c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
+		if topic == masterConfig.TopicOfObstacleDetection {
+			subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
+				Node:  commonConfig.RosNode,
+				Topic: topic,
+				Callback: func(data *std_msgs.UInt8) {
+					if len(masterConfig.RuleOfObstacleDetection) == 0 {
+						c_log.GlobalLogger.Infof("话题 %v 没有触发器", topic)
+						return
 					}
-				}
-			}
-			subscriber4Mutex.Unlock()
-		},
-	})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
-		os.Exit(-1)
-	}
-	c_log.GlobalLogger.Info("创建订阅者订阅主题" + masterConfig.TopicOfDiagnostics)
-	subscriber5, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
-		Node:  cfg.RosNode,
-		Topic: masterConfig.TopicOfDiagnostics,
-		Callback: func(data *diagnostic_msgs.DiagnosticArray) {
-			//c_log.GlobalLogger.Info("话题", masterConfig.TopicOfDiagnostics, "接收到数据", data)
-			subscriber5Mutex.Lock()
-			{
-				faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-				lastTimeWindow := util.GetLastTimeWindow()       // 获取最后一个时间窗口
-				var faultLabel string
-				for _, f := range masterConfig.RuleOfDiagnostics {
-					faultLabel = f(data)
-					if faultLabel != "" {
-						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						break
+					subscribersTimeMutexes[i].Lock()
+					if time.Since(subscribersTimes[i]).Seconds() > 1 {
+						subscribersMutexes[i].Lock()
+						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
+						lastTimeWindow := util.GetLastTimeWindow()       // 获取最后一个时间窗口
+						var faultLabel string
+						for _, f := range masterConfig.RuleOfObstacleDetection {
+							faultLabel = f(data)
+							if faultLabel != "" {
+								saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
+								break
+							}
+						}
+						subscribersMutexes[i].Unlock()
 					}
-				}
-			}
-			subscriber5Mutex.Unlock()
-		},
-	})
-	if err != nil {
-		c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
-		os.Exit(-1)
+					subscribersTimeMutexes[i].Unlock()
+				},
+			})
+		}
+		if err != nil {
+			c_log.GlobalLogger.Info("创建订阅者", masterConfig.TopicOfObstacleDetection, "发生故障:", err)
+			//TODO 如何回传日志
+			continue
+		}
 	}
 
 	select {
 	case signal := <-commonService.ChannelKillSubscriber:
 		if signal == 1 {
-			//subscriber0.Close()
-			subscriber1.Close()
-			subscriber2.Close()
-			subscriber3.Close()
-			subscriber4.Close()
-			subscriber5.Close()
-			cfg.RosNode.Close()
+			commonConfig.RosNode.Close()
 			commonService.AddKillTimes("3")
 			return
 		}
@@ -208,9 +76,9 @@ func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *c
 		// 2-1 如果是不在旧故障窗口内,添加一个新窗口
 		newTimeWindow := commonEntity.TimeWindow{
 			FaultTime:       faultHappenTime,
-			TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -cfg.PlatformConfig.TaskBeforeTime),
-			TimeWindowEnd:   util.TimeCustomChange(faultHappenTime, cfg.PlatformConfig.TaskAfterTime),
-			Length:          cfg.PlatformConfig.TaskBeforeTime + cfg.PlatformConfig.TaskAfterTime + 1,
+			TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
+			TimeWindowEnd:   util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
+			Length:          commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
 			Labels:          []string{faultLabel},
 			MasterTopics:    masterTopics,
 			SlaveTopics:     slaveTopics,
@@ -222,11 +90,11 @@ func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *c
 		global.TimeWindowProducerQueueMutex.RLock()
 		defer global.TimeWindowProducerQueueMutex.RUnlock()
 		// 2-2-1 更新故障窗口end时间
-		maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, cfg.PlatformConfig.TaskMaxTime)
-		expectEnd := util.TimeCustomChange(faultHappenTime, cfg.PlatformConfig.TaskAfterTime)
+		maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
+		expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
 		if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
 			lastTimeWindow.TimeWindowEnd = maxEnd
-			lastTimeWindow.Length = cfg.PlatformConfig.TaskMaxTime
+			lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
 		} else {
 			if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
 				lastTimeWindow.TimeWindowEnd = expectEnd
@@ -248,21 +116,14 @@ func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *c
 func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []string) {
 	// 获取所有需要采集的topic
 	var faultCodeTopics []string
-	for _, code := range cfg.CloudConfig.Triggers {
+	for _, code := range commonConfig.CloudConfig.Triggers {
 		if code.Label == faultLabel {
 			faultCodeTopics = code.Topics
 		}
 	}
 	return faultCodeTopics, nil
-	//// 根据不同节点采集的topic进行分配采集
-	//for _, acceptTopic := range faultCodeTopics {
-	//	for _, host := range cfg.CloudConfig.Hosts {
-	//		for _, topic := range host.Topics {
-	//			if host.Name == cfg.CloudConfig.Hosts[0].Name && acceptTopic == topic {
-	//				masterTopics = append(masterTopics, acceptTopic)
-	//			}
-	//		}
-	//	}
-	//}
-	//return masterTopics, slaveTopics
+}
+
+func initCallbackFunc() {
+
 }