孟令鑫 1 سال پیش
والد
کامیت
42c9638b76

+ 0 - 25
aarch64/pjisuv/common/global/global.go

@@ -1,25 +0,0 @@
-package global
-
-import (
-	"cicv-data-closedloop/kinglong/common/ent"
-	"sync"
-	"time"
-)
-
-var (
-	TimeWindowProducerQueue      []ent.TimeWindow
-	TimeWindowProducerQueueMutex sync.RWMutex
-
-	TimeWindowConsumerQueue      []ent.TimeWindow
-	TimeWindowConsumerQueueMutex sync.RWMutex
-
-	Subscriber0Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber0TimeMutex sync.Mutex
-	Subscriber2Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber2TimeMutex sync.Mutex
-	Subscriber3Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber3TimeMutex sync.Mutex
-
-	TcpSendTime      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	TcpSendTimeMutex sync.Mutex
-)

+ 10 - 9
aarch64/pjisuv/common/service/disk_clean.go

@@ -2,19 +2,20 @@ package svc
 
 
 import (
 import (
 	commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
 	commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
+	masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/package/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/domain"
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/entity"
-	"cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/kinglong/common/util"
-	masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
+	"cicv-data-closedloop/common/util"
 	"time"
 	"time"
 )
 )
 
 
 // DiskClean 如果磁盘占用过高,则删除timeWindow和对应的文件
 // DiskClean 如果磁盘占用过高,则删除timeWindow和对应的文件
 func DiskClean() {
 func DiskClean() {
-	log.GlobalLogger.Info("启动timeWindow清理goroutine,根据缓存策略清理copy目录。")
+	c_log.GlobalLogger.Info("启动timeWindow清理goroutine,根据缓存策略清理copy目录。")
 	for {
 	for {
 		time.Sleep(1000 * time.Millisecond)
 		time.Sleep(1000 * time.Millisecond)
-		bags := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagCopyDir, ".bag")
+		bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagCopyDir, ".bag")
 		if len(bags) == 0 {
 		if len(bags) == 0 {
 			continue
 			continue
 		}
 		}
@@ -33,7 +34,7 @@ func DiskClean() {
 		if percent > commonConfig.CloudConfig.DiskUsage {
 		if percent > commonConfig.CloudConfig.DiskUsage {
 			// 2 获取策略
 			// 2 获取策略
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
-			log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.DiskUsage, policyToDescription[policy])
+			c_log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.DiskUsage, policyToDescription[policy])
 			if policy == "TTL" {
 			if policy == "TTL" {
 				// 1 获取时间窗口队列中的第二个
 				// 1 获取时间窗口队列中的第二个
 				if len(entity.TimeWindowConsumerQueue) > 2 {
 				if len(entity.TimeWindowConsumerQueue) > 2 {
@@ -53,7 +54,7 @@ func DiskClean() {
 					}
 					}
 				}
 				}
 			} else {
 			} else {
-				log.GlobalLogger.Error("未知的缓存策略:", policy)
+				c_log.GlobalLogger.Error("未知的缓存策略:", policy)
 			}
 			}
 		}
 		}
 	}
 	}
@@ -63,10 +64,10 @@ func deleteTimeWindow(indexToRemove int) {
 	timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove]
 	timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove]
 	// 删除文件
 	// 删除文件
 	faultTime := timeWindowToRemove.FaultTime
 	faultTime := timeWindowToRemove.FaultTime
-	dir := util.GetCopyDir(faultTime)
+	dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, faultTime)
 	err := util.RemoveDir(dir)
 	err := util.RemoveDir(dir)
 	if err != nil {
 	if err != nil {
-		log.GlobalLogger.Error("删除目录", dir, "失败:", err)
+		c_log.GlobalLogger.Error("删除目录", dir, "失败:", err)
 	}
 	}
 	entity.TimeWindowConsumerQueueMutex.Lock()
 	entity.TimeWindowConsumerQueueMutex.Lock()
 	// 使用切片的特性删除指定位置的元素
 	// 使用切片的特性删除指定位置的元素

+ 7 - 7
aarch64/pjisuv/common/service/rosbag_clean.go

@@ -1,15 +1,15 @@
 package svc
 package svc
 
 
 import (
 import (
-	"cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/kinglong/common/util"
+	"cicv-data-closedloop/aarch64/pjisuv/common/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	"time"
 	"time"
 )
 )
 
 
 // BagCacheClean 保证本地缓存的包数量不超过设定值
 // BagCacheClean 保证本地缓存的包数量不超过设定值
 func BagCacheClean() {
 func BagCacheClean() {
-	log.GlobalLogger.Info("启动清理缓存的 goroutine 维护目录【", cfg.CloudConfig.BagDataDir, "】的 bag 包数量:", cfg.CloudConfig.BagNumber)
+	c_log.GlobalLogger.Info("启动清理缓存的 goroutine 维护目录【", config.CloudConfig.BagDataDir, "】的 bag 包数量:", config.CloudConfig.BagNumber)
 	for {
 	for {
 		// 收到自杀信号
 		// 收到自杀信号
 		select {
 		select {
@@ -24,10 +24,10 @@ func BagCacheClean() {
 		// 1 ------- 每10秒清理一次 -------
 		// 1 ------- 每10秒清理一次 -------
 		time.Sleep(time.Duration(10) * time.Second)
 		time.Sleep(time.Duration(10) * time.Second)
 		// 2 ------- 获取目录下所有bag包 -------
 		// 2 ------- 获取目录下所有bag包 -------
-		bags := util.ListAbsolutePathWithSuffixAndSort(cfg.CloudConfig.BagDataDir, ".bag")
+		bags, _ := util.ListAbsolutePathWithSuffixAndSort(config.CloudConfig.BagDataDir, ".bag")
 		// 3 如果打包数量超过n个,删除最旧的包{
 		// 3 如果打包数量超过n个,删除最旧的包{
-		if len(bags) > cfg.CloudConfig.BagNumber {
-			diff := len(bags) - cfg.CloudConfig.BagNumber
+		if len(bags) > config.CloudConfig.BagNumber {
+			diff := len(bags) - config.CloudConfig.BagNumber
 			for i := 0; i < diff; i++ {
 			for i := 0; i < diff; i++ {
 				util.DeleteFile(bags[i])
 				util.DeleteFile(bags[i])
 			}
 			}

+ 19 - 20
aarch64/pjisuv/common/service/rosbag_record.go

@@ -1,10 +1,9 @@
 package svc
 package svc
 
 
 import (
 import (
+	"cicv-data-closedloop/aarch64/pjisuv/common/config"
+	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
 	"cicv-data-closedloop/common/util"
-	cfg2 "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/cutil"
-	"cicv-data-closedloop/kinglong/common/log"
 	"github.com/bluenviron/goroslib/v2"
 	"github.com/bluenviron/goroslib/v2"
 	"os"
 	"os"
 	"time"
 	"time"
@@ -12,21 +11,21 @@ import (
 
 
 // BagRecord 打包rosbag
 // BagRecord 打包rosbag
 func BagRecord(nodeName string) {
 func BagRecord(nodeName string) {
-	log.GlobalLogger.Info("rosbag record goroutine - 启动")
+	c_log.GlobalLogger.Info("rosbag record goroutine - 启动")
 	for {
 	for {
-		log.GlobalLogger.Info("校验必需的 rosnode 是否全部启动。")
+		c_log.GlobalLogger.Info("校验必需的 rosnode 是否全部启动。")
 		canRecord := false
 		canRecord := false
 		for !canRecord {
 		for !canRecord {
 			time.Sleep(time.Duration(2) * time.Second)
 			time.Sleep(time.Duration(2) * time.Second)
-			canRecord = isCanRecord(cfg2.RosNode)
+			canRecord = isCanRecord(config.RosNode)
 		}
 		}
-		log.GlobalLogger.Info("rosnode启动完成,正在启动record命令。")
+		c_log.GlobalLogger.Info("rosnode启动完成,正在启动record命令。")
 
 
 		var command []string
 		var command []string
 		command = append(command, "record")
 		command = append(command, "record")
 		command = append(command, "--split")
 		command = append(command, "--split")
 		command = append(command, "--duration=1")
 		command = append(command, "--duration=1")
-		for _, host := range cfg2.CloudConfig.Hosts {
+		for _, host := range config.CloudConfig.Hosts {
 			if host.Name == nodeName {
 			if host.Name == nodeName {
 				for _, topic := range host.Topics {
 				for _, topic := range host.Topics {
 					command = append(command, topic)
 					command = append(command, topic)
@@ -37,13 +36,13 @@ func BagRecord(nodeName string) {
 		// 2 ------- 调用 rosbag 打包命令,该命令自动阻塞 -------
 		// 2 ------- 调用 rosbag 打包命令,该命令自动阻塞 -------
 		// 不在此处压缩,因为 rosbag filter 时会报错。在上传到oss之前压缩即可。
 		// 不在此处压缩,因为 rosbag filter 时会报错。在上传到oss之前压缩即可。
 		// 包名格式:2023-11-15-17-35-20_0.bag
 		// 包名格式:2023-11-15-17-35-20_0.bag
-		cutil.CreateParentDir(cfg2.CloudConfig.BagDataDir)
-		cmd, err := util.ExecuteWithEnvAndDirAsync(os.Environ(), cfg2.CloudConfig.BagDataDir, "rosbag", command...)
+		_ = util.CreateParentDir(config.CloudConfig.BagDataDir)
+		cmd, err := util.ExecuteWithEnvAndDirAsync(os.Environ(), config.CloudConfig.BagDataDir, "rosbag", command...)
 		if err != nil {
 		if err != nil {
-			log.GlobalLogger.Error("执行record命令", command, "出错:", err)
+			c_log.GlobalLogger.Error("执行record命令", command, "出错:", err)
 			continue
 			continue
 		}
 		}
-		log.GlobalLogger.Info("启动record命令成功。")
+		c_log.GlobalLogger.Info("启动record命令成功。")
 
 
 		recordProcessPid := cmd.Process.Pid
 		recordProcessPid := cmd.Process.Pid
 		var recordSubProcessPid int
 		var recordSubProcessPid int
@@ -51,25 +50,25 @@ func BagRecord(nodeName string) {
 			time.Sleep(time.Duration(2) * time.Second)
 			time.Sleep(time.Duration(2) * time.Second)
 			recordSubProcessPid, err = util.GetSubProcessPid(recordProcessPid)
 			recordSubProcessPid, err = util.GetSubProcessPid(recordProcessPid)
 			if err != nil {
 			if err != nil {
-				log.GlobalLogger.Info("正在等待获取进程 ", recordProcessPid, " 的子进程的pid。")
+				c_log.GlobalLogger.Info("正在等待获取进程 ", recordProcessPid, " 的子进程的pid。")
 				continue
 				continue
 			}
 			}
 			if recordSubProcessPid != 0 {
 			if recordSubProcessPid != 0 {
-				log.GlobalLogger.Info("获取进程 ", recordProcessPid, " 的子进程的pid:", recordSubProcessPid)
+				c_log.GlobalLogger.Info("获取进程 ", recordProcessPid, " 的子进程的pid:", recordSubProcessPid)
 				break
 				break
 			}
 			}
 		}
 		}
 		// 等待自杀信号
 		// 等待自杀信号
-		log.GlobalLogger.Info("rosbag record goroutine - 等待自杀信号")
+		c_log.GlobalLogger.Info("rosbag record goroutine - 等待自杀信号")
 		select {
 		select {
 		case signal := <-ChannelKillRosRecord:
 		case signal := <-ChannelKillRosRecord:
 			if signal == 1 {
 			if signal == 1 {
 				if err = util.KillProcessByPid(recordSubProcessPid); err != nil {
 				if err = util.KillProcessByPid(recordSubProcessPid); err != nil {
-					log.GlobalLogger.Errorf("程序阻塞,杀死record命令子进程出错,【pid】=%v,【err】=%v。", recordSubProcessPid, err)
+					c_log.GlobalLogger.Errorf("程序阻塞,杀死record命令子进程出错,【pid】=%v,【err】=%v。", recordSubProcessPid, err)
 					select {} // 此处阻塞防止record命令一直录包占满存储
 					select {} // 此处阻塞防止record命令一直录包占满存储
 				}
 				}
 				if err = cmd.Process.Kill(); err != nil {
 				if err = cmd.Process.Kill(); err != nil {
-					log.GlobalLogger.Error("程序阻塞,杀死record命令父进程", recordProcessPid, "出错:", err)
+					c_log.GlobalLogger.Error("程序阻塞,杀死record命令父进程", recordProcessPid, "出错:", err)
 					select {} // 此处阻塞防止record命令一直录包占满存储
 					select {} // 此处阻塞防止record命令一直录包占满存储
 				}
 				}
 				AddKillTimes("1")
 				AddKillTimes("1")
@@ -99,14 +98,14 @@ func isCanRecord(n *goroslib.Node) bool {
 	time.Sleep(time.Duration(1) * time.Second)
 	time.Sleep(time.Duration(1) * time.Second)
 	nodes, err := n.MasterGetNodes()
 	nodes, err := n.MasterGetNodes()
 	if err != nil {
 	if err != nil {
-		log.GlobalLogger.Error("获取rosnode出错:", err)
+		c_log.GlobalLogger.Error("获取rosnode出错:", err)
 		return false
 		return false
 	}
 	}
 	myMap := nodes
 	myMap := nodes
-	mySlice := cfg2.CloudConfig.Ros.Nodes
+	mySlice := config.CloudConfig.Ros.Nodes
 	for _, element := range mySlice {
 	for _, element := range mySlice {
 		if _, ok := myMap[element]; !ok {
 		if _, ok := myMap[element]; !ok {
-			log.GlobalLogger.Info("rosnode:", element, " 未启动,需等待启动后才可启动record。")
+			c_log.GlobalLogger.Info("rosnode:", element, " 未启动,需等待启动后才可启动record。")
 			return false
 			return false
 		}
 		}
 	}
 	}

+ 33 - 33
aarch64/pjisuv/common/service/rosbag_upload.go

@@ -1,12 +1,12 @@
 package svc
 package svc
 
 
 import (
 import (
+	commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
+	masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/package/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/domain"
+	"cicv-data-closedloop/common/entity"
 	commonUtil "cicv-data-closedloop/common/util"
 	commonUtil "cicv-data-closedloop/common/util"
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/global"
-	"cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/kinglong/common/util"
-	masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
 	"fmt"
 	"fmt"
 	"os"
 	"os"
 	"strings"
 	"strings"
@@ -14,7 +14,7 @@ import (
 )
 )
 
 
 func RunTimeWindowConsumerQueue(nodeName string) {
 func RunTimeWindowConsumerQueue(nodeName string) {
-	log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
+	c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
 outLoop:
 outLoop:
 	for { // 串行处理
 	for { // 串行处理
 		// 收到自杀信号
 		// 收到自杀信号
@@ -22,7 +22,7 @@ outLoop:
 		case signal := <-ChannelKillConsume:
 		case signal := <-ChannelKillConsume:
 			if signal == 1 {
 			if signal == 1 {
 				ChannelKillConsume <- 1
 				ChannelKillConsume <- 1
-				if len(global.TimeWindowConsumerQueue) == 0 {
+				if len(entity.TimeWindowConsumerQueue) == 0 {
 					AddKillTimes("5")
 					AddKillTimes("5")
 					return
 					return
 				}
 				}
@@ -34,19 +34,19 @@ outLoop:
 		}
 		}
 		// 每一秒扫一次
 		// 每一秒扫一次
 		time.Sleep(time.Duration(1) * time.Second)
 		time.Sleep(time.Duration(1) * time.Second)
-		waitLength := len(global.TimeWindowConsumerQueue)
+		waitLength := len(entity.TimeWindowConsumerQueue)
 		if waitLength == 0 {
 		if waitLength == 0 {
 			continue outLoop
 			continue outLoop
 		}
 		}
-		log.GlobalLogger.Infof("待处理窗口个数为:%v", len(global.TimeWindowConsumerQueue))
+		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
 		// 1 获取即将处理的窗口
 		// 1 获取即将处理的窗口
-		currentTimeWindow := global.TimeWindowConsumerQueue[0]
-		util.RemoveHeaOfdTimeWindowConsumerQueue()
-		log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
+		currentTimeWindow := entity.TimeWindowConsumerQueue[0]
+		entity.RemoveHeaOfdTimeWindowConsumerQueue()
+		c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 
 
 		// 2 获取目录
 		// 2 获取目录
-		dir := util.GetCopyDir(currentTimeWindow.FaultTime)
-		bags := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
+		dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
+		bags, _ := commonUtil.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
 		bagNumber := len(bags)
 		bagNumber := len(bags)
 		if bagNumber > currentTimeWindow.Length {
 		if bagNumber > currentTimeWindow.Length {
 			bagNumber = currentTimeWindow.Length
 			bagNumber = currentTimeWindow.Length
@@ -70,56 +70,56 @@ outLoop:
 				newName := bag + "_filter"
 				newName := bag + "_filter"
 				filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
 				filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
 				_, output, err := commonUtil.ExecuteWithEnvSync(os.Environ(), "rosbag", filterCommand...)
 				_, output, err := commonUtil.ExecuteWithEnvSync(os.Environ(), "rosbag", filterCommand...)
-				log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
+				c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
 				if err != nil {
 				if err != nil {
-					log.GlobalLogger.Errorf("filter 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
+					c_log.GlobalLogger.Errorf("filter 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
 					continue
 					continue
 				}
 				}
 				// 删除旧文件
 				// 删除旧文件
-				util.DeleteFile(oldName)
+				_ = commonUtil.DeleteFile(oldName)
 				// 将新文件改回旧文件名
 				// 将新文件改回旧文件名
 				if err = os.Rename(newName, oldName); err != nil {
 				if err = os.Rename(newName, oldName); err != nil {
-					log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
+					c_log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
 					continue outLoop
 					continue outLoop
 				}
 				}
 			}
 			}
 		}
 		}
 
 
 		// 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
 		// 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
-		log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
+		c_log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
 		for i, bag := range bags {
 		for i, bag := range bags {
 			oldName := bag
 			oldName := bag
 			compressCommand := []string{"compress", "--bz2", oldName}
 			compressCommand := []string{"compress", "--bz2", oldName}
-			log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
+			c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
 			if _, output, err := commonUtil.ExecuteWithEnvSync(os.Environ(), "rosbag", compressCommand...); err != nil {
 			if _, output, err := commonUtil.ExecuteWithEnvSync(os.Environ(), "rosbag", compressCommand...); err != nil {
-				log.GlobalLogger.Errorf("compress 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
+				c_log.GlobalLogger.Errorf("compress 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
 				continue
 				continue
 			}
 			}
 		}
 		}
 		// 5 upload,必须顺序执行
 		// 5 upload,必须顺序执行
-		log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
+		c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
 		start := time.Now()
 		start := time.Now()
 		objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
 		objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
 		objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
 		objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
 		objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
 		objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
 		for i, bag := range bags {
 		for i, bag := range bags {
 			bagSlice := strings.Split(bag, "/")
 			bagSlice := strings.Split(bag, "/")
-			log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
+			c_log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
 			err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
 			err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
 			if err != nil {
 			if err != nil {
-				log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
+				c_log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
 				continue
 				continue
 			}
 			}
 		}
 		}
-		log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
+		c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
 		// 在上传完成的包目录同级下添加一个目录同名的json
 		// 在上传完成的包目录同级下添加一个目录同名的json
 		triggerIds := make([]string, 0)
 		triggerIds := make([]string, 0)
 		for _, label := range currentTimeWindow.Labels {
 		for _, label := range currentTimeWindow.Labels {
 			triggerIdToAppend := masterConfig.LabelMapTriggerId[label]
 			triggerIdToAppend := masterConfig.LabelMapTriggerId[label]
-			log.GlobalLogger.Info("添加一个【triggerId】=", triggerIdToAppend)
+			c_log.GlobalLogger.Info("添加一个【triggerId】=", triggerIdToAppend)
 			triggerIds = append(triggerIds, triggerIdToAppend)
 			triggerIds = append(triggerIds, triggerIdToAppend)
 		}
 		}
-		log.GlobalLogger.Info("json 中添加【triggerIds】=", triggerIds)
+		c_log.GlobalLogger.Info("json 中添加【triggerIds】=", triggerIds)
 		callBackMap := map[string]interface{}{
 		callBackMap := map[string]interface{}{
 			"dataName":    currentTimeWindow.FaultTime,
 			"dataName":    currentTimeWindow.FaultTime,
 			"dataSize":    "", // 由合并程序补充
 			"dataSize":    "", // 由合并程序补充
@@ -130,19 +130,19 @@ outLoop:
 			"taskId":      commonConfig.PlatformConfig.TaskConfigId,
 			"taskId":      commonConfig.PlatformConfig.TaskConfigId,
 			"triggerId":   triggerIds,
 			"triggerId":   triggerIds,
 		}
 		}
-		callBackJson, err := util.MapToJsonString(callBackMap)
-		log.GlobalLogger.Info("【callBackJson】=", callBackJson)
+		callBackJson, err := commonUtil.MapToJsonString(callBackMap)
+		c_log.GlobalLogger.Info("【callBackJson】=", callBackJson)
 		if err != nil {
 		if err != nil {
-			log.GlobalLogger.Error("callBackMap", callBackMap, "转 json 失败:", err)
+			c_log.GlobalLogger.Error("callBackMap", callBackMap, "转 json 失败:", err)
 		}
 		}
 		err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
 		err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
 		if err != nil {
 		if err != nil {
-			log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
+			c_log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
 		}
 		}
 
 
 		// 删除本地所有已上传的bag文件
 		// 删除本地所有已上传的bag文件
-		log.GlobalLogger.Infof("结束处理窗口,【Label】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
-		if err = util.RemoveDir(dir); err != nil {
+		c_log.GlobalLogger.Infof("结束处理窗口,【Label】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
+		if err = commonUtil.RemoveDir(dir); err != nil {
 			continue outLoop
 			continue outLoop
 		}
 		}
 
 

+ 0 - 16
aarch64/pjisuv/common/util/move_bag.go

@@ -1,16 +0,0 @@
-package util
-
-import (
-	"cicv-data-closedloop/common/util"
-	"strings"
-)
-
-func MoveFromDataToCopy(faultTime string, bagDataDir string, sourceBag string) {
-	dir := GetCopyDir(faultTime)
-	_ = util.CreateDir(dir)
-	targetBag := strings.Replace(sourceBag, bagDataDir, dir, 1)
-	var copyCommand []string
-	copyCommand = append(copyCommand, sourceBag)
-	copyCommand = append(copyCommand, targetBag)
-	_, _, _ = Execute("mv", copyCommand...)
-}

+ 0 - 33
aarch64/pjisuv/common/util/parse_json.go

@@ -1,33 +0,0 @@
-package util
-
-import (
-	"cicv-data-closedloop/kinglong/common/ent"
-	"cicv-data-closedloop/kinglong/common/log"
-	"encoding/json"
-)
-
-func JsonStringToMap(source string) (map[string]interface{}, error) {
-	var dataMap map[string]interface{}
-	err := json.Unmarshal([]byte(source), &dataMap)
-	if err != nil {
-		return nil, err
-	} else {
-		return dataMap, nil
-	}
-}
-
-func MapToJsonString(inputMap map[string]interface{}) (string, error) {
-	jsonBytes, err := json.Marshal(inputMap)
-	if err != nil {
-		return "", err
-	}
-	return string(jsonBytes), nil
-}
-
-func TimeWindowToJson(msg ent.TimeWindow) string {
-	jsonData, err := json.Marshal(msg)
-	if err != nil {
-		log.GlobalLogger.Error("timeWindow", msg, "转换为json时出错:", err)
-	}
-	return string(jsonData)
-}

+ 0 - 43
aarch64/pjisuv/common/util/util_io.go

@@ -1,43 +0,0 @@
-package util
-
-import (
-	"os"
-	"path/filepath"
-)
-
-// RemoveDir 递归删除目录及其下的所有文件和子目录
-func RemoveDir(dirPath string) error {
-	// 打开目录
-	dir, err := os.Open(dirPath)
-	if err != nil {
-		return err
-	}
-	defer dir.Close()
-	// 读取目录下的文件和子目录
-	fileInfos, err := dir.Readdir(-1)
-	if err != nil {
-		return err
-	}
-	// 遍历文件和子目录
-	for _, fileInfo := range fileInfos {
-		path := filepath.Join(dirPath, fileInfo.Name())
-
-		if fileInfo.IsDir() {
-			// 如果是子目录,递归调用removeDir删除子目录及其下的文件和子目录
-			RemoveDir(path)
-
-		} else {
-			// 如果是文件,直接删除文件
-			err = os.Remove(path)
-			if err != nil {
-				return err
-			}
-		}
-	}
-	// 删除目录本身
-	err = os.Remove(dirPath)
-	if err != nil {
-		return err
-	}
-	return nil
-}

+ 0 - 28
aarch64/pjisuv/master/package/config/master_tcp_config.go

@@ -1,28 +0,0 @@
-package config
-
-import (
-	"cicv-data-closedloop/kinglong/common/log"
-	"net"
-)
-
-var TcpConnection net.Conn
-
-// InitTcpConfig 初始化TCP连接,用于发送时间窗口到从节点
-func InitTcpConfig() {
-	log.GlobalLogger.Info("主节点初始化TCP长连接 - 开始。")
-	//var err error
-	//socket := commonConfig.CloudConfig.Hosts[1].Ip + ":" + commonConfig.CloudConfig.TcpPort
-	//for {
-	//	time.Sleep(time.Duration(1) * time.Second)
-	//	TcpConnection, err = net.Dial("tcp", socket)
-	//	if err != nil {
-	//		log.GlobalLogger.Error("主节点初始化TCP长连接 - 进行中:", err)
-	//		continue
-	//	} else {
-	//		log.GlobalLogger.Info("主节点初始化TCP长连接 - 成功:", socket)
-	//		break
-	//	}
-	//
-	//}
-
-}

+ 29 - 41
aarch64/pjisuv/master/package/service/move_bag_and_send_window.go

@@ -1,27 +1,26 @@
 package svc
 package svc
 
 
 import (
 import (
-	commonUtil "cicv-data-closedloop/common/util"
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/ent"
-	"cicv-data-closedloop/kinglong/common/global"
-	"cicv-data-closedloop/kinglong/common/log"
-	commonService "cicv-data-closedloop/kinglong/common/svc"
-	"cicv-data-closedloop/kinglong/common/util"
+	commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
+	commonService "cicv-data-closedloop/aarch64/pjisuv/common/service"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/domain"
+	"cicv-data-closedloop/common/entity"
+	"cicv-data-closedloop/common/util"
 	"net"
 	"net"
 	"time"
 	"time"
 )
 )
 
 
 // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
 // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
 func RunTimeWindowProducerQueue() {
 func RunTimeWindowProducerQueue() {
-	log.GlobalLogger.Info("生产者队列goroutine - 启动")
+	c_log.GlobalLogger.Info("生产者队列goroutine - 启动")
 	for {
 	for {
 		// 收到自杀信号
 		// 收到自杀信号
 		select {
 		select {
 		case signal := <-commonService.ChannelKillMove:
 		case signal := <-commonService.ChannelKillMove:
 			if signal == 1 {
 			if signal == 1 {
 				commonService.ChannelKillMove <- 1
 				commonService.ChannelKillMove <- 1
-				if len(global.TimeWindowProducerQueue) == 0 {
+				if len(entity.TimeWindowProducerQueue) == 0 {
 					commonService.AddKillTimes("4")
 					commonService.AddKillTimes("4")
 					return
 					return
 				}
 				}
@@ -33,9 +32,9 @@ func RunTimeWindowProducerQueue() {
 		}
 		}
 
 
 		time.Sleep(time.Duration(1) * time.Second)
 		time.Sleep(time.Duration(1) * time.Second)
-		if len(global.TimeWindowProducerQueue) > 0 {
-			bags := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
-			currentTimeWindow := global.TimeWindowProducerQueue[0]
+		if len(entity.TimeWindowProducerQueue) > 0 {
+			bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
+			currentTimeWindow := entity.TimeWindowProducerQueue[0]
 			move := false
 			move := false
 			bigger := false
 			bigger := false
 			for _, bag := range bags {
 			for _, bag := range bags {
@@ -45,7 +44,7 @@ func RunTimeWindowProducerQueue() {
 				compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
 				compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
 				if compare1 && compare2 {
 				if compare1 && compare2 {
 					// 将bag包移动到Copy目录
 					// 将bag包移动到Copy目录
-					util.MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
+					domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
 					move = true
 					move = true
 				} else {
 				} else {
 					if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
 					if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
@@ -60,25 +59,25 @@ func RunTimeWindowProducerQueue() {
 				// 1 如果第一个已经大于了timeWindowEnd,则触发上传并删除
 				// 1 如果第一个已经大于了timeWindowEnd,则触发上传并删除
 				// 将时间窗口发送给从节点
 				// 将时间窗口发送给从节点
 				currentTimeWindow.CanUpload = "yes"
 				currentTimeWindow.CanUpload = "yes"
-				log.GlobalLogger.Info("将已完成的窗口发送给从节点:", currentTimeWindow.CanUpload)
-				util.SupplyCopyBags(currentTimeWindow)
-				util.RefreshTcpSendTime()
+				c_log.GlobalLogger.Info("将已完成的窗口发送给从节点:", currentTimeWindow.CanUpload)
+				domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
+				entity.RefreshTcpSendTime()
 				go sendTimeWindowByTcp(currentTimeWindow)
 				go sendTimeWindowByTcp(currentTimeWindow)
 				// 将时间窗口移出准备队列
 				// 将时间窗口移出准备队列
-				util.RemoveHeadOfdTimeWindowProducerQueue()
+				entity.RemoveHeadOfdTimeWindowProducerQueue()
 				// 将时间窗口加入运行队列
 				// 将时间窗口加入运行队列
-				util.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
+				entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
 				// 获取copy目录下的字典json,key为触发时间,value为label
 				// 获取copy目录下的字典json,key为触发时间,value为label
-				timeToLabelJson, _ := commonUtil.ReadFile(commonConfig.CloudConfig.TimeToLabelJsonPath)
-				timeToLabelMap, _ := commonUtil.JsonStringToMap(timeToLabelJson)
-				timeToLabelMap[currentTimeWindow.FaultTime] = commonUtil.ToString(currentTimeWindow.Labels)
-				timeToLabelJson, _ = commonUtil.MapToJsonString(timeToLabelMap)
-				_ = commonUtil.WriteFile(timeToLabelJson, commonConfig.CloudConfig.TimeToLabelJsonPath)
+				timeToLabelJson, _ := util.ReadFile(commonConfig.CloudConfig.TimeToLabelJsonPath)
+				timeToLabelMap, _ := util.JsonStringToMap(timeToLabelJson)
+				timeToLabelMap[currentTimeWindow.FaultTime] = util.ToString(currentTimeWindow.Labels)
+				timeToLabelJson, _ = util.MapToJsonString(timeToLabelMap)
+				_ = util.WriteFile(timeToLabelJson, commonConfig.CloudConfig.TimeToLabelJsonPath)
 				continue
 				continue
 			} else { // 保证当前窗口只发送一次,每间隔5秒发一次
 			} else { // 保证当前窗口只发送一次,每间隔5秒发一次
-				if int(time.Since(global.TcpSendTime).Seconds()) > commonConfig.CloudConfig.TimeWindowSendGap {
-					log.GlobalLogger.Info("每隔", commonConfig.CloudConfig.TimeWindowSendGap, "秒发送一次tcp消息")
-					util.RefreshTcpSendTime()
+				if int(time.Since(entity.TcpSendTime).Seconds()) > commonConfig.CloudConfig.TimeWindowSendGap {
+					c_log.GlobalLogger.Info("每隔", commonConfig.CloudConfig.TimeWindowSendGap, "秒发送一次tcp消息")
+					entity.RefreshTcpSendTime()
 					// 2 如果第一个不大于timeWindowEnd,则发送不可上传的窗口信息。
 					// 2 如果第一个不大于timeWindowEnd,则发送不可上传的窗口信息。
 					currentTimeWindow.CanUpload = "no"
 					currentTimeWindow.CanUpload = "no"
 					go sendTimeWindowByTcp(currentTimeWindow)
 					go sendTimeWindowByTcp(currentTimeWindow)
@@ -88,30 +87,19 @@ func RunTimeWindowProducerQueue() {
 	}
 	}
 }
 }
 
 
-//TODO 服务端接受连接时如何维护该链接
-//func sendTimeWindowByTcp(timeWindow ent.TimeWindow) {
-//	// 发送数据
-//	send := util2.TimeWindowToJson(timeWindow)
-//	_, err := masterCfg.TcpConnection.Write([]byte(send))
-//	if err != nil {
-//		log.GlobalLogger.Error("master发送给slave时间窗口", timeWindow, "失败:", err)
-//		return
-//	}
-//}
-
-func sendTimeWindowByTcp(timeWindow ent.TimeWindow) {
+func sendTimeWindowByTcp(timeWindow entity.TimeWindow) {
 	socket := commonConfig.CloudConfig.Hosts[1].Ip + ":" + commonConfig.CloudConfig.TcpPort
 	socket := commonConfig.CloudConfig.Hosts[1].Ip + ":" + commonConfig.CloudConfig.TcpPort
 	tcpConn, err := net.Dial("tcp", socket)
 	tcpConn, err := net.Dial("tcp", socket)
 	if err != nil {
 	if err != nil {
-		log.GlobalLogger.Error("建立tcp连接", socket, "失败:", err)
+		c_log.GlobalLogger.Error("建立tcp连接", socket, "失败:", err)
 		return
 		return
 	}
 	}
 	defer tcpConn.Close()
 	defer tcpConn.Close()
 	// 发送数据
 	// 发送数据
-	send := util.TimeWindowToJson(timeWindow)
+	send, _ := entity.TimeWindowToJson(timeWindow)
 	_, err = tcpConn.Write([]byte(send))
 	_, err = tcpConn.Write([]byte(send))
 	if err != nil {
 	if err != nil {
-		log.GlobalLogger.Error("master发送给slave时间窗口", timeWindow, "失败:", err)
+		c_log.GlobalLogger.Error("master发送给slave时间窗口", timeWindow, "失败:", err)
 		return
 		return
 	}
 	}
 }
 }

+ 8 - 0
common/entity/time_window.go

@@ -1,6 +1,7 @@
 package entity
 package entity
 
 
 import (
 import (
+	"encoding/json"
 	"sync"
 	"sync"
 	"time"
 	"time"
 )
 )
@@ -81,3 +82,10 @@ func GetLastTimeWindow() *TimeWindow {
 	}
 	}
 	return lastTimeWindow
 	return lastTimeWindow
 }
 }
+func TimeWindowToJson(msg TimeWindow) (string, error) {
+	jsonData, err := json.Marshal(msg)
+	if err != nil {
+		return "", err
+	}
+	return string(jsonData), nil
+}

+ 0 - 5
aarch64/pjisuv/common/util/utils.go → common/util/u_disk.go

@@ -1,7 +1,6 @@
 package util
 package util
 
 
 import (
 import (
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
 	"cicv-data-closedloop/kinglong/common/log"
 	"cicv-data-closedloop/kinglong/common/log"
 	"os/exec"
 	"os/exec"
 	"strconv"
 	"strconv"
@@ -34,7 +33,3 @@ func GetDiskUsagePercent() float64 {
 	}
 	}
 	return 0.0
 	return 0.0
 }
 }
-
-func GetCopyDir(faultTime string) string {
-	return commonConfig.CloudConfig.BagCopyDir + faultTime + "/"
-}