孟令鑫 1 year ago
parent
commit
6eef18f0ae

+ 13 - 13
aarch64/kinglong/common/config/c_cloud.go

@@ -1,8 +1,8 @@
-package cfg
+package config
 
 import (
+	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
-	"cicv-data-closedloop/kinglong/common/log"
 	"gopkg.in/yaml.v3"
 	"os"
 	"sync"
@@ -55,7 +55,7 @@ var (
 
 // InitCloudConfig 初始化业务配置
 func InitCloudConfig() {
-	log.GlobalLogger.Info("初始化OSS配置文件 - 开始。")
+	c_log.GlobalLogger.Info("初始化OSS配置文件 - 开始。")
 	// 获取文件的目录
 	_ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
 	// 3 ------- 获取 yaml 字符串 -------
@@ -63,13 +63,13 @@ func InitCloudConfig() {
 	cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
 	err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
 	if err != nil {
-		log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
+		c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
 		os.Exit(-1)
 	}
 
 	content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
 	if err != nil {
-		log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
+		c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
 		os.Exit(-1)
 	}
 
@@ -77,7 +77,7 @@ func InitCloudConfig() {
 	var newCloudConfig cloudConfig
 	err = yaml.Unmarshal(content, &newCloudConfig)
 	if err != nil {
-		log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
+		c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
 		os.Exit(-1)
 	}
 
@@ -87,10 +87,10 @@ func InitCloudConfig() {
 		CloudConfig = newCloudConfig
 		CloudConfigMutex.RUnlock()
 	} else {
-		log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
+		c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
 		os.Exit(-1)
 	}
-	log.GlobalLogger.Info("初始化OSS配置文件 - 成功。")
+	c_log.GlobalLogger.Info("初始化OSS配置文件 - 成功。")
 	_ = util.CreateDir(CloudConfig.BagDataDir)
 	_ = util.CreateDir(CloudConfig.BagCopyDir)
 	timeToLabelJson, _ := util.MapToJsonString(map[string]interface{}{"time": "label"})
@@ -106,13 +106,13 @@ func refreshCloudConfig() {
 	cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
 	err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
 	if err != nil {
-		log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
+		c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
 		return
 	}
 
 	content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
 	if err != nil {
-		log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
+		c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
 		return
 	}
 
@@ -120,7 +120,7 @@ func refreshCloudConfig() {
 	var newCloudConfig cloudConfig
 	err = yaml.Unmarshal(content, &newCloudConfig)
 	if err != nil {
-		log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
+		c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
 		return
 	}
 
@@ -130,7 +130,7 @@ func refreshCloudConfig() {
 		CloudConfig = newCloudConfig
 		CloudConfigMutex.RUnlock()
 	} else {
-		log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
+		c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
 		return
 	}
 	_ = util.CreateDir(CloudConfig.BagDataDir)
@@ -148,7 +148,7 @@ func RefreshCloudConfig() {
 // CheckConfig 校验 cfg.yaml 文件
 func checkConfig(check cloudConfig) bool {
 	if len(check.Hosts) != 2 {
-		log.GlobalLogger.Error("cloud-config.yaml中配置的hosts必须为2。")
+		c_log.GlobalLogger.Error("cloud-config.yaml中配置的hosts必须为2。")
 		os.Exit(-1)
 	}
 	return true
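
Note on this hunk: both InitCloudConfig and refreshCloudConfig assign CloudConfig = newCloudConfig while pairing the write with CloudConfigMutex.RUnlock(), i.e. the shared config appears to be replaced under the read side of the RWMutex. A minimal sketch of the conventional sync.RWMutex split (write under Lock, reads under RLock); the cloudConfig stand-in type and the path value are illustrative, not the repo's definitions:

```go
package main

import (
	"fmt"
	"sync"
)

// cloudConfig is a stand-in for the real struct parsed from cloud-config.yaml.
type cloudConfig struct {
	BagDataDir string
}

var (
	CloudConfig      cloudConfig  // shared, read by many goroutines
	CloudConfigMutex sync.RWMutex // guards CloudConfig
)

// replaceCloudConfig swaps in a freshly parsed config under the exclusive lock.
func replaceCloudConfig(newCfg cloudConfig) {
	CloudConfigMutex.Lock()
	CloudConfig = newCfg
	CloudConfigMutex.Unlock()
}

// bagDataDir is the matching read-side access under the shared lock.
func bagDataDir() string {
	CloudConfigMutex.RLock()
	defer CloudConfigMutex.RUnlock()
	return CloudConfig.BagDataDir
}

func main() {
	replaceCloudConfig(cloudConfig{BagDataDir: "/mnt/media/sda1/data/"})
	fmt.Println(bagDataDir())
}
```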

+ 5 - 5
aarch64/kinglong/common/config/c_killrpcserver.go

@@ -1,7 +1,7 @@
-package cfg
+package config
 
 import (
-	"cicv-data-closedloop/kinglong/common/log"
+	"cicv-data-closedloop/common/config/c_log"
 	"net"
 	"os"
 )
@@ -10,12 +10,12 @@ var KillSignalListener net.Listener
 
 func InitKillSignalListener(serverIp string) {
 	var err error
-	log.GlobalLogger.Info("初始化RPC端口监听Kill信号 - 开始。")
+	c_log.GlobalLogger.Info("初始化RPC端口监听Kill信号 - 开始。")
 	socket := serverIp + ":" + CloudConfig.RpcPort
 	KillSignalListener, err = net.Listen("tcp", socket)
 	if err != nil {
-		log.GlobalLogger.Error("监听rpc端口失败:", err)
+		c_log.GlobalLogger.Error("监听rpc端口失败:", err)
 		os.Exit(-1)
 	}
-	log.GlobalLogger.Info("初始化RPC端口监听Kill信号 - 成功:", socket)
+	c_log.GlobalLogger.Info("初始化RPC端口监听Kill信号 - 成功:", socket)
 }

+ 6 - 6
aarch64/kinglong/common/config/c_local.go

@@ -1,7 +1,7 @@
-package cfg
+package config
 
 import (
-	"cicv-data-closedloop/kinglong/common/log"
+	"cicv-data-closedloop/common/config/c_log"
 	"gopkg.in/yaml.v2"
 	"os"
 )
@@ -34,21 +34,21 @@ var (
 )
 
 func InitLocalConfig() {
-	log.GlobalLogger.Info("初始化本地配置文件 - 开始:", localConfigPath)
+	c_log.GlobalLogger.Info("初始化本地配置文件 - 开始:", localConfigPath)
 	// 读取YAML文件内容
 	content, err := os.ReadFile(localConfigPath)
 	if err != nil {
-		log.GlobalLogger.Error("读取本地配置文件失败。", err)
+		c_log.GlobalLogger.Error("读取本地配置文件失败。", err)
 		os.Exit(-1)
 	}
 
 	// 解析YAML内容
 	err = yaml.Unmarshal(content, &LocalConfig)
 	if err != nil {
-		log.GlobalLogger.Error("解析本地配置文件失败。", err)
+		c_log.GlobalLogger.Error("解析本地配置文件失败。", err)
 		os.Exit(-1)
 	}
 
-	log.GlobalLogger.Info("初始化本地配置文件 - 成功:", LocalConfig)
+	c_log.GlobalLogger.Info("初始化本地配置文件 - 成功:", LocalConfig)
 
 }

+ 10 - 10
aarch64/kinglong/common/config/c_oss.go

@@ -1,8 +1,8 @@
-package cfg
+package config
 
 import (
-	"cicv-data-closedloop/kinglong/common/cutil"
-	"cicv-data-closedloop/kinglong/common/log"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	"encoding/json"
 	"github.com/aliyun/aliyun-oss-go-sdk/oss"
 	"os"
@@ -19,29 +19,29 @@ var OssClient *oss.Client
 var OssBucket *oss.Bucket
 
 func InitOssConfig() {
-	log.GlobalLogger.Info("初始化OSS客户端对象 - 开始。")
+	c_log.GlobalLogger.Info("初始化OSS客户端对象 - 开始。")
 	// 1 访问 HTTP 服务获取 OSS 配置
-	get, err := cutil.HttpGet(LocalConfig.UrlGetOssConfig)
+	get, err := util.HttpGet(LocalConfig.UrlGetOssConfig)
 	if err != nil {
-		log.GlobalLogger.Error("http获取oss配置时出错:", err)
+		c_log.GlobalLogger.Error("http获取oss配置时出错:", err)
 		os.Exit(-1)
 	}
 	var ossConnectInfo OssConnectInfoStruct
 	err = json.Unmarshal([]byte(get), &ossConnectInfo)
 	if err != nil {
-		log.GlobalLogger.Error("解析json时出错:", err)
+		c_log.GlobalLogger.Error("解析json时出错:", err)
 		os.Exit(-1)
 	}
 
 	OssClient, err = oss.New(ossConnectInfo.Endpoint, ossConnectInfo.AccessKeyId, ossConnectInfo.AccessKeySecret, oss.UseCname(true))
 	if err != nil {
-		log.GlobalLogger.Error("无法创建阿里云client:", err)
+		c_log.GlobalLogger.Error("无法创建阿里云client:", err)
 		os.Exit(-1)
 	}
 	OssBucket, err = OssClient.Bucket(ossConnectInfo.BucketName)
 	if err != nil {
-		log.GlobalLogger.Error("无法创建阿里云bucket:", err)
+		c_log.GlobalLogger.Error("无法创建阿里云bucket:", err)
 		os.Exit(-1)
 	}
-	log.GlobalLogger.Info("初始化OSS客户端对象 - 成功。")
+	c_log.GlobalLogger.Info("初始化OSS客户端对象 - 成功。")
 }

+ 18 - 19
aarch64/kinglong/common/config/c_platform.go

@@ -1,9 +1,8 @@
-package cfg
+package config
 
 import (
+	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
-	"cicv-data-closedloop/kinglong/common/cutil"
-	"cicv-data-closedloop/kinglong/common/log"
 	"encoding/json"
 	"time"
 )
@@ -40,19 +39,19 @@ var PlatformConfig platformConfig
 // InitPlatformConfig 初始化数据闭环平台的配置
 func InitPlatformConfig() {
 	var err error
-	log.GlobalLogger.Info("获取数据闭环平台配置 - 开始")
+	c_log.GlobalLogger.Info("获取数据闭环平台配置 - 开始")
 	// 1 如果车辆没有配置任务,则阻塞在这里,不启动任务
 	for {
 		time.Sleep(time.Duration(1))
-		log.GlobalLogger.Info("获取数据闭环平台配置 - 进行中")
+		c_log.GlobalLogger.Info("获取数据闭环平台配置 - 进行中")
 		PlatformConfig, err = getConfig()
 		if err != nil {
-			log.GlobalLogger.Error("获取配置status失败:", err)
+			c_log.GlobalLogger.Error("获取配置status失败:", err)
 			continue
 		}
 		break
 	}
-	log.GlobalLogger.Info("获取数据闭环平台配置 - 成功。")
+	c_log.GlobalLogger.Info("获取数据闭环平台配置 - 成功。")
 }
 
 /*
@@ -70,7 +69,7 @@ func InitPlatformConfig() {
 */
 // GetAccessToken 认证接口,获取access_token
 func GetAccessToken() (string, error) {
-	respJson, err := cutil.HttpPostJsonResponseString(
+	respJson, err := util.HttpPostJsonResponseString(
 		CloudConfig.Platform.UrlDeviceAuth,
 		map[string]string{
 			"equipmentNo": LocalConfig.EquipmentNo,
@@ -82,13 +81,13 @@ func GetAccessToken() (string, error) {
 	}
 	respMap, err := util.JsonStringToMap(respJson)
 	if err != nil {
-		log.GlobalLogger.Error("解析返回结果", respJson, "失败:", err)
+		c_log.GlobalLogger.Error("解析返回结果", respJson, "失败:", err)
 		return "", nil
 	}
 
-	dataMap := respMap["data"].(map[string]interface{})
-	if err != nil {
-		log.GlobalLogger.Error("解析返回结果.data", dataMap, "失败:", err)
+	dataMap, ok := respMap["data"].(map[string]interface{})
+	if !ok {
+		c_log.GlobalLogger.Error("解析返回结果.data", dataMap, "失败:", err)
 		return "", nil
 	}
 	return dataMap["accessToken"].(string), nil
@@ -112,7 +111,7 @@ func GetStatus(taskConfigId string) (string, error) {
 	if err != nil {
 		return "", err
 	}
-	resp, err := cutil.HttpGetStringAddHeadersResponseString(
+	resp, err := util.HttpGetStringAddHeadersResponseString(
 		CloudConfig.Platform.UrlTaskPoll,
 		map[string]string{
 			"authorization": token,
@@ -124,17 +123,17 @@ func GetStatus(taskConfigId string) (string, error) {
 	)
 
 	if err != nil {
-		log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err)
+		c_log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err)
 		return "", err
 	}
 	respMap, err := util.JsonStringToMap(resp)
 	if err != nil {
-		log.GlobalLogger.Error("解析【返回结果1】", resp, "失败:", err)
+		c_log.GlobalLogger.Error("解析【返回结果1】", resp, "失败:", err)
 		return "", err
 	}
 	dataMap, ok := respMap["data"].(map[string]interface{})
 	if !ok {
-		log.GlobalLogger.Errorf("解析【返回结果.data】的类型不是(map[string]interface{}),【dataMap】=%v", dataMap)
+		c_log.GlobalLogger.Errorf("解析【返回结果.data】的类型不是(map[string]interface{}),【dataMap】=%v", dataMap)
 		return "", err
 	}
 	return dataMap["status"].(string), nil
@@ -147,7 +146,7 @@ func getConfig() (platformConfig, error) {
 	}
 	// 下载插件和获取配置
 	// 2 访问配置获取接口
-	resp, err := cutil.HttpGetStringAddHeadersResponseString(
+	resp, err := util.HttpGetStringAddHeadersResponseString(
 		CloudConfig.Platform.UrlTask,
 		map[string]string{
 			"authorization": token,
@@ -157,13 +156,13 @@ func getConfig() (platformConfig, error) {
 		},
 	)
 	if err != nil {
-		log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err)
+		c_log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err)
 		return platformConfig{}, err
 	}
 	var result response
 	err = json.Unmarshal([]byte(resp), &result)
 	if err != nil {
-		log.GlobalLogger.Error("解析【返回结果】", resp, "失败:", err)
+		c_log.GlobalLogger.Error("解析【返回结果】", resp, "失败:", err)
 		return platformConfig{}, err
 	}
 	return result.Data, nil
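
This hunk switches the assertion on respMap["data"] to the comma-ok form, but dataMap["accessToken"].(string) in GetAccessToken is still unchecked and the error branches return "", nil rather than a non-nil error. A self-contained sketch of the fully checked pattern, assuming only the data.accessToken shape visible here:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// extractAccessToken pulls data.accessToken out of a generic JSON response,
// using comma-ok assertions at every step so a malformed payload yields an
// error instead of a panic.
func extractAccessToken(respJSON string) (string, error) {
	var resp map[string]interface{}
	if err := json.Unmarshal([]byte(respJSON), &resp); err != nil {
		return "", fmt.Errorf("unmarshal response: %w", err)
	}
	data, ok := resp["data"].(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("response field data is not an object: %v", resp["data"])
	}
	token, ok := data["accessToken"].(string)
	if !ok {
		return "", fmt.Errorf("response field data.accessToken is not a string: %v", data["accessToken"])
	}
	return token, nil
}

func main() {
	token, err := extractAccessToken(`{"data":{"accessToken":"abc123"}}`)
	fmt.Println(token, err)
}
```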

+ 7 - 7
aarch64/kinglong/common/config/c_ros.go

@@ -1,8 +1,8 @@
-package cfg
+package config
 
 import (
-	"cicv-data-closedloop/kinglong/common/cutil"
-	"cicv-data-closedloop/kinglong/common/log"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	"github.com/bluenviron/goroslib/v2"
 	"os"
 )
@@ -10,15 +10,15 @@ import (
 var RosNode *goroslib.Node
 
 func InitRosConfig() {
-	log.GlobalLogger.Info("初始化RosNode - 开始")
+	c_log.GlobalLogger.Info("初始化RosNode - 开始")
 	var err error
 	RosNode, err = goroslib.NewNode(goroslib.NodeConf{
-		Name:          "node" + cutil.GetNowTimeCustom(),
+		Name:          "node" + util.GetNowTimeCustom(),
 		MasterAddress: CloudConfig.Ros.MasterAddress,
 	})
 	if err != nil {
-		log.GlobalLogger.Error("初始化RosNode - 失败:", err)
+		c_log.GlobalLogger.Error("初始化RosNode - 失败:", err)
 		os.Exit(-1)
 	}
-	log.GlobalLogger.Info("初始化RosNode - 成功:", CloudConfig.Ros.MasterAddress)
+	c_log.GlobalLogger.Info("初始化RosNode - 成功:", CloudConfig.Ros.MasterAddress)
 }

+ 10 - 35
aarch64/kinglong/common/init/common_init.go

@@ -1,58 +1,33 @@
 package init
 
 import (
-	"cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/svc"
+	"cicv-data-closedloop/aarch64/kinglong/common/config"
+	"cicv-data-closedloop/aarch64/kinglong/common/service"
 )
 
 func Init() {
 
-	//// 循环打印cpu占用
-	//go ResourceOccupancy()
-
 	// 初始化本地配置文件(第1处配置,在本地文件)
-	cfg.InitLocalConfig()
+	config.InitLocalConfig()
 
 	// 初始化Oss连接信息
-	cfg.InitOssConfig()
+	config.InitOssConfig()
 
 	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
-	cfg.InitCloudConfig()
+	config.InitCloudConfig()
 
-	go cfg.RefreshCloudConfig()
+	go config.RefreshCloudConfig()
 
 	// 初始化数据闭环平台的配置(第3处配置,在数据闭环平台接口)
-	cfg.InitPlatformConfig()
+	config.InitPlatformConfig()
 
 	// 初始化ros节点
-	cfg.InitRosConfig()
+	config.InitRosConfig()
 
 	// 维护data目录缓存的包数量
-	go svc.BagCacheClean()
+	go service.BagCacheClean()
 
 	// 磁盘占用过高时根据缓存策略处理copy目录
-	go svc.DiskClean()
+	go service.DiskClean()
 
 }
-
-//func ResourceOccupancy() {
-//	for {
-//		// 获取 CPU 使用率
-//		cpuPercent, err := cpu.Percent(time.Second, false)
-//		if err != nil {
-//			log.MonitorLogger.Info("获取cpu使用率报错:", err)
-//			return
-//		}
-//
-//		var m runtime.MemStats
-//		runtime.ReadMemStats(&m)
-//
-//		// 计算内存占用百分比
-//		memoryPercent := float64(m.Alloc) / float64(m.Sys) * 100.0
-//
-//		log.MonitorLogger.Info("cpu使用率为:", cpuPercent[0]*100, "%,内存使用率为:", memoryPercent, "%")
-//
-//		// 等待一段时间,例如1秒
-//		time.Sleep(time.Second)
-//	}
-//}

+ 23 - 22
aarch64/kinglong/common/service/disk_clean.go

@@ -1,20 +1,21 @@
-package svc
+package service
 
 import (
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/global"
-	"cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/kinglong/common/util"
-	masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
+	commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
+	masterConfig "cicv-data-closedloop/aarch64/kinglong/master/package/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/domain"
+	"cicv-data-closedloop/common/entity"
+	"cicv-data-closedloop/common/util"
 	"time"
 )
 
 // DiskClean 如果磁盘占用过高,则删除timeWindow和对应的文件
 func DiskClean() {
-	log.GlobalLogger.Info("启动timeWindow清理goroutine,根据缓存策略清理copy目录。")
+	c_log.GlobalLogger.Info("启动timeWindow清理goroutine,根据缓存策略清理copy目录。")
 	for {
 		time.Sleep(1000 * time.Millisecond)
-		bags := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagCopyDir, ".bag")
+		bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagCopyDir, ".bag")
 		if len(bags) == 0 {
 			continue
 		}
@@ -29,56 +30,56 @@ func DiskClean() {
 			"LRU":  "保留高优先级",
 		}
 		// 1 获取磁盘占用
-		percent := util.GetDiskUsagePercent()
+		percent, _ := util.GetDiskUsagePercent()
 		if percent > commonConfig.CloudConfig.DiskUsage {
 			// 2 获取策略
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
-			log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.DiskUsage, policyToDescription[policy])
+			c_log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.DiskUsage, policyToDescription[policy])
 			if policy == "TTL" {
 				// 1 获取时间窗口队列中的第二个
-				if len(global.TimeWindowConsumerQueue) > 2 {
+				if len(entity.TimeWindowConsumerQueue) > 2 {
 					deleteTimeWindow(1)
 				}
 			} else if policy == "STOP" {
 				// 2 获取时间窗口队列中的倒数第一个
-				if len(global.TimeWindowConsumerQueue) > 2 {
-					deleteTimeWindow(len(global.TimeWindowConsumerQueue) - 1)
+				if len(entity.TimeWindowConsumerQueue) > 2 {
+					deleteTimeWindow(len(entity.TimeWindowConsumerQueue) - 1)
 				}
 			} else if policy == "LRU" {
 				// 3 获取优先级最低的时间窗口
-				if len(global.TimeWindowConsumerQueue) > 2 {
+				if len(entity.TimeWindowConsumerQueue) > 2 {
 					indexToRemove := getIndexToRemoveForLRU()
 					if indexToRemove != -1 {
 						deleteTimeWindow(indexToRemove)
 					}
 				}
 			} else {
-				log.GlobalLogger.Error("未知的缓存策略:", policy)
+				c_log.GlobalLogger.Error("未知的缓存策略:", policy)
 			}
 		}
 	}
 }
 
 func deleteTimeWindow(indexToRemove int) {
-	timeWindowToRemove := global.TimeWindowConsumerQueue[indexToRemove]
+	timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove]
 	// 删除文件
 	faultTime := timeWindowToRemove.FaultTime
-	dir := util.GetCopyDir(faultTime)
+	dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, faultTime)
 	err := util.RemoveDir(dir)
 	if err != nil {
-		log.GlobalLogger.Error("删除目录", dir, "失败:", err)
+		c_log.GlobalLogger.Error("删除目录", dir, "失败:", err)
 	}
-	global.TimeWindowConsumerQueueMutex.Lock()
+	entity.TimeWindowConsumerQueueMutex.Lock()
 	// 使用切片的特性删除指定位置的元素
-	global.TimeWindowConsumerQueue = append(global.TimeWindowConsumerQueue[:indexToRemove], global.TimeWindowConsumerQueue[indexToRemove+1:]...)
-	global.TimeWindowConsumerQueueMutex.Unlock()
+	entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
+	entity.TimeWindowConsumerQueueMutex.Unlock()
 }
 
 func getIndexToRemoveForLRU() int {
 	lru := commonConfig.PlatformConfig.Lru
 	i := len(lru) - 1
 	for i >= 0 {
-		for i2, window := range global.TimeWindowConsumerQueue {
+		for i2, window := range entity.TimeWindowConsumerQueue {
 			for _, label := range window.Labels {
 				if masterConfig.LabelMapTriggerId[label] == lru[i] {
 					return i2
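
For readability, the policy dispatch in DiskClean boils down to choosing an index to evict from the consumer queue: a sketch, with the thresholds and indices taken from the hunk and everything else illustrative:

```go
package main

import "fmt"

// chooseIndexToDrop mirrors the policy dispatch in DiskClean: with more than
// two queued windows, TTL drops the second-oldest entry, STOP drops the newest,
// and LRU falls back to a priority lookup (getIndexToRemoveForLRU in the diff).
// -1 means "drop nothing".
func chooseIndexToDrop(policy string, queueLen, lruIndex int) int {
	if queueLen <= 2 {
		return -1
	}
	switch policy {
	case "TTL":
		return 1
	case "STOP":
		return queueLen - 1
	case "LRU":
		return lruIndex
	default:
		return -1 // unknown policy, logged as an error in the original
	}
}

func main() {
	fmt.Println(chooseIndexToDrop("TTL", 5, 3))  // 1
	fmt.Println(chooseIndexToDrop("STOP", 5, 3)) // 4
	fmt.Println(chooseIndexToDrop("LRU", 5, 3))  // 3
}
```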

+ 16 - 16
aarch64/kinglong/common/service/kill_self.go

@@ -1,9 +1,9 @@
-package svc
+package service
 
 import (
-	"cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/kinglong/common/util"
+	"cicv-data-closedloop/aarch64/kinglong/common/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	"net/rpc"
 	"os"
 	"sync"
@@ -34,7 +34,7 @@ type KillService struct{}
 
 // Kill 杀死自身程序,通过通道实现 方法必须满足RPC规范:函数有两个参数,第一个参数是请求,第二个是响应
 func (m *KillService) Kill(args *KillSignal, reply *int) error {
-	log.GlobalLogger.Info("接收到自杀信号:", *args)
+	c_log.GlobalLogger.Info("接收到自杀信号:", *args)
 	// 1 杀死 rosbag record 命令
 	ChannelKillRosRecord <- 1
 	// 2 杀死所有 ros 订阅者
@@ -54,13 +54,13 @@ func (m *KillService) Kill(args *KillSignal, reply *int) error {
 func WaitKillSelf() {
 	killService := new(KillService)
 	if err := rpc.Register(killService); err != nil {
-		log.GlobalLogger.Error("注册rpc服务失败:", err)
+		c_log.GlobalLogger.Error("注册rpc服务失败:", err)
 		return
 	}
 
 	// 等待并处理远程调用请求
 	for {
-		conn, err := cfg.KillSignalListener.Accept()
+		conn, err := config.KillSignalListener.Accept()
 		if err != nil {
 			continue
 		}
@@ -75,24 +75,24 @@ func AddKillTimes(info string) {
 	case "1":
 		close(ChannelKillRosRecord)
 		KillTimes++
-		log.GlobalLogger.Infof("已杀死record打包goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死record打包goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 		ChannelKillDiskClean <- 1
 	case "2":
 		close(ChannelKillDiskClean)
 		KillTimes++
-		log.GlobalLogger.Infof("已杀死bag包数量维护goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死bag包数量维护goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 	case "3":
 		close(ChannelKillWindowProducer)
 		KillTimes++
-		log.GlobalLogger.Infof("已杀死时间窗口生产者,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死时间窗口生产者,当前自杀进度 %v / %v", KillTimes, KillChannel)
 	case "4":
 		close(ChannelKillMove)
 		KillTimes++
-		log.GlobalLogger.Infof("已杀死bag包移动goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死bag包移动goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 	case "5":
 		close(ChannelKillConsume)
 		KillTimes++
-		log.GlobalLogger.Infof("已杀死bag包消费goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死bag包消费goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 	}
 	MutexKill.Unlock()
 }
@@ -102,14 +102,14 @@ func killDone(restart bool) {
 		time.Sleep(time.Duration(1) * time.Second)
 		if KillChannel == KillTimes {
 			if restart {
-				_, err := util.ExecuteWithPath(cfg.LocalConfig.RestartCmd.Dir, cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args...)
+				_, err := util.ExecuteWithPath(config.LocalConfig.RestartCmd.Dir, config.LocalConfig.RestartCmd.Name, config.LocalConfig.RestartCmd.Args...)
 				if err != nil {
-					log.GlobalLogger.Info("启动新程序失败,【path】=", cfg.LocalConfig.RestartCmd.Dir, "【cmd】=", cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args, ":", err)
+					c_log.GlobalLogger.Info("启动新程序失败,【path】=", config.LocalConfig.RestartCmd.Dir, "【cmd】=", config.LocalConfig.RestartCmd.Name, config.LocalConfig.RestartCmd.Args, ":", err)
 					os.Exit(-1)
 				}
-				log.GlobalLogger.Info("数据采集任务更新,正常退出当前程序。")
+				c_log.GlobalLogger.Info("数据采集任务更新,正常退出当前程序。")
 			} else {
-				log.GlobalLogger.Info("数据采集任务终止,正常退出当前程序。")
+				c_log.GlobalLogger.Info("数据采集任务终止,正常退出当前程序。")
 			}
 			os.Exit(0)
 		}
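
The kill path relies on Go's net/rpc: KillService.Kill follows the required shape (exported method, request argument, pointer reply, error return), and control.go reaches it with rpc.Dial plus Call("KillService.Kill", ...). A minimal end-to-end sketch under those assumptions; the address, fields and reply semantics are illustrative:

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// KillSignal mirrors the request shape used in the diff (fields illustrative).
type KillSignal struct {
	NodeName       string
	DropUploadData bool
	Restart        bool
}

// KillService exposes Kill over net/rpc; the method satisfies the net/rpc
// contract: two arguments, the second a pointer reply, returning error.
type KillService struct{}

func (s *KillService) Kill(args *KillSignal, reply *int) error {
	fmt.Println("received kill signal:", *args)
	*reply = 1
	return nil
}

func main() {
	_ = rpc.Register(new(KillService))
	ln, err := net.Listen("tcp", "127.0.0.1:9999")
	if err != nil {
		panic(err)
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				continue
			}
			go rpc.ServeConn(conn)
		}
	}()

	// Client side, as control.go does with rpc.Dial + Call("KillService.Kill", ...).
	client, err := rpc.Dial("tcp", "127.0.0.1:9999")
	if err != nil {
		panic(err)
	}
	var reply int
	_ = client.Call("KillService.Kill", &KillSignal{NodeName: "master", Restart: true}, &reply)
	fmt.Println("reply:", reply)
}
```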

+ 9 - 9
aarch64/kinglong/common/service/rosbag_clean.go

@@ -1,15 +1,15 @@
-package svc
+package service
 
 import (
-	"cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/kinglong/common/util"
+	"cicv-data-closedloop/aarch64/kinglong/common/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	"time"
 )
 
 // BagCacheClean 保证本地缓存的包数量不超过设定值
 func BagCacheClean() {
-	log.GlobalLogger.Info("启动清理缓存的 goroutine 维护目录【", cfg.CloudConfig.BagDataDir, "】的 bag 包数量:", cfg.CloudConfig.BagNumber)
+	c_log.GlobalLogger.Info("启动清理缓存的 goroutine 维护目录【", config.CloudConfig.BagDataDir, "】的 bag 包数量:", config.CloudConfig.BagNumber)
 	for {
 		// 收到自杀信号
 		select {
@@ -24,12 +24,12 @@ func BagCacheClean() {
 		// 1 ------- 每10秒清理一次 -------
 		time.Sleep(time.Duration(10) * time.Second)
 		// 2 ------- 获取目录下所有bag包 -------
-		bags := util.ListAbsolutePathWithSuffixAndSort(cfg.CloudConfig.BagDataDir, ".bag")
+		bags, _ := util.ListAbsolutePathWithSuffixAndSort(config.CloudConfig.BagDataDir, ".bag")
 		// 3 如果打包数量超过n个,删除最旧的包{
-		if len(bags) > cfg.CloudConfig.BagNumber {
-			diff := len(bags) - cfg.CloudConfig.BagNumber
+		if len(bags) > config.CloudConfig.BagNumber {
+			diff := len(bags) - config.CloudConfig.BagNumber
 			for i := 0; i < diff; i++ {
-				util.DeleteFile(bags[i])
+				_ = util.DeleteFile(bags[i])
 			}
 		}
 	}
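
BagCacheClean keeps at most CloudConfig.BagNumber bags by deleting the oldest len(bags) - BagNumber entries of the sorted listing. A small sketch of that selection, assuming the list is sorted oldest first as the helper's name suggests:

```go
package main

import "fmt"

// pruneOldest returns the paths that should be deleted so that at most max
// entries of a sorted (oldest-first) bag list remain, mirroring BagCacheClean.
func pruneOldest(bags []string, max int) []string {
	if len(bags) <= max {
		return nil
	}
	return bags[:len(bags)-max]
}

func main() {
	bags := []string{"a.bag", "b.bag", "c.bag", "d.bag"}
	fmt.Println(pruneOldest(bags, 2)) // [a.bag b.bag]
}
```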

+ 20 - 21
aarch64/kinglong/common/service/rosbag_record.go

@@ -1,10 +1,9 @@
-package svc
+package service
 
 import (
+	"cicv-data-closedloop/aarch64/kinglong/common/config"
+	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
-	cfg2 "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/cutil"
-	"cicv-data-closedloop/kinglong/common/log"
 	"github.com/bluenviron/goroslib/v2"
 	"os"
 	"time"
@@ -12,21 +11,21 @@ import (
 
 // BagRecord 打包rosbag
 func BagRecord(nodeName string) {
-	log.GlobalLogger.Info("rosbag record goroutine - 启动")
+	c_log.GlobalLogger.Info("rosbag record goroutine - 启动")
 	for {
-		log.GlobalLogger.Info("校验必需的 rosnode 是否全部启动。")
+		c_log.GlobalLogger.Info("校验必需的 rosnode 是否全部启动。")
 		canRecord := false
 		for !canRecord {
 			time.Sleep(time.Duration(2) * time.Second)
-			canRecord = isCanRecord(cfg2.RosNode)
+			canRecord = isCanRecord(config.RosNode)
 		}
-		log.GlobalLogger.Info("rosnode启动完成,正在启动record命令。")
+		c_log.GlobalLogger.Info("rosnode启动完成,正在启动record命令。")
 
 		var command []string
 		command = append(command, "record")
 		command = append(command, "--split")
 		command = append(command, "--duration=1")
-		for _, host := range cfg2.CloudConfig.Hosts {
+		for _, host := range config.CloudConfig.Hosts {
 			if host.Name == nodeName {
 				for _, topic := range host.Topics {
 					command = append(command, topic)
@@ -37,13 +36,13 @@ func BagRecord(nodeName string) {
 		// 2 ------- 调用 rosbag 打包命令,该命令自动阻塞 -------
 		// 不在此处压缩,因为 rosbag filter 时会报错。在上传到oss之前压缩即可。
 		// 包名格式:2023-11-15-17-35-20_0.bag
-		cutil.CreateParentDir(cfg2.CloudConfig.BagDataDir)
-		cmd, err := util.ExecuteWithEnvAndDirAsync(os.Environ(), cfg2.CloudConfig.BagDataDir, "rosbag", command...)
+		_ = util.CreateParentDir(config.CloudConfig.BagDataDir)
+		cmd, err := util.ExecuteWithEnvAndDirAsync(os.Environ(), config.CloudConfig.BagDataDir, "rosbag", command...)
 		if err != nil {
-			log.GlobalLogger.Error("执行record命令", command, "出错:", err)
+			c_log.GlobalLogger.Error("执行record命令", command, "出错:", err)
 			continue
 		}
-		log.GlobalLogger.Info("启动record命令成功。")
+		c_log.GlobalLogger.Info("启动record命令成功。")
 
 		recordProcessPid := cmd.Process.Pid
 		var recordSubProcessPid int
@@ -51,25 +50,25 @@ func BagRecord(nodeName string) {
 			time.Sleep(time.Duration(2) * time.Second)
 			recordSubProcessPid, err = util.GetSubProcessPid(recordProcessPid)
 			if err != nil {
-				log.GlobalLogger.Info("正在等待获取进程 ", recordProcessPid, " 的子进程的pid。")
+				c_log.GlobalLogger.Info("正在等待获取进程 ", recordProcessPid, " 的子进程的pid。")
 				continue
 			}
 			if recordSubProcessPid != 0 {
-				log.GlobalLogger.Info("获取进程 ", recordProcessPid, " 的子进程的pid:", recordSubProcessPid)
+				c_log.GlobalLogger.Info("获取进程 ", recordProcessPid, " 的子进程的pid:", recordSubProcessPid)
 				break
 			}
 		}
 		// 等待自杀信号
-		log.GlobalLogger.Info("rosbag record goroutine - 等待自杀信号")
+		c_log.GlobalLogger.Info("rosbag record goroutine - 等待自杀信号")
 		select {
 		case signal := <-ChannelKillRosRecord:
 			if signal == 1 {
 				if err = util.KillProcessByPid(recordSubProcessPid); err != nil {
-					log.GlobalLogger.Errorf("程序阻塞,杀死record命令子进程出错,【pid】=%v,【err】=%v。", recordSubProcessPid, err)
+					c_log.GlobalLogger.Errorf("程序阻塞,杀死record命令子进程出错,【pid】=%v,【err】=%v。", recordSubProcessPid, err)
 					select {} // 此处阻塞防止record命令一直录包占满存储
 				}
 				if err = cmd.Process.Kill(); err != nil {
-					log.GlobalLogger.Error("程序阻塞,杀死record命令父进程", recordProcessPid, "出错:", err)
+					c_log.GlobalLogger.Error("程序阻塞,杀死record命令父进程", recordProcessPid, "出错:", err)
 					select {} // 此处阻塞防止record命令一直录包占满存储
 				}
 				AddKillTimes("1")
@@ -99,14 +98,14 @@ func isCanRecord(n *goroslib.Node) bool {
 	time.Sleep(time.Duration(1) * time.Second)
 	nodes, err := n.MasterGetNodes()
 	if err != nil {
-		log.GlobalLogger.Error("获取rosnode出错:", err)
+		c_log.GlobalLogger.Error("获取rosnode出错:", err)
 		return false
 	}
 	myMap := nodes
-	mySlice := cfg2.CloudConfig.Ros.Nodes
+	mySlice := config.CloudConfig.Ros.Nodes
 	for _, element := range mySlice {
 		if _, ok := myMap[element]; !ok {
-			log.GlobalLogger.Info("rosnode:", element, " 未启动,需等待启动后才可启动record。")
+			c_log.GlobalLogger.Info("rosnode:", element, " 未启动,需等待启动后才可启动record。")
 			return false
 		}
 	}
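
BagRecord launches "rosbag record --split --duration=1 <topics>" asynchronously through the util.ExecuteWithEnvAndDirAsync wrapper and later kills both the parent process and the child pid. A sketch of the same launch with plain os/exec; the directory and topic are illustrative and the child-pid lookup is omitted:

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"time"
)

// startRecord launches "rosbag record" asynchronously in dir, roughly what
// ExecuteWithEnvAndDirAsync is used for in BagRecord.
func startRecord(dir string, topics []string) (*exec.Cmd, error) {
	args := append([]string{"record", "--split", "--duration=1"}, topics...)
	cmd := exec.Command("rosbag", args...)
	cmd.Dir = dir
	cmd.Env = os.Environ()
	if err := cmd.Start(); err != nil { // Start returns immediately; record keeps running
		return nil, err
	}
	return cmd, nil
}

func main() {
	cmd, err := startRecord("/tmp", []string{"/cicv_location"})
	if err != nil {
		fmt.Println("start failed:", err)
		return
	}
	fmt.Println("record parent pid:", cmd.Process.Pid)
	time.Sleep(2 * time.Second)
	_ = cmd.Process.Kill() // the real code also kills the child pid found via GetSubProcessPid
}
```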

+ 32 - 31
aarch64/kinglong/common/service/rosbag_upload.go

@@ -1,12 +1,13 @@
-package svc
+package service
 
 import (
+	commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
+	masterConfig "cicv-data-closedloop/aarch64/kinglong/master/package/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/domain"
+	"cicv-data-closedloop/common/entity"
+	"cicv-data-closedloop/common/util"
 	commonUtil "cicv-data-closedloop/common/util"
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/global"
-	"cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/kinglong/common/util"
-	masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
 	"fmt"
 	"os"
 	"strings"
@@ -14,7 +15,7 @@ import (
 )
 
 func RunTimeWindowConsumerQueue(nodeName string) {
-	log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
+	c_log.GlobalLogger.Info("处理消费者队列goroutine - 启动")
 outLoop:
 	for { // 串行处理
 		// 收到自杀信号
@@ -22,7 +23,7 @@ outLoop:
 		case signal := <-ChannelKillConsume:
 			if signal == 1 {
 				ChannelKillConsume <- 1
-				if len(global.TimeWindowConsumerQueue) == 0 {
+				if len(entity.TimeWindowConsumerQueue) == 0 {
 					AddKillTimes("5")
 					return
 				}
@@ -34,19 +35,19 @@ outLoop:
 		}
 		// 每一秒扫一次
 		time.Sleep(time.Duration(1) * time.Second)
-		waitLength := len(global.TimeWindowConsumerQueue)
+		waitLength := len(entity.TimeWindowConsumerQueue)
 		if waitLength == 0 {
 			continue outLoop
 		}
-		log.GlobalLogger.Infof("待处理窗口个数为:%v", len(global.TimeWindowConsumerQueue))
+		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
 		// 1 获取即将处理的窗口
-		currentTimeWindow := global.TimeWindowConsumerQueue[0]
-		util.RemoveHeaOfdTimeWindowConsumerQueue()
-		log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
+		currentTimeWindow := entity.TimeWindowConsumerQueue[0]
+		entity.RemoveHeadOfTimeWindowConsumerQueue()
+		c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 
 		// 2 获取目录
-		dir := util.GetCopyDir(currentTimeWindow.FaultTime)
-		bags := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
+		dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
+		bags, _ := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
 		bagNumber := len(bags)
 		if bagNumber > currentTimeWindow.Length {
 			bagNumber = currentTimeWindow.Length
@@ -70,56 +71,56 @@ outLoop:
 				newName := bag + "_filter"
 				filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
 				_, output, err := commonUtil.ExecuteWithEnvSync(os.Environ(), "rosbag", filterCommand...)
-				log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
+				c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
 				if err != nil {
-					log.GlobalLogger.Errorf("filter 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
+					c_log.GlobalLogger.Errorf("filter 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
 					continue
 				}
 				// 删除旧文件
 				util.DeleteFile(oldName)
 				// 将新文件改回旧文件名
 				if err = os.Rename(newName, oldName); err != nil {
-					log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
+					c_log.GlobalLogger.Info("修改文件名", oldName, "失败,放弃当前时间窗口", currentTimeWindow.FaultTime, ",错误为:", err)
 					continue outLoop
 				}
 			}
 		}
 
 		// 4 compress包,必须顺序执行,此时每个包会对应生成一个压缩过的包和原始包,原始包后缀为.orig.bag
-		log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
+		c_log.GlobalLogger.Info("压缩bag数据包,故障时间为:", currentTimeWindow.FaultTime)
 		for i, bag := range bags {
 			oldName := bag
 			compressCommand := []string{"compress", "--bz2", oldName}
-			log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
+			c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
 			if _, output, err := commonUtil.ExecuteWithEnvSync(os.Environ(), "rosbag", compressCommand...); err != nil {
-				log.GlobalLogger.Errorf("compress 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
+				c_log.GlobalLogger.Errorf("compress 命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
 				continue
 			}
 		}
 		// 5 upload,必须顺序执行
-		log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
+		c_log.GlobalLogger.Info("发送bag数据包,故障时间为:", currentTimeWindow.FaultTime)
 		start := time.Now()
 		objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + nodeName + "_" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
 		objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
 		objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
 		for i, bag := range bags {
 			bagSlice := strings.Split(bag, "/")
-			log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
+			c_log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
 			err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
 			if err != nil {
-				log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
+				c_log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
 				continue
 			}
 		}
-		log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
+		c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
 		// 在上传完成的包目录同级下添加一个目录同名的json
 		triggerIds := make([]string, 0)
 		for _, label := range currentTimeWindow.Labels {
 			triggerIdToAppend := masterConfig.LabelMapTriggerId[label]
-			log.GlobalLogger.Info("添加一个【triggerId】=", triggerIdToAppend)
+			c_log.GlobalLogger.Info("添加一个【triggerId】=", triggerIdToAppend)
 			triggerIds = append(triggerIds, triggerIdToAppend)
 		}
-		log.GlobalLogger.Info("json 中添加【triggerIds】=", triggerIds)
+		c_log.GlobalLogger.Info("json 中添加【triggerIds】=", triggerIds)
 		callBackMap := map[string]interface{}{
 			"dataName":    currentTimeWindow.FaultTime,
 			"dataSize":    "", // 由合并程序补充
@@ -131,17 +132,17 @@ outLoop:
 			"triggerId":   triggerIds,
 		}
 		callBackJson, err := util.MapToJsonString(callBackMap)
-		log.GlobalLogger.Info("【callBackJson】=", callBackJson)
+		c_log.GlobalLogger.Info("【callBackJson】=", callBackJson)
 		if err != nil {
-			log.GlobalLogger.Error("callBackMap", callBackMap, "转 json 失败:", err)
+			c_log.GlobalLogger.Error("callBackMap", callBackMap, "转 json 失败:", err)
 		}
 		err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
 		if err != nil {
-			log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
+			c_log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
 		}
 
 		// 删除本地所有已上传的bag文件
-		log.GlobalLogger.Infof("结束处理窗口,【Label】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
+		c_log.GlobalLogger.Infof("结束处理窗口,【Label】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 		if err = util.RemoveDir(dir); err != nil {
 			continue outLoop
 		}
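
For reference, a sketch of what this upload loop produces: the per-window object-key prefix and a callback.json payload. All concrete values are illustrative, and the callback map lists only the fields visible in the hunk:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	prefix := "base-prefix/"
	equipmentNo := "EQUIPMENT-001"
	nodeName := "node1"
	faultTime := "2023-11-15-17-35-20"
	labels := []string{"labelA", "labelB"}
	bagNumber := 3

	// Per-window upload prefix, matching the objectKey1 layout in the hunk.
	objectKey1 := prefix + equipmentNo + "/data/" + nodeName + "_" + faultTime + "_" +
		strings.Join(labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
	fmt.Println(objectKey1)

	// callback.json payload (subset of the fields assembled in the hunk).
	callBack := map[string]interface{}{
		"dataName":  faultTime,
		"dataSize":  "", // filled in later by the merge program
		"triggerId": []string{"7", "12"},
	}
	b, _ := json.Marshal(callBack)
	fmt.Println(string(b))
}
```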

+ 25 - 25
aarch64/kinglong/control/main/control.go

@@ -1,10 +1,10 @@
 package main
 
 import (
+	"cicv-data-closedloop/aarch64/kinglong/common/config"
+	"cicv-data-closedloop/aarch64/kinglong/common/service"
+	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
-	"cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/log"
-	commonService "cicv-data-closedloop/kinglong/common/svc"
 	"net/rpc"
 	"os"
 	"runtime"
@@ -14,13 +14,13 @@ import (
 func init() {
 	runtime.GOMAXPROCS(1)
 	// 初始化日志配置
-	log.InitLogConfig("kinglong-control")
+	c_log.InitLog("mnt/media/sda1/cicv-data-closedloop/log", "kinglong-control")
 	// 初始化本地配置文件(第1处配置,在本地文件)
-	cfg.InitLocalConfig()
+	config.InitLocalConfig()
 	// 初始化Oss连接信息
-	cfg.InitOssConfig()
+	config.InitOssConfig()
 	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
-	cfg.InitCloudConfig()
+	config.InitCloudConfig()
 }
 
 func main() {
@@ -29,9 +29,9 @@ func main() {
 	for {
 		time.Sleep(time.Duration(2) * time.Second)
 		// 1 获取当前设备的任务的 status
-		status, err := cfg.GetStatus(cfg.PlatformConfig.TaskConfigId)
+		status, err := config.GetStatus(config.PlatformConfig.TaskConfigId)
 		if err != nil {
-			log.GlobalLogger.Error("获取配置status失败:", err)
+			c_log.GlobalLogger.Error("获取配置status失败:", err)
 			continue
 		}
 		// 2 判断 status
@@ -47,46 +47,46 @@ func main() {
 			}
 			// 3 发送rpc信号杀死两个服务,并重启程序
 			if lastStatus == "NONE" && status == "CHANGE" {
-				if _, err := util.ExecuteWithPath(cfg.LocalConfig.RestartCmd.Dir, cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args...); err != nil {
-					log.GlobalLogger.Info("启动新程序失败,【path】=", cfg.LocalConfig.RestartCmd.Dir, "【cmd】=", cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args, ":", err)
+				if _, err := util.ExecuteWithPath(config.LocalConfig.RestartCmd.Dir, config.LocalConfig.RestartCmd.Name, config.LocalConfig.RestartCmd.Args...); err != nil {
+					c_log.GlobalLogger.Info("启动新程序失败,【path】=", config.LocalConfig.RestartCmd.Dir, "【cmd】=", config.LocalConfig.RestartCmd.Name, config.LocalConfig.RestartCmd.Args, ":", err)
 					os.Exit(-1)
 				}
-				log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", cfg.LocalConfig.RestartCmd.Dir, "【cmd】=", cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args)
+				c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", config.LocalConfig.RestartCmd.Dir, "【cmd】=", config.LocalConfig.RestartCmd.Name, config.LocalConfig.RestartCmd.Args)
 				lastStatus = status
-				log.GlobalLogger.Info("获取数据闭环平台最新配置。")
-				cfg.InitPlatformConfig()
+				c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
+				config.InitPlatformConfig()
 				continue
 			}
-			var killArgs *commonService.KillSignal
+			var killArgs *service.KillSignal
 			if lastStatus == "UN_CHANGE" && status == "CHANGE" {
-				killArgs = &commonService.KillSignal{NodeName: cfg.LocalConfig.Node.Name, DropUploadData: cfg.PlatformConfig.DropUploadData, Restart: true}
-				log.GlobalLogger.Info("更新任务,发送rpc重启信号到本地"+cfg.LocalConfig.Node.Name+":", killArgs)
+				killArgs = &service.KillSignal{NodeName: config.LocalConfig.Node.Name, DropUploadData: config.PlatformConfig.DropUploadData, Restart: true}
+				c_log.GlobalLogger.Info("更新任务,发送rpc重启信号到本地"+config.LocalConfig.Node.Name+":", killArgs)
 			}
 			if lastStatus == "UN_CHANGE" && status == "NONE" {
-				killArgs = &commonService.KillSignal{NodeName: cfg.LocalConfig.Node.Name, DropUploadData: cfg.PlatformConfig.DropUploadData, Restart: false}
-				log.GlobalLogger.Info("杀死任务,发送rpc结束信号到本地"+cfg.LocalConfig.Node.Name+":", killArgs)
+				killArgs = &service.KillSignal{NodeName: config.LocalConfig.Node.Name, DropUploadData: config.PlatformConfig.DropUploadData, Restart: false}
+				c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号到本地"+config.LocalConfig.Node.Name+":", killArgs)
 			}
 
-			KillRpcClient, err := rpc.Dial("tcp", cfg.LocalConfig.Node.Ip+":"+cfg.CloudConfig.RpcPort)
+			KillRpcClient, err := rpc.Dial("tcp", config.LocalConfig.Node.Ip+":"+config.CloudConfig.RpcPort)
 			if err != nil {
-				log.GlobalLogger.Error("创建rpc客户端连接master失败:", err)
+				c_log.GlobalLogger.Error("创建rpc客户端连接master失败:", err)
 				// 此处关闭client会报错
 				continue
 			}
 
 			reply := 0
 			if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
-				log.GlobalLogger.Error("发送rpc请求到master失败:", err)
+				c_log.GlobalLogger.Error("发送rpc请求到master失败:", err)
 				//TODO 这里可能会报错 unexpected EOF 但是不影响,先注释 close 和 continue
 				//KillRpcClient.Close()
 				//continue
 			}
 			lastStatus = status
-			log.GlobalLogger.Info("获取数据闭环平台最新配置。")
-			cfg.InitPlatformConfig()
+			c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
+			config.InitPlatformConfig()
 			KillRpcClient.Close()
 		} else {
-			log.GlobalLogger.Error("未知的采集任务状态。【status】=", status)
+			c_log.GlobalLogger.Error("未知的采集任务状态。【status】=", status)
 		}
 	}
 }
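
The polling loop in main reduces to a small transition table over (lastStatus, status). A sketch of that decision, with illustrative return labels standing in for the three branches in the hunk:

```go
package main

import "fmt"

// nextAction summarizes the transition logic in control.go's polling loop.
func nextAction(lastStatus, status string) string {
	switch {
	case lastStatus == "NONE" && status == "CHANGE":
		return "start-locally" // run RestartCmd on this machine, then reload platform config
	case lastStatus == "UN_CHANGE" && status == "CHANGE":
		return "restart-via-rpc" // send KillSignal{Restart: true} to the master node
	case lastStatus == "UN_CHANGE" && status == "NONE":
		return "stop-via-rpc" // send KillSignal{Restart: false}
	default:
		return "wait"
	}
}

func main() {
	fmt.Println(nextAction("NONE", "CHANGE"))
	fmt.Println(nextAction("UN_CHANGE", "NONE"))
}
```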

+ 8 - 7
aarch64/kinglong/master/main/master.go

@@ -1,18 +1,19 @@
 package main
 
 import (
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	commonInit "cicv-data-closedloop/kinglong/common/init"
-	"cicv-data-closedloop/kinglong/common/log"
-	commonService "cicv-data-closedloop/kinglong/common/svc"
-	masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
-	masterService "cicv-data-closedloop/kinglong/master/pkg/svc"
+	commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
+	commonInit "cicv-data-closedloop/aarch64/kinglong/common/init"
+	commonService "cicv-data-closedloop/aarch64/kinglong/common/service"
+	masterConfig "cicv-data-closedloop/aarch64/kinglong/master/package/config"
+	masterService "cicv-data-closedloop/aarch64/kinglong/master/package/service"
+	"cicv-data-closedloop/common/config/c_log"
 )
 
 func init() {
 	// 初始化日志配置
 	//runtime.GOMAXPROCS(1)
-	log.InitLogConfig("kinglong-master")
+	// 初始化日志配置
+	c_log.InitLog("mnt/media/sda1/cicv-data-closedloop/log", "kinglong-master")
 	commonInit.Init()
 	// 初始化加载触发器插件文件
 	masterConfig.InitTriggerConfig()

+ 24 - 24
aarch64/kinglong/master/package/config/master_trigger_cfg.go

@@ -1,9 +1,9 @@
-package cfg
+package config
 
 import (
-	"cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/cutil"
-	"cicv-data-closedloop/kinglong/common/log"
+	"cicv-data-closedloop/aarch64/kinglong/common/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	"cicv-data-closedloop/kinglong_msgs"
 	"plugin"
 	"strconv"
@@ -25,92 +25,92 @@ var (
 )
 
 func InitTriggerConfig() {
-	log.GlobalLogger.Info("主节点加载触发器插件 - 开始。")
+	c_log.GlobalLogger.Info("主节点加载触发器插件 - 开始。")
 	// 下载所有触发器的文件
-	for _, trigger := range cfg.PlatformConfig.TaskTriggers {
+	for _, trigger := range config.PlatformConfig.TaskTriggers {
 		// 获取文件名
 		pathSplit := strings.Split(trigger.TriggerScriptPath, "/")
 		fileName := pathSplit[len(pathSplit)-1]
 		// 下载
-		triggerLocalPath := cfg.CloudConfig.TriggersDir + fileName
-		cutil.CreateParentDir(triggerLocalPath)
-		log.GlobalLogger.Info("下载触发器插件从 ", trigger.TriggerScriptPath, " 到 ", triggerLocalPath)
-		err := cfg.OssBucket.GetObjectToFile(trigger.TriggerScriptPath, triggerLocalPath)
+		triggerLocalPath := config.CloudConfig.TriggersDir + fileName
+		util.CreateParentDir(triggerLocalPath)
+		c_log.GlobalLogger.Info("下载触发器插件从 ", trigger.TriggerScriptPath, " 到 ", triggerLocalPath)
+		err := config.OssBucket.GetObjectToFile(trigger.TriggerScriptPath, triggerLocalPath)
 		if err != nil {
-			log.GlobalLogger.Error("下载 oss 上的触发器插件失败:", err)
+			c_log.GlobalLogger.Error("下载 oss 上的触发器插件失败:", err)
 			continue
 		}
 		// 载入插件到数组
 		open, err := plugin.Open(triggerLocalPath)
 		if err != nil {
-			log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "失败。", err)
+			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "失败。", err)
 			continue
 		}
 		topic0, err := open.Lookup("Topic")
 		if err != nil {
-			log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Topic方法失败。", err)
+			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Topic方法失败。", err)
 			continue
 		}
 		topic1, ok := topic0.(func() string)
 		if ok != true {
-			log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func() string):", err)
+			c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func() string):", err)
 			continue
 		}
 		topic2 := topic1()
 		rule, err := open.Lookup("Rule")
 		if err != nil {
-			log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Rule方法失败。", err)
+			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Rule方法失败。", err)
 			continue
 		}
 		if TopicOfNodeFaultInfo == topic2 {
 			f, ok := rule.(func(data *kinglong_msgs.FaultInfo) string)
 			if ok != true {
-				log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *kinglong_msgs.FaultInfo) string):", err)
+				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *kinglong_msgs.FaultInfo) string):", err)
 				continue
 			}
 			RuleOfNodefaultInfo = append(RuleOfNodefaultInfo, f)
 		} else if TopicOfCicvLocation == topic2 {
 			f, ok := rule.(func(data *kinglong_msgs.PerceptionLocalization) string)
 			if ok != true {
-				log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *kinglong_msgs.PerceptionLocalization) string):", err)
+				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *kinglong_msgs.PerceptionLocalization) string):", err)
 				continue
 			}
 			RuleOfCicvLocation = append(RuleOfCicvLocation, f)
 		} else if TopicOfTpperception == topic2 {
 			f, ok := rule.(func(data *kinglong_msgs.PerceptionObjects, velocityX float64, velocityY float64, yaw float64) string)
 			if ok != true {
-				log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *kinglong_msgs.PerceptionObjects, velocityX float64, velocityY float64, yaw float64) string):", err)
+				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *kinglong_msgs.PerceptionObjects, velocityX float64, velocityY float64, yaw float64) string):", err)
 				continue
 			}
 			RuleOfTpperception = append(RuleOfTpperception, f)
 		} else if TopicOfFaultInfo == topic2 {
 			f, ok := rule.(func(data *kinglong_msgs.FaultVec) string)
 			if ok != true {
-				log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *kinglong_msgs.FaultVec) string):", err)
+				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *kinglong_msgs.FaultVec) string):", err)
 				continue
 			}
 			RuleOfFaultInfo = append(RuleOfFaultInfo, f)
 		} else if TopicOfDataRead == topic2 {
 			f, ok := rule.(func(data *kinglong_msgs.Retrieval) string)
 			if ok != true {
-				log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *kinglong_msgs.Retrieval) string):", err)
+				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *kinglong_msgs.Retrieval) string):", err)
 				continue
 			}
 			RuleOfDataRead = append(RuleOfDataRead, f)
 		} else {
-			log.GlobalLogger.Error("未知的topic:", topic2)
+			c_log.GlobalLogger.Error("未知的topic:", topic2)
 			continue
 		}
 
 		label, err := open.Lookup("Label")
 		if err != nil {
-			log.GlobalLogger.Error("加载本地插件 ", triggerLocalPath, " 中的 Label 方法失败。", err)
+			c_log.GlobalLogger.Error("加载本地插件 ", triggerLocalPath, " 中的 Label 方法失败。", err)
 			continue
 		}
 		labelFunc := label.(func() string)
 		labelString := labelFunc()
 		LabelMapTriggerId[labelString] = strconv.Itoa(trigger.TriggerId)
-		log.GlobalLogger.Info("主节点加载触发器插件:【ros topic】=", topic2, ",【触发器label】=", labelString, "【触发器ID】=", trigger.TriggerId, "【label和id映射关系】=", LabelMapTriggerId)
+		c_log.GlobalLogger.Info("主节点加载触发器插件:【ros topic】=", topic2, ",【触发器label】=", labelString, "【触发器ID】=", trigger.TriggerId, "【label和id映射关系】=", LabelMapTriggerId)
 	}
-	log.GlobalLogger.Info("主节点加载触发器插件 - 成功。")
+	c_log.GlobalLogger.Info("主节点加载触发器插件 - 成功。")
 }
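
InitTriggerConfig uses the standard library plugin package: plugin.Open on the downloaded .so, Lookup on a symbol, then a type assertion to the expected function signature. A minimal sketch of that pattern for the Label symbol; unlike the hunk, the assertion here uses the comma-ok form, and the path is illustrative:

```go
package main

import (
	"fmt"
	"plugin"
)

// loadLabel opens a trigger plugin and resolves its Label symbol, mirroring
// the Lookup + type-assertion pattern in InitTriggerConfig.
func loadLabel(path string) (string, error) {
	p, err := plugin.Open(path)
	if err != nil {
		return "", fmt.Errorf("open plugin %s: %w", path, err)
	}
	sym, err := p.Lookup("Label")
	if err != nil {
		return "", fmt.Errorf("lookup Label: %w", err)
	}
	labelFn, ok := sym.(func() string)
	if !ok {
		return "", fmt.Errorf("Label has unexpected type %T", sym)
	}
	return labelFn(), nil
}

func main() {
	label, err := loadLabel("./trigger_demo.so")
	fmt.Println(label, err)
}
```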

+ 30 - 42
aarch64/kinglong/master/package/service/move_bag_and_send_window.go

@@ -1,27 +1,26 @@
-package svc
+package service
 
 import (
-	commonUtil "cicv-data-closedloop/common/util"
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/ent"
-	"cicv-data-closedloop/kinglong/common/global"
-	"cicv-data-closedloop/kinglong/common/log"
-	commonService "cicv-data-closedloop/kinglong/common/svc"
-	"cicv-data-closedloop/kinglong/common/util"
+	commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
+	commonService "cicv-data-closedloop/aarch64/kinglong/common/service"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/domain"
+	"cicv-data-closedloop/common/entity"
+	"cicv-data-closedloop/common/util"
 	"net"
 	"time"
 )
 
 // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
 func RunTimeWindowProducerQueue() {
-	log.GlobalLogger.Info("生产者队列goroutine - 启动")
+	c_log.GlobalLogger.Info("生产者队列goroutine - 启动")
 	for {
 		// 收到自杀信号
 		select {
 		case signal := <-commonService.ChannelKillMove:
 			if signal == 1 {
 				commonService.ChannelKillMove <- 1
-				if len(global.TimeWindowProducerQueue) == 0 {
+				if len(entity.TimeWindowProducerQueue) == 0 {
 					commonService.AddKillTimes("4")
 					return
 				}
@@ -33,9 +32,9 @@ func RunTimeWindowProducerQueue() {
 		}
 
 		time.Sleep(time.Duration(1) * time.Second)
-		if len(global.TimeWindowProducerQueue) > 0 {
-			bags := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
-			currentTimeWindow := global.TimeWindowProducerQueue[0]
+		if len(entity.TimeWindowProducerQueue) > 0 {
+			bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
+			currentTimeWindow := entity.TimeWindowProducerQueue[0]
 			move := false
 			bigger := false
 			for _, bag := range bags {
@@ -45,7 +44,7 @@ func RunTimeWindowProducerQueue() {
 				compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
 				if compare1 && compare2 {
 					// 将bag包移动到Copy目录
-					util.MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
+					domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
 					move = true
 				} else {
 					if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
@@ -60,25 +59,25 @@ func RunTimeWindowProducerQueue() {
 				// 1 如果第一个已经大于了timeWindowEnd,则触发上传并删除
 				// 将时间窗口发送给从节点
 				currentTimeWindow.CanUpload = "yes"
-				log.GlobalLogger.Info("将已完成的窗口发送给从节点:", currentTimeWindow.CanUpload)
-				util.SupplyCopyBags(currentTimeWindow)
-				util.RefreshTcpSendTime()
+				c_log.GlobalLogger.Info("将已完成的窗口发送给从节点:", currentTimeWindow.CanUpload)
+				domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
+				entity.RefreshTcpSendTime()
 				go sendTimeWindowByTcp(currentTimeWindow)
 				// 将时间窗口移出准备队列
-				util.RemoveHeadOfdTimeWindowProducerQueue()
+				entity.RemoveHeadOfTimeWindowProducerQueue()
 				// 将时间窗口加入运行队列
-				util.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
+				entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
 				// 获取copy目录下的字典json,key为触发时间,value为label
-				timeToLabelJson, _ := commonUtil.ReadFile(commonConfig.CloudConfig.TimeToLabelJsonPath)
-				timeToLabelMap, _ := commonUtil.JsonStringToMap(timeToLabelJson)
-				timeToLabelMap[currentTimeWindow.FaultTime] = commonUtil.ToString(currentTimeWindow.Labels)
-				timeToLabelJson, _ = commonUtil.MapToJsonString(timeToLabelMap)
-				_ = commonUtil.WriteFile(timeToLabelJson, commonConfig.CloudConfig.TimeToLabelJsonPath)
+				timeToLabelJson, _ := util.ReadFile(commonConfig.CloudConfig.TimeToLabelJsonPath)
+				timeToLabelMap, _ := util.JsonStringToMap(timeToLabelJson)
+				timeToLabelMap[currentTimeWindow.FaultTime] = util.ToString(currentTimeWindow.Labels)
+				timeToLabelJson, _ = util.MapToJsonString(timeToLabelMap)
+				_ = util.WriteFile(timeToLabelJson, commonConfig.CloudConfig.TimeToLabelJsonPath)
 				continue
 			} else { // 保证当前窗口只发送一次,每间隔5秒发一次
-				if int(time.Since(global.TcpSendTime).Seconds()) > commonConfig.CloudConfig.TimeWindowSendGap {
-					log.GlobalLogger.Info("每隔", commonConfig.CloudConfig.TimeWindowSendGap, "秒发送一次tcp消息")
-					util.RefreshTcpSendTime()
+				if int(time.Since(entity.TcpSendTime).Seconds()) > commonConfig.CloudConfig.TimeWindowSendGap {
+					c_log.GlobalLogger.Info("每隔", commonConfig.CloudConfig.TimeWindowSendGap, "秒发送一次tcp消息")
+					entity.RefreshTcpSendTime()
 					// 2 如果第一个不大于timeWindowEnd,则发送不可上传的窗口信息。
 					currentTimeWindow.CanUpload = "no"
 					go sendTimeWindowByTcp(currentTimeWindow)
@@ -88,30 +87,19 @@ func RunTimeWindowProducerQueue() {
 	}
 }
 
-//TODO 服务端接受连接时如何维护该链接
-//func sendTimeWindowByTcp(timeWindow ent.TimeWindow) {
-//	// 发送数据
-//	send := util2.TimeWindowToJson(timeWindow)
-//	_, err := masterCfg.TcpConnection.Write([]byte(send))
-//	if err != nil {
-//		log.GlobalLogger.Error("master发送给slave时间窗口", timeWindow, "失败:", err)
-//		return
-//	}
-//}
-
-func sendTimeWindowByTcp(timeWindow ent.TimeWindow) {
+func sendTimeWindowByTcp(timeWindow entity.TimeWindow) {
 	socket := commonConfig.CloudConfig.Hosts[1].Ip + ":" + commonConfig.CloudConfig.TcpPort
 	tcpConn, err := net.Dial("tcp", socket)
 	if err != nil {
-		log.GlobalLogger.Error("建立tcp连接", socket, "失败:", err)
+		c_log.GlobalLogger.Error("建立tcp连接", socket, "失败:", err)
 		return
 	}
 	defer tcpConn.Close()
 	// 发送数据
-	send := util.TimeWindowToJson(timeWindow)
+	send, _ := entity.TimeWindowToJson(timeWindow)
 	_, err = tcpConn.Write([]byte(send))
 	if err != nil {
-		log.GlobalLogger.Error("master发送给slave时间窗口", timeWindow, "失败:", err)
+		c_log.GlobalLogger.Error("master发送给slave时间窗口", timeWindow, "失败:", err)
 		return
 	}
 }
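
sendTimeWindowByTcp dials the slave host, serializes the window and writes it in one shot. A sketch of the same dial-marshal-write-close flow; the TimeWindow fields, JSON tags and address are illustrative, the real struct lives in common/entity:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
)

// TimeWindow carries only fields visible in the diff; the real struct has more.
type TimeWindow struct {
	FaultTime string   `json:"faultTime"`
	Labels    []string `json:"labels"`
	CanUpload string   `json:"canUpload"`
}

// sendTimeWindow dials the peer and writes the window as JSON, the same
// pattern as sendTimeWindowByTcp.
func sendTimeWindow(addr string, w TimeWindow) error {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return fmt.Errorf("dial %s: %w", addr, err)
	}
	defer conn.Close()
	payload, err := json.Marshal(w)
	if err != nil {
		return err
	}
	_, err = conn.Write(payload)
	return err
}

func main() {
	err := sendTimeWindow("127.0.0.1:9000", TimeWindow{FaultTime: "2023-11-15-17-35-20", Labels: []string{"labelA"}, CanUpload: "yes"})
	fmt.Println(err)
}
```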

+ 62 - 64
aarch64/kinglong/master/package/service/produce_window.go

@@ -1,14 +1,12 @@
-package svc
+package service
 
 import (
+	commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
+	"cicv-data-closedloop/aarch64/kinglong/common/service"
+	masterConfig "cicv-data-closedloop/aarch64/kinglong/master/package/config"
 	"cicv-data-closedloop/common/config/c_log"
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/ent"
-	"cicv-data-closedloop/kinglong/common/global"
-	"cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/kinglong/common/svc"
-	"cicv-data-closedloop/kinglong/common/util"
-	masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
+	"cicv-data-closedloop/common/entity"
+	"cicv-data-closedloop/common/util"
 	"cicv-data-closedloop/kinglong_msgs"
 	"github.com/bluenviron/goroslib/v2"
 	"os"
@@ -30,42 +28,42 @@ var (
 
 // PrepareTimeWindowProducerQueue 负责监听所有主题并修改时间窗口
 func PrepareTimeWindowProducerQueue() {
-	log.GlobalLogger.Info("订阅者 goroutine,启动。")
+	c_log.GlobalLogger.Info("订阅者 goroutine,启动。")
 	//创建订阅者0订阅主题 nodefault_info
-	log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
+	c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfNodeFaultInfo)
 	subscriber0, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
 		Topic: masterConfig.TopicOfNodeFaultInfo,
 		Callback: func(data *kinglong_msgs.FaultInfo) {
 			if len(masterConfig.RuleOfNodefaultInfo) == 0 {
-				//log.GlobalLogger.Info("话题 nodefault_info没有触发器")
+				//c_log.GlobalLogger.Info("话题 nodefault_info没有触发器")
 				return
 			}
-			global.Subscriber0TimeMutex.Lock()
-			if time.Since(global.Subscriber0Time).Seconds() > 1 {
-				global.Subscriber0TimeMutex.Unlock()
+			entity.Subscriber0TimeMutex.Lock()
+			if time.Since(entity.Subscriber0Time).Seconds() > 1 {
+				entity.Subscriber0TimeMutex.Unlock()
 				subscriber0Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
-				lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
+				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
 				var faultLabel string
 				for _, f := range masterConfig.RuleOfNodefaultInfo {
 					faultLabel = f(data)
 					if faultLabel != "" {
 						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						global.Subscriber0Time = time.Now()
+						entity.Subscriber0Time = time.Now()
 						break
 					}
 				}
 				subscriber0Mutex.Unlock()
 			}
-			global.Subscriber0TimeMutex.Unlock()
+			entity.Subscriber0TimeMutex.Unlock()
 		}})
 	if err != nil {
-		log.GlobalLogger.Info("创建订阅者发生故障:", err)
+		c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
 		os.Exit(-1)
 	}
 	// 创建订阅者1订阅主题 cicv_location
-	log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
+	c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfCicvLocation)
 	subscriber1, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
 		Topic: masterConfig.TopicOfCicvLocation,
@@ -77,138 +75,138 @@ func PrepareTimeWindowProducerQueue() {
 			m.RUnlock()
 
 			if len(masterConfig.RuleOfCicvLocation) == 0 {
-				log.GlobalLogger.Info("话题 cicv_location 没有触发器")
+				c_log.GlobalLogger.Info("话题 cicv_location 没有触发器")
 				return
 			}
-			global.Subscriber1TimeMutex.Lock()
-			if time.Since(global.Subscriber1Time).Seconds() > 1 {
+			entity.Subscriber1TimeMutex.Lock()
+			if time.Since(entity.Subscriber1Time).Seconds() > 1 {
 				subscriber1Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
-				lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
+				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
 				// 更新共享变量
 				var faultLabel string
 				for _, f := range masterConfig.RuleOfCicvLocation {
 					faultLabel = f(data)
 					if faultLabel != "" {
 						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						global.Subscriber1Time = time.Now()
+						entity.Subscriber1Time = time.Now()
 						break
 					}
 				}
 				subscriber1Mutex.Unlock()
 			}
-			global.Subscriber1TimeMutex.Unlock()
+			entity.Subscriber1TimeMutex.Unlock()
 		},
 	})
 	if err != nil {
-		log.GlobalLogger.Info("创建订阅者发生故障:", err)
+		c_log.GlobalLogger.Info("创建订阅者发生故障:", err)
 		os.Exit(-1)
 	}
-	log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfTpperception)
+	c_log.GlobalLogger.Info("创建订阅者订阅话题 ", masterConfig.TopicOfTpperception)
 	subscriber2, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
 		Topic: masterConfig.TopicOfTpperception,
 		Callback: func(data *kinglong_msgs.PerceptionObjects) {
 			if len(masterConfig.RuleOfTpperception) == 0 {
-				log.GlobalLogger.Info("话题 tpperception 没有触发器")
+				c_log.GlobalLogger.Info("话题 tpperception 没有触发器")
 				return
 			}
-			global.Subscriber2TimeMutex.Lock()
+			entity.Subscriber2TimeMutex.Lock()
 			// 判断是否是连续故障码
-			if time.Since(global.Subscriber2Time).Seconds() > 1 {
+			if time.Since(entity.Subscriber2Time).Seconds() > 1 {
 				// 2 不是连续故障码
 				subscriber2Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
-				lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
+				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
 				var faultLabel string
-				//log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY)
+				//c_log.GlobalLogger.Infof("TTC数据为:【velocityX】=%v,【velocityY】=%v", velocityX, velocityY)
 				for _, f := range masterConfig.RuleOfTpperception {
 					faultLabel = f(data, velocityX, velocityY, yaw)
 					if faultLabel != "" {
 						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						global.Subscriber2Time = time.Now()
+						entity.Subscriber2Time = time.Now()
 						break
 					}
 				}
 				subscriber2Mutex.Unlock()
 			}
-			global.Subscriber2TimeMutex.Unlock()
+			entity.Subscriber2TimeMutex.Unlock()
 		}})
 	if err != nil {
 		c_log.GlobalLogger.Info("创建订阅者2发生故障:", err)
 		os.Exit(-1)
 	}
 	// 创建订阅者3订阅主题 fault_info
-	log.GlobalLogger.Info("创建订阅者3订阅话题 ", masterConfig.TopicOfFaultInfo)
+	c_log.GlobalLogger.Info("创建订阅者3订阅话题 ", masterConfig.TopicOfFaultInfo)
 	subscriber3, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
 		Topic: masterConfig.TopicOfFaultInfo,
 		Callback: func(data *kinglong_msgs.FaultVec) {
 			if len(masterConfig.RuleOfFaultInfo) == 0 {
-				log.GlobalLogger.Info("话题 fault_info 没有触发器")
+				c_log.GlobalLogger.Info("话题 fault_info 没有触发器")
 				return
 			}
-			global.Subscriber3TimeMutex.Lock()
-			if time.Since(global.Subscriber3Time).Seconds() > 1 {
+			entity.Subscriber3TimeMutex.Lock()
+			if time.Since(entity.Subscriber3Time).Seconds() > 1 {
 				// 2 不是连续故障码
 				subscriber3Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
-				lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
+				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
 				var faultLabel string
 				for _, f := range masterConfig.RuleOfFaultInfo {
 					faultLabel = f(data)
 					if faultLabel != "" {
 						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						global.Subscriber3Time = time.Now()
+						entity.Subscriber3Time = time.Now()
 						break
 					}
 				}
 				subscriber3Mutex.Unlock()
 			}
-			global.Subscriber3TimeMutex.Unlock()
+			entity.Subscriber3TimeMutex.Unlock()
 		}})
 	if err != nil {
-		log.GlobalLogger.Info("创建订阅者3发生故障:", err)
+		c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
 		os.Exit(-1)
 	}
 	// 创建订阅者4订阅主题 data_read
 	// TODO 高频率触发
-	log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
+	c_log.GlobalLogger.Info("创建订阅者4订阅话题 ", masterConfig.TopicOfDataRead)
 	subscriber4, err := goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
 		Topic: masterConfig.TopicOfDataRead,
 		Callback: func(data *kinglong_msgs.Retrieval) {
 			if len(masterConfig.RuleOfDataRead) == 0 {
-				//log.GlobalLogger.Info("话题 data_read 没有触发器")
+				//c_log.GlobalLogger.Info("话题 data_read 没有触发器")
 				return
 			}
-			global.Subscriber4TimeMutex.Lock()
-			if time.Since(global.Subscriber4Time).Seconds() > 1 {
+			entity.Subscriber4TimeMutex.Lock()
+			if time.Since(entity.Subscriber4Time).Seconds() > 1 {
 				subscriber4Mutex.Lock()
-				faultHappenTime := util.GetNowTimeCustom() // 获取当前故障发生时间
-				lastTimeWindow := util.GetLastTimeWindow() // 获取最后一个时间窗口
+				faultHappenTime := util.GetNowTimeCustom()   // 获取当前故障发生时间
+				lastTimeWindow := entity.GetLastTimeWindow() // 获取最后一个时间窗口
 				var faultLabel string
 				for _, f := range masterConfig.RuleOfDataRead {
 					faultLabel = f(data)
 					if faultLabel != "" {
 						saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
-						global.Subscriber4Time = time.Now()
+						entity.Subscriber4Time = time.Now()
 						break
 					}
 				}
 				subscriber4Mutex.Unlock()
 			}
-			global.Subscriber4TimeMutex.Unlock()
+			entity.Subscriber4TimeMutex.Unlock()
 		},
 	})
 	if err != nil {
-		log.GlobalLogger.Info("创建订阅者3发生故障:", err)
+		c_log.GlobalLogger.Info("创建订阅者3发生故障:", err)
 		os.Exit(-1)
 	}
 	select {
-	case signal := <-svc.ChannelKillWindowProducer:
+	case signal := <-service.ChannelKillWindowProducer:
 		if signal == 1 {
-			defer svc.AddKillTimes("3")
+			defer service.AddKillTimes("3")
 			subscriber0.Close()
 			subscriber1.Close()
 			subscriber2.Close()
@@ -220,11 +218,11 @@ func PrepareTimeWindowProducerQueue() {
 	}
 }
 
-func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *ent.TimeWindow) {
+func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
 	masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
 	if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
 		// 2-1 如果是不在旧故障窗口内,添加一个新窗口
-		newTimeWindow := ent.TimeWindow{
+		newTimeWindow := entity.TimeWindow{
 			FaultTime:       faultHappenTime,
 			TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
 			TimeWindowEnd:   util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
@@ -233,12 +231,12 @@ func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *e
 			MasterTopics:    masterTopics,
 			SlaveTopics:     slaveTopics,
 		}
-		log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
-		util.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
+		c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
+		entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
 	} else {
 		// 2-2 如果在旧故障窗口内
-		global.TimeWindowProducerQueueMutex.RLock()
-		defer global.TimeWindowProducerQueueMutex.RUnlock()
+		entity.TimeWindowProducerQueueMutex.RLock()
+		defer entity.TimeWindowProducerQueueMutex.RUnlock()
 		// 2-2-1 更新故障窗口end时间
 		maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
 		expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
@@ -259,7 +257,7 @@ func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *e
 		lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
 		sourceSlaveTopics := lastTimeWindow.SlaveTopics
 		lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
-		log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
+		c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
 
 	}
 }
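For reference, the else-branch above computes two candidate end times: maxEnd (TimeWindowBegin + TaskMaxTime, the hard cap on window length) and expectEnd (faultHappenTime + TaskAfterTime). A minimal self-contained sketch of that rule, using time.Time instead of the project's custom string timestamps; extendWindowEnd and its parameter names are illustrative, and the sketch assumes the window end becomes the earlier of the two candidates (the comparison itself sits in lines not shown in this hunk):

package main

import (
	"fmt"
	"time"
)

// extendWindowEnd mirrors the capping rule of saveTimeWindow's else-branch:
// the window end grows towards faultTime+afterTime, but never past begin+maxTime,
// and never shrinks below the current end.
func extendWindowEnd(begin, currentEnd, faultTime time.Time, afterTime, maxTime time.Duration) time.Time {
	maxEnd := begin.Add(maxTime)          // hard upper bound for the whole window
	expectEnd := faultTime.Add(afterTime) // where the new fault would like the window to end
	if expectEnd.After(maxEnd) {
		expectEnd = maxEnd
	}
	if expectEnd.After(currentEnd) {
		return expectEnd
	}
	return currentEnd
}

func main() {
	begin := time.Date(2023, 11, 23, 10, 30, 0, 0, time.UTC)
	end := begin.Add(30 * time.Second)
	fault := begin.Add(25 * time.Second)
	// fault+30s = begin+55s, below the 60s cap, so the window end moves to 10:30:55 UTC.
	fmt.Println(extendWindowEnd(begin, end, fault, 30*time.Second, 60*time.Second))
}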

+ 14 - 14
aarch64/kinglong/slave/main/slave.go

@@ -1,36 +1,36 @@
 package main
 
 import (
-	cfg2 "cicv-data-closedloop/kinglong/common/cfg"
-	commonInit "cicv-data-closedloop/kinglong/common/init"
-	"cicv-data-closedloop/kinglong/common/log"
-	svc3 "cicv-data-closedloop/kinglong/common/svc"
-	slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
-	svc4 "cicv-data-closedloop/kinglong/slave/pkg/svc"
+	"cicv-data-closedloop/aarch64/kinglong/common/config"
+	"cicv-data-closedloop/aarch64/kinglong/common/init"
+	commonService "cicv-data-closedloop/aarch64/kinglong/common/service"
+	slaveConfig "cicv-data-closedloop/aarch64/kinglong/slave/package/config"
+	slaveService "cicv-data-closedloop/aarch64/kinglong/slave/package/service"
+	"cicv-data-closedloop/common/config/c_log"
 )
 
 // init 初始化函数
 func init() {
 	//runtime.GOMAXPROCS(1)
-	log.InitLogConfig("kinglong-slave")
-	commonInit.Init()
+	c_log.InitLog("mnt/media/sda1/cicv-data-closedloop/log", "kinglong-master")
+	init.Init()
 	slaveConfig.InitTcpListener()
-	cfg2.InitKillSignalListener(cfg2.CloudConfig.Hosts[1].Ip)
+	config.InitKillSignalListener(config.CloudConfig.Hosts[1].Ip)
 	// 等待重启,接收到重启信号,会把信号分发给以下channel
-	go svc3.WaitKillSelf()
+	go commonService.WaitKillSelf()
 }
 
 // main 主函数
 func main() {
 
 	// 1 负责打包数据到data目录
-	go svc3.BagRecord(cfg2.CloudConfig.Hosts[1].Name)
+	go commonService.BagRecord(config.CloudConfig.Hosts[1].Name)
 	// 2 负责监控故障,并修改timeWindow
-	go svc4.PrepareTimeWindowProducerQueue()
+	go slaveService.PrepareTimeWindowProducerQueue()
 	// 3
-	go svc4.RunTimeWindowProducerQueue()
+	go slaveService.RunTimeWindowProducerQueue()
 	// 4 排队运行时间窗口
-	go svc3.RunTimeWindowConsumerQueue(cfg2.CloudConfig.Hosts[1].Name)
+	go commonService.RunTimeWindowConsumerQueue(config.CloudConfig.Hosts[1].Name)
 
 	// 阻塞主线程,等待其他线程执行。
 	select {}

+ 6 - 6
aarch64/kinglong/slave/package/config/slave_tcp_config.go

@@ -1,8 +1,8 @@
-package cfg
+package config
 
 import (
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/log"
+	commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
+	"cicv-data-closedloop/common/config/c_log"
 	"net"
 	"os"
 )
@@ -10,13 +10,13 @@ import (
 var TcpListener net.Listener
 
 func InitTcpListener() {
-	log.GlobalLogger.Info("从节点初始化TCP端口监听 - 开始。")
+	c_log.GlobalLogger.Info("从节点初始化TCP端口监听 - 开始。")
 	socket := commonConfig.CloudConfig.Hosts[1].Ip + ":" + commonConfig.CloudConfig.TcpPort
 	var err error
 	TcpListener, err = net.Listen("tcp", socket)
 	if err != nil {
-		log.GlobalLogger.Error("监听tcp端口失败:", err)
+		c_log.GlobalLogger.Error("监听tcp端口失败:", err)
 		os.Exit(-1)
 	}
-	log.GlobalLogger.Info("从节点初始化TCP端口监听 - 成功:", socket)
+	c_log.GlobalLogger.Info("从节点初始化TCP端口监听 - 成功:", socket)
 }

+ 12 - 13
aarch64/kinglong/slave/package/service/accept_window.go

@@ -1,11 +1,10 @@
-package svc
+package service
 
 import (
-	"cicv-data-closedloop/kinglong/common/ent"
-	commonCfg "cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/kinglong/common/svc"
-	"cicv-data-closedloop/kinglong/common/util"
-	slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
+	"cicv-data-closedloop/aarch64/kinglong/common/service"
+	slaveConfig "cicv-data-closedloop/aarch64/kinglong/slave/package/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/entity"
 	"context"
 	"encoding/json"
 	"sync"
@@ -17,11 +16,11 @@ func PrepareTimeWindowProducerQueue() {
 	// 处理退出信号
 	go func() {
 		select {
-		case signal := <-svc.ChannelKillWindowProducer:
+		case signal := <-service.ChannelKillWindowProducer:
 			if signal == 1 {
 				cancel()
 				slaveConfig.TcpListener.Close()
-				svc.AddKillTimes("3")
+				service.AddKillTimes("3")
 				return
 			}
 		}
@@ -38,7 +37,7 @@ func PrepareTimeWindowProducerQueue() {
 				case <-ctx.Done():
 					return
 				default:
-					commonCfg.GlobalLogger.Error("接受连接错误:", err)
+					c_log.GlobalLogger.Error("接受连接错误:", err)
 					continue
 				}
 			}
@@ -46,16 +45,16 @@ func PrepareTimeWindowProducerQueue() {
 			buffer := make([]byte, 2048)
 			total, err := conn.Read(buffer)
 			if err != nil {
-				commonCfg.GlobalLogger.Error("读取数据错误:", err)
+				c_log.GlobalLogger.Error("读取数据错误:", err)
 				continue
 			}
-			var timeWindow ent.TimeWindow
+			var timeWindow entity.TimeWindow
 			err = json.Unmarshal(buffer[:total], &timeWindow)
 			if err != nil {
-				commonCfg.GlobalLogger.Error("解析Json时出错:", err)
+				c_log.GlobalLogger.Error("解析Json时出错:", err)
 				continue
 			}
-			util.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
+			entity.AddTimeWindowToTimeWindowProducerQueue(timeWindow)
 			prepareTimeWindowProducerQueueMutex.Unlock()
 		}
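The accept loop above performs a single Read per connection and unmarshals the bytes into an entity.TimeWindow, so the peer is expected to send exactly one small JSON document per connection. The sending side is not part of this change set; a minimal sketch of what it presumably looks like (sendTimeWindow is an illustrative name, and the sketch assumes the payload is small enough to arrive in the receiver's single 2048-byte Read):

// sendTimeWindow marshals one TimeWindow to JSON and writes it over a single
// TCP connection, matching the one-read-per-connection loop on the slave side.
func sendTimeWindow(addr string, w entity.TimeWindow) error {
	payload, err := json.Marshal(w)
	if err != nil {
		return err
	}
	conn, err := net.Dial("tcp", addr) // addr is host:port of the slave's TcpListener
	if err != nil {
		return err
	}
	defer conn.Close()
	_, err = conn.Write(payload)
	return err
}

(encoding/json, net and cicv-data-closedloop/common/entity are the only imports this sketch needs.)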
 

+ 19 - 18
aarch64/kinglong/slave/package/service/move_bag.go

@@ -1,23 +1,24 @@
-package svc
+package service
 
 import (
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/global"
-	"cicv-data-closedloop/kinglong/common/log"
-	commonService "cicv-data-closedloop/kinglong/common/svc"
-	"cicv-data-closedloop/kinglong/common/util"
+	commonConfig "cicv-data-closedloop/aarch64/kinglong/common/config"
+	commonService "cicv-data-closedloop/aarch64/kinglong/common/service"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/domain"
+	"cicv-data-closedloop/common/entity"
+	"cicv-data-closedloop/common/util"
 	"time"
 )
 
 // RunTimeWindowProducerQueue 将时间窗口内的包全部move出去,并等待当前时间窗口结束触发上传
 func RunTimeWindowProducerQueue() {
-	log.GlobalLogger.Info("生产者队列 - 启动")
+	c_log.GlobalLogger.Info("生产者队列 - 启动")
 	for { // 必须串行排队处理
 		select {
 		case signal := <-commonService.ChannelKillMove:
 			if signal == 1 {
 				commonService.ChannelKillMove <- 1
-				if len(global.TimeWindowProducerQueue) == 0 {
+				if len(entity.TimeWindowProducerQueue) == 0 {
 					commonService.AddKillTimes("4")
 					return
 				}
@@ -28,26 +29,26 @@ func RunTimeWindowProducerQueue() {
 		default:
 		}
 		time.Sleep(time.Duration(1) * time.Second)
-		if len(global.TimeWindowProducerQueue) > 0 {
-			currentTimeWindow := global.TimeWindowProducerQueue[0]
+		if len(entity.TimeWindowProducerQueue) > 0 {
+			currentTimeWindow := entity.TimeWindowProducerQueue[0]
 			// 将时间窗口移出准备队列
-			util.RemoveHeadOfdTimeWindowProducerQueue()
+			entity.RemoveHeadOfTimeWindowProducerQueue()
 			if currentTimeWindow.CanUpload == "yes" {
-				log.GlobalLogger.Info("从节点接收到可上传的timeWindow")
+				c_log.GlobalLogger.Info("从节点接收到可上传的timeWindow")
 			}
 			if currentTimeWindow.CanUpload == "no" {
-				log.GlobalLogger.Info("从节点接收到不可上传的timeWindow")
+				c_log.GlobalLogger.Info("从节点接收到不可上传的timeWindow")
 			}
 
 			// 2 timeWindow不可以上传,则将data目录下的数据move到copy目录
-			bags := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
+			bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
 			for _, bag := range bags {
 				bagTime := util.GetBagTime(bag)
 				compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
 				compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
 				if compare1 && compare2 {
 					// 将bag包移动到Copy目录
-					util.MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
+					domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
 				} else {
 					if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
 						// 必须已经生成了窗口之后的包才算窗口结束了
@@ -58,11 +59,11 @@ func RunTimeWindowProducerQueue() {
 			// 判断是否可上传
 			if currentTimeWindow.CanUpload == "yes" {
 				// 1 timeWindow可以上传
-				log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
+				c_log.GlobalLogger.Info("timeWindow可以上传:", currentTimeWindow)
 				// 补充bag包
-				util.SupplyCopyBags(currentTimeWindow)
+				domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
 				// 将时间窗口加入运行队列
-				util.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
+				entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
 			}
 		}
 	}
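The loop above classifies every recorded bag against the current window: bags whose timestamp falls inside [TimeWindowBegin, TimeWindowEnd] are moved to the copy directory, and only a bag recorded after TimeWindowEnd counts as proof that the window is complete. A simplified sketch of that classification, using time.Time instead of the project's string timestamps (classifyBag and the result constants are illustrative names):

package main

import (
	"fmt"
	"time"
)

type bagClass int

const (
	bagBeforeWindow bagClass = iota // older than the window, left where it is
	bagInWindow                     // belongs to the window, moved to the copy directory
	bagAfterWindow                  // newer than the window, proves the window has ended
)

// classifyBag decides how a bag recorded at t relates to the window [begin, end].
func classifyBag(t, begin, end time.Time) bagClass {
	switch {
	case t.Before(begin):
		return bagBeforeWindow
	case t.After(end):
		return bagAfterWindow
	default:
		return bagInWindow
	}
}

func main() {
	begin := time.Date(2023, 11, 23, 10, 30, 0, 0, time.UTC)
	end := begin.Add(30 * time.Second)
	fmt.Println(classifyBag(begin.Add(10*time.Second), begin, end)) // 1: inside the window
	fmt.Println(classifyBag(end.Add(5*time.Second), begin, end))    // 2: the window has ended
}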

+ 3 - 4
aarch64/pji/common/config/c_platform.go

@@ -3,7 +3,6 @@ package config
 import (
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
-	"cicv-data-closedloop/kinglong/common/log"
 	"encoding/json"
 	"strings"
 	"time"
@@ -99,8 +98,8 @@ func GetAccessToken() (string, error) {
 		return "", nil
 	}
 
-	dataMap := respMap["data"].(map[string]interface{})
-	if err != nil {
+	dataMap, ok := respMap["data"].(map[string]interface{})
+	if !ok {
 		c_log.GlobalLogger.Error("解析返回结果.data", dataMap, "失败:", err)
 		return "", nil
 	}
@@ -147,7 +146,7 @@ func GetStatus(taskConfigId string) (string, error) {
 	}
 	dataMap, ok := respMap["data"].(map[string]interface{})
 	if !ok {
-		log.GlobalLogger.Errorf("解析【返回结果.data】的类型不是(map[string]interface{}),【dataMap】=%v", dataMap)
+		c_log.GlobalLogger.Errorf("解析【返回结果.data】的类型不是(map[string]interface{}),【dataMap】=%v", dataMap)
 		return "", err
 	}
 	return dataMap["status"].(string), nil

+ 1 - 2
aarch64/pji/common/service/disk_clean.go

@@ -7,7 +7,6 @@ import (
 	"cicv-data-closedloop/common/domain"
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
-	"cicv-data-closedloop/kinglong/common/cfg"
 	"time"
 )
 
@@ -75,7 +74,7 @@ func deleteTimeWindow(indexToRemove int) {
 }
 
 func getIndexToRemoveForLRU() int {
-	lru := cfg.PlatformConfig.Lru
+	lru := commonConfig.PlatformConfig.Lru
 	i := len(lru) - 1
 	for i >= 0 {
 		for i2, window := range entity.TimeWindowConsumerQueue {

+ 12 - 7
common/entity/time_window.go

@@ -12,13 +12,18 @@ var (
 
 	TimeWindowConsumerQueue      []TimeWindow
 	TimeWindowConsumerQueueMutex sync.RWMutex
-
-	Subscriber1Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber1TimeMutex sync.Mutex
-	Subscriber2Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber2TimeMutex sync.Mutex
-	Subscriber4Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber4TimeMutex sync.Mutex
+	Subscriber0Time              = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber0TimeMutex         sync.Mutex
+	Subscriber1Time              = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber1TimeMutex         sync.Mutex
+	Subscriber2Time              = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber2TimeMutex         sync.Mutex
+	Subscriber3Time              = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber3TimeMutex         sync.Mutex
+	Subscriber4Time              = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber4TimeMutex         sync.Mutex
+	Subscriber5Time              = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber5TimeMutex         sync.Mutex
 
 	TcpSendTime      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
 	TcpSendTimeMutex sync.Mutex
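These per-subscriber timestamps and mutexes back the 1-second debounce used by the trigger callbacks in produce_window.go: each callback takes its mutex, skips the message if the previous trigger fired less than a second ago, and refreshes the timestamp once a rule actually produces a fault label. A distilled, self-contained sketch of the pattern (shouldFire and minInterval are illustrative names; the real callbacks refresh the timestamp only on a rule match):

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	lastTrigger      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
	lastTriggerMutex sync.Mutex
)

// shouldFire reports whether a trigger may fire now, enforcing a minimum
// interval between two consecutive triggers and recording the firing time.
func shouldFire(minInterval time.Duration) bool {
	lastTriggerMutex.Lock()
	defer lastTriggerMutex.Unlock()
	if time.Since(lastTrigger) <= minInterval {
		return false // still inside the debounce interval, drop this message
	}
	lastTrigger = time.Now()
	return true
}

func main() {
	fmt.Println(shouldFire(time.Second)) // true: the initial timestamp lies far in the past
	fmt.Println(shouldFire(time.Second)) // false: the previous trigger fired moments ago
}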