孟令鑫 1 year ago
parent
commit
891b67a27b

+ 4 - 4
pji/common/svc/disk_clean.go

@@ -1,10 +1,10 @@
 package svc
 
 import (
+	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/kinglong/common/cfg"
 	commonConfig "cicv-data-closedloop/pji/common/cfg"
 	"cicv-data-closedloop/pji/common/global"
-	"cicv-data-closedloop/pji/common/log"
 	"cicv-data-closedloop/pji/common/util"
 	masterConfig "cicv-data-closedloop/pji/master/pkg/cfg"
 	"time"
@@ -12,7 +12,7 @@ import (
 
 // DiskClean 如果磁盘占用过高,则删除timeWindow和对应的文件
 func DiskClean() {
-	log.GlobalLogger.Info("清理timeWindow,启动!")
+	c_log.GlobalLogger.Info("清理timeWindow,启动!")
 	for {
 		time.Sleep(1000 * time.Millisecond)
 		// 1 获取磁盘占用
@@ -44,7 +44,7 @@ func DiskClean() {
 					}
 				}
 			} else {
-				log.GlobalLogger.Error("未知的缓存策略:", policy)
+				c_log.GlobalLogger.Error("未知的缓存策略:", policy)
 			}
 
 		}
@@ -58,7 +58,7 @@ func deleteTimeWindow(indexToRemove int) {
 	dir := util.GetCopyDir(faultTime)
 	err := util.RemoveDir(dir)
 	if err != nil {
-		log.GlobalLogger.Error("删除目录", dir, "失败:", err)
+		c_log.GlobalLogger.Error("删除目录", dir, "失败:", err)
 	}
 	global.TimeWindowConsumerQueueMutex.Lock()
 	// 使用切片的特性删除指定位置的元素

+ 10 - 10
pji/common/svc/kill_self.go

@@ -1,8 +1,8 @@
 package svc
 
 import (
+	"cicv-data-closedloop/common/config/c_log"
 	commonConfig "cicv-data-closedloop/pji/common/cfg"
-	"cicv-data-closedloop/pji/common/log"
 	"cicv-data-closedloop/pji/common/util"
 	"net/rpc"
 	"os"
@@ -34,7 +34,7 @@ type KillService struct{}
 
 // Kill 杀死自身程序,通过通道实现 方法必须满足RPC规范:函数有两个参数,第一个参数是请求,第二个是响应
 func (m *KillService) Kill(args *KillSignal, reply *int) error {
-	log.GlobalLogger.Info("接收到自杀信号:", *args)
+	c_log.GlobalLogger.Info("接收到自杀信号:", *args)
 	// 1 杀死 rosbag record 命令
 	ChannelKillRosRecord <- 1
 	// 2 杀死所有 ros 订阅者
@@ -56,7 +56,7 @@ func WaitKillSelf() {
 	killService := new(KillService)
 	err := rpc.Register(killService)
 	if err != nil {
-		log.GlobalLogger.Error("注册rpc服务失败:", err)
+		c_log.GlobalLogger.Error("注册rpc服务失败:", err)
 		return
 	}
 
@@ -75,24 +75,24 @@ func AddKillTimes(info string) {
 
 	switch info {
 	case "1":
-		log.GlobalLogger.Infof("已杀死record打包goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死record打包goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 		ChannelKillDiskClean <- 1
 		KillTimes++
 		close(ChannelKillRosRecord)
 	case "2":
-		log.GlobalLogger.Infof("已杀死bag包数量维护goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死bag包数量维护goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 		KillTimes++
 		close(ChannelKillDiskClean)
 	case "3":
-		log.GlobalLogger.Infof("已杀死rosnode和ros订阅者goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死rosnode和ros订阅者goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 		KillTimes++
 		close(ChannelKillSubscriber)
 	case "4":
-		log.GlobalLogger.Infof("已杀死bag包移动goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死bag包移动goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 		KillTimes++
 		close(ChannelKillMove)
 	case "5":
-		log.GlobalLogger.Infof("已杀死bag包消费goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死bag包消费goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 		KillTimes++
 		close(ChannelKillConsume)
 	}
@@ -106,11 +106,11 @@ func killDone(restart bool) {
 			if restart {
 				_, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartDir, commonConfig.LocalConfig.RestartCmd)
 				if err != nil {
-					log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartDir, "【cmd】=", commonConfig.LocalConfig.RestartCmd, ":", err)
+					c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartDir, "【cmd】=", commonConfig.LocalConfig.RestartCmd, ":", err)
 					os.Exit(-1)
 				}
 			}
-			log.GlobalLogger.Info("程序已被更新,正常退出")
+			c_log.GlobalLogger.Info("程序已被更新,正常退出")
 			os.Exit(1)
 		}
 	}

+ 0 - 15
pji/common/svc/refresh_config.go

@@ -1,15 +0,0 @@
-package svc
-
-import (
-	"cicv-data-closedloop/pji/common/cfg"
-	"time"
-)
-
-// RefreshCloudConfig 轮询oss上的配置文件更新到本地
-func RefreshCloudConfig() {
-	// 定时刷新
-	for {
-		time.Sleep(time.Duration(cfg.CloudConfig.ConfigRefreshInterval) * time.Second)
-		cfg.InitCloudConfig()
-	}
-}

+ 3 - 3
pji/common/svc/rosbag_clean.go

@@ -1,15 +1,15 @@
 package svc
 
 import (
+	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/pji/common/cfg"
-	"cicv-data-closedloop/pji/common/log"
 	"cicv-data-closedloop/pji/common/util"
 	"time"
 )
 
 // BagCacheClean 保证本地缓存的包数量不超过设定值
 func BagCacheClean() {
-	log.GlobalLogger.Info("启动清理缓存的 goroutine 维护目录【", cfg.CloudConfig.BagDataDir, "】的 bag 包数量:", cfg.CloudConfig.BagNumber)
+	c_log.GlobalLogger.Info("启动清理缓存的 goroutine 维护目录【", cfg.CloudConfig.BagDataDir, "】的 bag 包数量:", cfg.CloudConfig.BagNumber)
 	for {
 		// 收到自杀信号
 		select {
@@ -24,7 +24,7 @@ func BagCacheClean() {
 		// 1 ------- 每10秒清理一次 -------
 		time.Sleep(time.Duration(10) * time.Second)
 		// 2 ------- 获取目录下所有bag包 -------
-		bags := util.ListAbsolutePathWithSuffixAndSort(cfg.CloudConfig.BagDataDir, ".bag")
+		bags, _ := util.ListAbsolutePathWithSuffixAndSort(cfg.CloudConfig.BagDataDir, ".bag")
 		// 3 如果打包数量超过n个,删除最旧的包{
 		if len(bags) > cfg.CloudConfig.BagNumber {
 			diff := len(bags) - cfg.CloudConfig.BagNumber

+ 18 - 18
pji/common/svc/rosbag_upload.go

@@ -1,9 +1,9 @@
 package svc
 
 import (
+	"cicv-data-closedloop/common/config/c_log"
 	commonConfig "cicv-data-closedloop/pji/common/cfg"
 	"cicv-data-closedloop/pji/common/global"
-	"cicv-data-closedloop/pji/common/log"
 	"cicv-data-closedloop/pji/common/util"
 	masterConfig "cicv-data-closedloop/pji/master/pkg/cfg"
 	"fmt"
@@ -13,7 +13,7 @@ import (
 )
 
 func RunTimeWindowConsumerQueue(nodeName string) {
-	log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
+	c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
 outLoop:
 	for {
 
@@ -39,14 +39,14 @@ outLoop:
 		if waitLength == 0 {
 			continue outLoop
 		}
-		log.GlobalLogger.Infof("待处理窗口个数为:%v", len(global.TimeWindowConsumerQueue))
+		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(global.TimeWindowConsumerQueue))
 		// 1 获取即将处理的窗口
 		currentTimeWindow := global.TimeWindowConsumerQueue[0]
 		util.RemoveHeaOfdTimeWindowConsumerQueue()
-		log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
+		c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 		// 2 获取目录
 		dir := util.GetCopyDir(currentTimeWindow.FaultTime)
-		bags := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
+		bags, _ := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
 		bagNumber := len(bags)
 		if bagNumber > currentTimeWindow.Length {
 			bagNumber = currentTimeWindow.Length
@@ -70,34 +70,34 @@ outLoop:
 				newName := bag + "_filter"
 				filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
 				_, output, err := util.Execute("rosbag", filterCommand...)
-				log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
+				c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
 				if err != nil {
-					log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
+					c_log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
 					continue
 				}
 				// 删除旧文件
 				util.DeleteFile(oldName)
 				// 将新文件改回旧文件名
 				if err = os.Rename(newName, oldName); err != nil {
-					log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
+					c_log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
 					continue outLoop
 				}
 			}
 		}
 
 		// 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
-		log.GlobalLogger.Info("压缩 bag 数据包,故障时间为:", currentTimeWindow.FaultTime)
+		c_log.GlobalLogger.Info("压缩 bag 数据包,故障时间为:", currentTimeWindow.FaultTime)
 		for i, bag := range bags {
 			oldName := bag
 			compressCommand := []string{"compress", "--bz2", oldName}
-			log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
+			c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
 			if _, output, err := util.Execute("rosbag", compressCommand...); err != nil {
-				log.GlobalLogger.Errorf("compress命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
+				c_log.GlobalLogger.Errorf("compress命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
 				continue
 			}
 		}
 		// 5 upload,必须顺序执行
-		log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
+		c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
 		start := time.Now()
 		objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
 		objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
@@ -105,14 +105,14 @@ outLoop:
 
 		for i, bag := range bags {
 			bagSlice := strings.Split(bag, "/")
-			log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
+			c_log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
 			err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
 			if err != nil {
-				log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
+				c_log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
 				continue
 			}
 		}
-		log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
+		c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
 		// 在上传完成的包目录同级下添加一个目录同名的json
 		triggerIds := make([]string, 0)
 		for _, label := range currentTimeWindow.Labels {
@@ -130,15 +130,15 @@ outLoop:
 		}
 		callBackJson, err := util.MapToJsonString(callBackMap)
 		if err != nil {
-			log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
+			c_log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
 		}
 		err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
 		if err != nil {
-			log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
+			c_log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
 		}
 
 		// 删除本地所有已上传的bag文件
-		log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
+		c_log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 		if err = util.RemoveDir(dir); err != nil {
 			continue outLoop
 		}

+ 2 - 2
pji/common/util/parse_json.go

@@ -1,8 +1,8 @@
 package util
 
 import (
+	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/pji/common/ent"
-	"cicv-data-closedloop/pji/common/log"
 	"encoding/json"
 )
 
@@ -27,7 +27,7 @@ func MapToJsonString(inputMap map[string]interface{}) (string, error) {
 func TimeWindowToJson(msg ent.TimeWindow) string {
 	jsonData, err := json.Marshal(msg)
 	if err != nil {
-		log.GlobalLogger.Error("timeWindow", msg, "转换为json时出错:", err)
+		c_log.GlobalLogger.Error("timeWindow", msg, "转换为json时出错:", err)
 	}
 	return string(jsonData)
 }