|
@@ -5,7 +5,7 @@ import (
|
|
|
"cicv-data-closedloop/aarch64/kinglong/common/service"
|
|
|
masterConfig "cicv-data-closedloop/aarch64/kinglong/master/package/config"
|
|
|
"cicv-data-closedloop/common/config/c_log"
|
|
|
- "cicv-data-closedloop/common/entity"
|
|
|
+ commonEntity "cicv-data-closedloop/common/entity"
|
|
|
"cicv-data-closedloop/common/util"
|
|
|
"cicv-data-closedloop/kinglong_msgs"
|
|
|
"github.com/bluenviron/goroslib/v2"
|
|
@@ -14,10 +14,13 @@ import (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- m sync.RWMutex
|
|
|
- velocityX float64
|
|
|
- velocityY float64
|
|
|
- yaw float64
|
|
|
+ extendParam commonEntity.KinglongParam
|
|
|
+ // /cicv_location
|
|
|
+ mutexOfCicvLocation sync.RWMutex
|
|
|
+ // /tpperception
|
|
|
+ mutexOfTpperception sync.RWMutex
|
|
|
+ // /pj_control_pub
|
|
|
+ mutexOfPjControlPub sync.RWMutex
|
|
|
)
|
|
|
|
|
|
// PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
|
|
@@ -30,6 +33,20 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
subscribersTimeMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
subscribersMutexes := make([]sync.Mutex, len(commonConfig.SubscribeTopics))
|
|
|
for i, topic := range commonConfig.SubscribeTopics {
|
|
|
+ // !!!扩展的定时任务监听,牛逼的设计!!!扩展性拉满啦
|
|
|
+ if topic == masterConfig.TopicOfCicvExtend {
|
|
|
+ for {
|
|
|
+ time.Sleep(time.Duration(3500) * time.Millisecond)
|
|
|
+ for _, f := range masterConfig.RuleOfCicvExtend {
|
|
|
+ label := f(extendParam)
|
|
|
+ if label != "" {
|
|
|
+ saveTimeWindow(label, util.GetNowTimeCustom(), commonEntity.GetLastTimeWindow())
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
c_log.GlobalLogger.Info("创建订阅者订阅话题:" + topic)
|
|
|
if topic == masterConfig.TopicOfCicvLocation && len(masterConfig.RuleOfCicvLocation) > 0 {
|
|
|
subscribers[i], err = goroslib.NewSubscriber(goroslib.SubscriberConf{
|
|
@@ -37,17 +54,20 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
Topic: topic,
|
|
|
Callback: func(data *kinglong_msgs.PerceptionLocalization) {
|
|
|
// 更新共享变量
|
|
|
- m.RLock()
|
|
|
- velocityX = data.VelocityX
|
|
|
- velocityY = data.VelocityY
|
|
|
- yaw = data.Yaw
|
|
|
- m.RUnlock()
|
|
|
+ mutexOfCicvLocation.RLock()
|
|
|
+ {
|
|
|
+ extendParam.VelocityYOfCicvLocation = data.VelocityX
|
|
|
+ extendParam.VelocityYOfCicvLocation = data.VelocityY
|
|
|
+ extendParam.YawOfCicvLocation = data.Yaw
|
|
|
+ extendParam.AngularVelocityZOfCicvLocation = data.AngularVelocityZ
|
|
|
+ }
|
|
|
+ mutexOfCicvLocation.RUnlock()
|
|
|
|
|
|
subscribersTimeMutexes[i].Lock()
|
|
|
if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
var faultLabel string
|
|
|
for _, f := range masterConfig.RuleOfCicvLocation {
|
|
|
faultLabel = f(data)
|
|
@@ -71,11 +91,11 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
subscribersTimeMutexes[i].Lock()
|
|
|
if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
var faultLabel string
|
|
|
for _, f := range masterConfig.RuleOfTpperception {
|
|
|
- faultLabel = f(data, velocityX, velocityY, yaw)
|
|
|
+ faultLabel = f(data, extendParam)
|
|
|
if faultLabel != "" {
|
|
|
saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
break
|
|
@@ -96,8 +116,8 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
subscribersTimeMutexes[i].Lock()
|
|
|
if time.Since(subscribersTimes[i]).Seconds() > 1 {
|
|
|
subscribersMutexes[i].Lock()
|
|
|
- faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
+ lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
var faultLabel string
|
|
|
for _, f := range masterConfig.RuleOfDataRead {
|
|
|
faultLabel = f(data)
|
|
@@ -131,24 +151,24 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
// //c_log.GlobalLogger.Info("话题 nodefault_info没有触发器")
|
|
|
// return
|
|
|
// }
|
|
|
- // entity.Subscriber0TimeMutex.Lock()
|
|
|
- // if time.Since(entity.Subscriber0Time).Seconds() > 1 {
|
|
|
- // entity.Subscriber0TimeMutex.Unlock()
|
|
|
+ // commonEntity.Subscriber0TimeMutex.Lock()
|
|
|
+ // if time.Since(commonEntity.Subscriber0Time).Seconds() > 1 {
|
|
|
+ // commonEntity.Subscriber0TimeMutex.Unlock()
|
|
|
// subscriber0Mutex.Lock()
|
|
|
// faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- // lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
// var faultLabel string
|
|
|
// for _, f := range masterConfig.RuleOfNodefaultInfo {
|
|
|
// faultLabel = f(data)
|
|
|
// if faultLabel != "" {
|
|
|
// saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- // entity.Subscriber0Time = time.Now()
|
|
|
+ // commonEntity.Subscriber0Time = time.Now()
|
|
|
// break
|
|
|
// }
|
|
|
// }
|
|
|
// subscriber0Mutex.Unlock()
|
|
|
// }
|
|
|
- // entity.Subscriber0TimeMutex.Unlock()
|
|
|
+ // commonEntity.Subscriber0TimeMutex.Unlock()
|
|
|
// }})
|
|
|
//if err != nil {
|
|
|
// c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
|
|
@@ -170,24 +190,24 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
// c_log.GlobalLogger.Info("话题 cicv_location 没有触发器")
|
|
|
// return
|
|
|
// }
|
|
|
- // entity.Subscriber1TimeMutex.Lock()
|
|
|
- // if time.Since(entity.Subscriber1Time).Seconds() > 1 {
|
|
|
+ // commonEntity.Subscriber1TimeMutex.Lock()
|
|
|
+ // if time.Since(commonEntity.Subscriber1Time).Seconds() > 1 {
|
|
|
// subscriber1Mutex.Lock()
|
|
|
// faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- // lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
// // 更新共享变量
|
|
|
// var faultLabel string
|
|
|
// for _, f := range masterConfig.RuleOfCicvLocation {
|
|
|
// faultLabel = f(data)
|
|
|
// if faultLabel != "" {
|
|
|
// saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- // entity.Subscriber1Time = time.Now()
|
|
|
+ // commonEntity.Subscriber1Time = time.Now()
|
|
|
// break
|
|
|
// }
|
|
|
// }
|
|
|
// subscriber1Mutex.Unlock()
|
|
|
// }
|
|
|
- // entity.Subscriber1TimeMutex.Unlock()
|
|
|
+ // commonEntity.Subscriber1TimeMutex.Unlock()
|
|
|
// },
|
|
|
//})
|
|
|
//if err != nil {
|
|
@@ -203,26 +223,26 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
// c_log.GlobalLogger.Info("话题 tpperception 没有触发器")
|
|
|
// return
|
|
|
// }
|
|
|
- // entity.Subscriber2TimeMutex.Lock()
|
|
|
+ // commonEntity.Subscriber2TimeMutex.Lock()
|
|
|
// // 判断是否是连续故障码
|
|
|
- // if time.Since(entity.Subscriber2Time).Seconds() > 1 {
|
|
|
+ // if time.Since(commonEntity.Subscriber2Time).Seconds() > 1 {
|
|
|
// // 2 不是连续故障码
|
|
|
// subscriber2Mutex.Lock()
|
|
|
// faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- // lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
// var faultLabel string
|
|
|
// //c_log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY)
|
|
|
// for _, f := range masterConfig.RuleOfTpperception {
|
|
|
// faultLabel = f(data, velocityX, velocityY, yaw)
|
|
|
// if faultLabel != "" {
|
|
|
// saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- // entity.Subscriber2Time = time.Now()
|
|
|
+ // commonEntity.Subscriber2Time = time.Now()
|
|
|
// break
|
|
|
// }
|
|
|
// }
|
|
|
// subscriber2Mutex.Unlock()
|
|
|
// }
|
|
|
- // entity.Subscriber2TimeMutex.Unlock()
|
|
|
+ // commonEntity.Subscriber2TimeMutex.Unlock()
|
|
|
// }})
|
|
|
//if err != nil {
|
|
|
// c_log.GlobalLogger.Info("创建订阅者2发生故障:", err)
|
|
@@ -238,24 +258,24 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
// c_log.GlobalLogger.Info("话题 fault_info 没有触发器")
|
|
|
// return
|
|
|
// }
|
|
|
- // entity.Subscriber3TimeMutex.Lock()
|
|
|
- // if time.Since(entity.Subscriber3Time).Seconds() > 1 {
|
|
|
+ // commonEntity.Subscriber3TimeMutex.Lock()
|
|
|
+ // if time.Since(commonEntity.Subscriber3Time).Seconds() > 1 {
|
|
|
// // 2 不是连续故障码
|
|
|
// subscriber3Mutex.Lock()
|
|
|
// faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- // lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
// var faultLabel string
|
|
|
// for _, f := range masterConfig.RuleOfFaultInfo {
|
|
|
// faultLabel = f(data)
|
|
|
// if faultLabel != "" {
|
|
|
// saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- // entity.Subscriber3Time = time.Now()
|
|
|
+ // commonEntity.Subscriber3Time = time.Now()
|
|
|
// break
|
|
|
// }
|
|
|
// }
|
|
|
// subscriber3Mutex.Unlock()
|
|
|
// }
|
|
|
- // entity.Subscriber3TimeMutex.Unlock()
|
|
|
+ // commonEntity.Subscriber3TimeMutex.Unlock()
|
|
|
// }})
|
|
|
//if err != nil {
|
|
|
// c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
|
|
@@ -272,23 +292,23 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
// //c_log.GlobalLogger.Info("话题 data_read 没有触发器")
|
|
|
// return
|
|
|
// }
|
|
|
- // entity.Subscriber4TimeMutex.Lock()
|
|
|
- // if time.Since(entity.Subscriber4Time).Seconds() > 1 {
|
|
|
+ // commonEntity.Subscriber4TimeMutex.Lock()
|
|
|
+ // if time.Since(commonEntity.Subscriber4Time).Seconds() > 1 {
|
|
|
// subscriber4Mutex.Lock()
|
|
|
// faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
|
|
|
- // lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
+ // lastTimeWindow := commonEntity.GetLastTimeWindow() // 获取最后一个时间窗口
|
|
|
// var faultLabel string
|
|
|
// for _, f := range masterConfig.RuleOfDataRead {
|
|
|
// faultLabel = f(data)
|
|
|
// if faultLabel != "" {
|
|
|
// saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
|
|
|
- // entity.Subscriber4Time = time.Now()
|
|
|
+ // commonEntity.Subscriber4Time = time.Now()
|
|
|
// break
|
|
|
// }
|
|
|
// }
|
|
|
// subscriber4Mutex.Unlock()
|
|
|
// }
|
|
|
- // entity.Subscriber4TimeMutex.Unlock()
|
|
|
+ // commonEntity.Subscriber4TimeMutex.Unlock()
|
|
|
// },
|
|
|
//})
|
|
|
//if err != nil {
|
|
@@ -305,11 +325,11 @@ func PrepareTimeWindowProducerQueue() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
|
|
|
+func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
|
|
|
masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
|
|
|
if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
|
|
|
// 2-1 如果是不在旧故障窗口内,添加一个新窗口
|
|
|
- newTimeWindow := entity.TimeWindow{
|
|
|
+ newTimeWindow := commonEntity.TimeWindow{
|
|
|
FaultTime: faultHappenTime,
|
|
|
TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
|
|
|
TimeWindowEnd: util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
|
|
@@ -319,11 +339,11 @@ func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *e
|
|
|
SlaveTopics: slaveTopics,
|
|
|
}
|
|
|
c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
|
|
|
- entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
|
|
|
+ commonEntity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
|
|
|
} else {
|
|
|
// 2-2 如果在旧故障窗口内
|
|
|
- entity.TimeWindowProducerQueueMutex.RLock()
|
|
|
- defer entity.TimeWindowProducerQueueMutex.RUnlock()
|
|
|
+ commonEntity.TimeWindowProducerQueueMutex.RLock()
|
|
|
+ defer commonEntity.TimeWindowProducerQueueMutex.RUnlock()
|
|
|
// 2-2-1 更新故障窗口end时间
|
|
|
maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
|
|
|
expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
|