|
@@ -15,10 +15,8 @@ import (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- subscriber0Mutex sync.Mutex
|
|
|
subscriber1Mutex sync.Mutex
|
|
|
subscriber2Mutex sync.Mutex
|
|
|
- subscriber3Mutex sync.Mutex
|
|
|
subscriber4Mutex sync.Mutex
|
|
|
m sync.RWMutex
|
|
|
velocityX float64
|
|
@@ -29,8 +27,7 @@ var (
|
|
|
// PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
|
|
|
func PrepareTimeWindowProducerQueue() {
|
|
|
c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
|
|
|
- // 创建订阅者1订阅主题 cicv_location
|
|
|
- c_log.GlobalLogger.Info("创建订阅者1订阅话题 ", masterConfig.TopicOfCicvLocation)
|
|
|
+ c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
|
|
|
subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
Node: commonConfig.RosNode,
|
|
|
Topic: masterConfig.TopicOfCicvLocation,
|
|
@@ -48,8 +45,7 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
}
|
|
|
// 判断是否是连续故障码
|
|
|
entity.Subscriber1TimeMutex.Lock()
|
|
|
- gap := time.Since(entity.Subscriber1Time).Seconds()
|
|
|
- if gap > 1 {
|
|
|
+ if time.Since(entity.Subscriber1Time).Seconds() > 1 {
|
|
|
subscriber1Mutex.Lock()
|
|
|
faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
@@ -70,11 +66,10 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
},
|
|
|
})
|
|
|
if err != nil {
|
|
|
- c_log.GlobalLogger.Info("创建订阅者1发生故障:", err)
|
|
|
+ c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
|
|
|
os.Exit(-1)
|
|
|
}
|
|
|
- // 创建订阅者2订阅主题 tpperception
|
|
|
- c_log.GlobalLogger.Info("创建订阅者2订阅话题 ", masterConfig.TopicOfTpperception)
|
|
|
+ c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfTpperception)
|
|
|
subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
Node: commonConfig.RosNode,
|
|
|
Topic: masterConfig.TopicOfTpperception,
|
|
@@ -85,8 +80,7 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
}
|
|
|
// 判断是否是连续故障码
|
|
|
entity.Subscriber2TimeMutex.Lock()
|
|
|
- gap := time.Since(entity.Subscriber2Time).Seconds()
|
|
|
- if gap > 1 {
|
|
|
+ if time.Since(entity.Subscriber2Time).Seconds() > 1 {
|
|
|
// 2 不是连续故障码
|
|
|
subscriber2Mutex.Lock()
|
|
|
faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
@@ -107,11 +101,10 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
entity.Subscriber2TimeMutex.Unlock()
|
|
|
}})
|
|
|
if err != nil {
|
|
|
- c_log.GlobalLogger.Info("创建订阅者2发生故障:", err)
|
|
|
+ c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
|
|
|
os.Exit(-1)
|
|
|
}
|
|
|
- // 创建订阅者4订阅主题 data_read
|
|
|
- c_log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
|
|
|
+ c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfDataRead)
|
|
|
subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
|
Node: commonConfig.RosNode,
|
|
|
Topic: masterConfig.TopicOfDataRead,
|
|
@@ -122,8 +115,7 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
}
|
|
|
// 判断是否是连续故障码
|
|
|
entity.Subscriber4TimeMutex.Lock()
|
|
|
- gap := time.Since(entity.Subscriber4Time).Seconds()
|
|
|
- if gap > 1 {
|
|
|
+ if time.Since(entity.Subscriber4Time).Seconds() > 1 {
|
|
|
subscriber4Mutex.Lock()
|
|
|
faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
@@ -142,17 +134,15 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
},
|
|
|
})
|
|
|
if err != nil {
|
|
|
- c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
|
|
|
+ c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
|
|
|
os.Exit(-1)
|
|
|
}
|
|
|
select {
|
|
|
case signal := <-service.ChannelKillWindowProducer:
|
|
|
if signal == 1 {
|
|
|
defer service.AddKillTimes("3")
|
|
|
- //subscriber0.Close()
|
|
|
subscriber1.Close()
|
|
|
subscriber2.Close()
|
|
|
- //subscriber3.Close()
|
|
|
subscriber4.Close()
|
|
|
commonConfig.RosNode.Close()
|
|
|
return
|