孟令鑫 1 anno fa
parent
commit
1d72d743e1
49 ha cambiato i file con 483 aggiunte e 544 eliminazioni
  1. 0 0
      aarch64/pjisuv/common/config/cloud-config-v2253.yaml
  2. 0 0
      aarch64/pjisuv/common/config/cloud-config-v2304.yaml
  3. 0 0
      aarch64/pjisuv/common/config/cloud-config.yaml
  4. 15 15
      aarch64/pjisuv/common/config/cloud_config.go
  5. 21 0
      aarch64/pjisuv/common/config/kill_rpc_server_config.go
  6. 0 0
      aarch64/pjisuv/common/config/local-config-soc1.yaml
  7. 0 0
      aarch64/pjisuv/common/config/local-config-soc2.yaml
  8. 0 0
      aarch64/pjisuv/common/config/local-config.yaml
  9. 6 6
      aarch64/pjisuv/common/config/local_config.go
  10. 10 10
      aarch64/pjisuv/common/config/oss_config.go
  11. 16 17
      aarch64/pjisuv/common/config/platform_config.go
  12. 24 0
      aarch64/pjisuv/common/config/ros_config.go
  13. 0 0
      aarch64/pjisuv/common/global/global.go
  14. 8 8
      aarch64/pjisuv/common/init/common_init.go
  15. 11 11
      aarch64/pjisuv/common/service/disk_clean.go
  16. 15 16
      aarch64/pjisuv/common/service/kill_self.go
  17. 0 0
      aarch64/pjisuv/common/service/rosbag_clean.go
  18. 0 0
      aarch64/pjisuv/common/service/rosbag_record.go
  19. 0 0
      aarch64/pjisuv/common/service/rosbag_upload.go
  20. 16 0
      aarch64/pjisuv/common/util/move_bag.go
  21. 0 0
      aarch64/pjisuv/common/util/parse_json.go
  22. 0 0
      aarch64/pjisuv/common/util/send_tcp.go
  23. 0 0
      aarch64/pjisuv/common/util/util_exec.go
  24. 0 24
      aarch64/pjisuv/common/util/util_io.go
  25. 40 0
      aarch64/pjisuv/common/util/utils.go
  26. 15 15
      aarch64/pjisuv/control/main/control.go
  27. 7 7
      aarch64/pjisuv/master/main/master.go
  28. 1 1
      aarch64/pjisuv/master/package/config/master_tcp_config.go
  29. 1 1
      aarch64/pjisuv/master/package/config/master_trigger_config.go
  30. 0 0
      aarch64/pjisuv/master/package/service/move_bag_and_send_window.go
  31. 0 0
      aarch64/pjisuv/master/package/service/produce_window.go
  32. 38 0
      aarch64/pjisuv/slave/main/slave.go
  33. 0 0
      aarch64/pjisuv/slave/package/config/slave_tcp_config.go
  34. 0 0
      aarch64/pjisuv/slave/package/service/accept_window.go
  35. 0 0
      aarch64/pjisuv/slave/package/service/move_bag.go
  36. 46 0
      common/domain/d_service.go
  37. 83 0
      common/entity/time_window.go
  38. 54 0
      common/util/u_io.go
  39. 0 0
      common/util/u_oss.go
  40. 11 0
      common/util/u_slice.go
  41. 43 1
      common/util/u_time.go
  42. 2 3
      kinglong/common/svc/kill_self.go
  43. 0 21
      pjisuv/common/config/kill_rpc_server_cfg.go
  44. 0 24
      pjisuv/common/config/ros_cfg.go
  45. 0 13
      pjisuv/common/entity/time_window.go
  46. 0 64
      pjisuv/common/log/log_cfg.go
  47. 0 17
      pjisuv/common/util/move_bag.go
  48. 0 232
      pjisuv/common/util/utils.go
  49. 0 38
      pjisuv/slave/main/slave.go

+ 0 - 0
pjisuv/common/config/cloud-config-v2253.yaml → aarch64/pjisuv/common/config/cloud-config-v2253.yaml


+ 0 - 0
pjisuv/common/config/cloud-config-v2304.yaml → aarch64/pjisuv/common/config/cloud-config-v2304.yaml


+ 0 - 0
pjisuv/common/config/cloud-config.yaml → aarch64/pjisuv/common/config/cloud-config.yaml


+ 15 - 15
pjisuv/common/config/cloud_config.go → aarch64/pjisuv/common/config/cloud_config.go

@@ -1,8 +1,8 @@
-package cfg
+package config
 
 import (
+	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
-	"cicv-data-closedloop/kinglong/common/log"
 	"gopkg.in/yaml.v3"
 	"os"
 	"sync"
@@ -55,7 +55,7 @@ var (
 
 // InitCloudConfig 初始化业务配置
 func InitCloudConfig() {
-	log.GlobalLogger.Info("初始化OSS配置文件 - 开始。")
+	c_log.GlobalLogger.Info("初始化OSS配置文件 - 开始。")
 	// 获取文件的目录
 	_ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
 	// 3 ------- 获取 yaml 字符串 -------
@@ -63,13 +63,13 @@ func InitCloudConfig() {
 	cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
 	err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
 	if err != nil {
-		log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
+		c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
 		os.Exit(-1)
 	}
 
 	content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
 	if err != nil {
-		log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
+		c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
 		os.Exit(-1)
 	}
 
@@ -77,7 +77,7 @@ func InitCloudConfig() {
 	var newCloudConfig cloudConfig
 	err = yaml.Unmarshal(content, &newCloudConfig)
 	if err != nil {
-		log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
+		c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
 		os.Exit(-1)
 	}
 
@@ -87,12 +87,12 @@ func InitCloudConfig() {
 		CloudConfig = newCloudConfig
 		CloudConfigMutex.RUnlock()
 	} else {
-		log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
+		c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
 		os.Exit(-1)
 	}
-	log.GlobalLogger.Info("初始化OSS配置文件 - 成功。")
-	util.CreateDir(CloudConfig.BagDataDir)
-	util.CreateDir(CloudConfig.BagCopyDir)
+	c_log.GlobalLogger.Info("初始化OSS配置文件 - 成功。")
+	_ = util.CreateDir(CloudConfig.BagDataDir)
+	_ = util.CreateDir(CloudConfig.BagCopyDir)
 	timeToLabelJson, _ := util.MapToJsonString(map[string]interface{}{"time": "label"})
 	_ = util.WriteFile(timeToLabelJson, CloudConfig.TimeToLabelJsonPath)
 }
@@ -106,13 +106,13 @@ func refreshCloudConfig() {
 	cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
 	err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
 	if err != nil {
-		log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
+		c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
 		return
 	}
 
 	content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
 	if err != nil {
-		log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
+		c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 读取失败:", err)
 		return
 	}
 
@@ -120,7 +120,7 @@ func refreshCloudConfig() {
 	var newCloudConfig cloudConfig
 	err = yaml.Unmarshal(content, &newCloudConfig)
 	if err != nil {
-		log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
+		c_log.GlobalLogger.Error("配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
 		return
 	}
 
@@ -130,7 +130,7 @@ func refreshCloudConfig() {
 		CloudConfig = newCloudConfig
 		CloudConfigMutex.RUnlock()
 	} else {
-		log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
+		c_log.GlobalLogger.Error("配置文件格式错误:", newCloudConfig)
 		return
 	}
 	util.CreateDir(CloudConfig.BagDataDir)
@@ -148,7 +148,7 @@ func RefreshCloudConfig() {
 // CheckConfig 校验 cfg.yaml 文件
 func checkConfig(check cloudConfig) bool {
 	if len(check.Hosts) != 2 {
-		log.GlobalLogger.Error("cloud-config.yaml中配置的hosts必须为2。")
+		c_log.GlobalLogger.Error("cloud-config.yaml中配置的hosts必须为2。")
 		os.Exit(-1)
 	}
 	return true

+ 21 - 0
aarch64/pjisuv/common/config/kill_rpc_server_config.go

@@ -0,0 +1,21 @@
+package config
+
+import (
+	"cicv-data-closedloop/common/config/c_log"
+	"net"
+	"os"
+)
+
+var KillSignalListener net.Listener
+
+func InitKillSignalListener(serverIp string) {
+	var err error
+	c_log.GlobalLogger.Info("初始化RPC端口监听Kill信号 - 开始。")
+	socket := serverIp + ":" + CloudConfig.RpcPort
+	KillSignalListener, err = net.Listen("tcp", socket)
+	if err != nil {
+		c_log.GlobalLogger.Error("监听rpc端口失败:", err)
+		os.Exit(-1)
+	}
+	c_log.GlobalLogger.Info("初始化RPC端口监听Kill信号 - 成功:", socket)
+}

+ 0 - 0
pjisuv/common/config/local-config-soc1.yaml → aarch64/pjisuv/common/config/local-config-soc1.yaml


+ 0 - 0
pjisuv/common/config/local-config-soc2.yaml → aarch64/pjisuv/common/config/local-config-soc2.yaml


+ 0 - 0
pjisuv/common/config/local-config.yaml → aarch64/pjisuv/common/config/local-config.yaml


+ 6 - 6
pjisuv/common/config/local_cfg.go → aarch64/pjisuv/common/config/local_config.go

@@ -1,7 +1,7 @@
-package cfg
+package config
 
 import (
-	"cicv-data-closedloop/kinglong/common/log"
+	"cicv-data-closedloop/common/config/c_log"
 	"gopkg.in/yaml.v2"
 	"os"
 )
@@ -34,21 +34,21 @@ var (
 )
 
 func InitLocalConfig() {
-	log.GlobalLogger.Info("初始化本地配置文件 - 开始:", localConfigPath)
+	c_log.GlobalLogger.Info("初始化本地配置文件 - 开始:", localConfigPath)
 	// 读取YAML文件内容
 	content, err := os.ReadFile(localConfigPath)
 	if err != nil {
-		log.GlobalLogger.Error("读取本地配置文件失败。", err)
+		c_log.GlobalLogger.Error("读取本地配置文件失败。", err)
 		os.Exit(-1)
 	}
 
 	// 解析YAML内容
 	err = yaml.Unmarshal(content, &LocalConfig)
 	if err != nil {
-		log.GlobalLogger.Error("解析本地配置文件失败。", err)
+		c_log.GlobalLogger.Error("解析本地配置文件失败。", err)
 		os.Exit(-1)
 	}
 
-	log.GlobalLogger.Info("初始化本地配置文件 - 成功:", LocalConfig)
+	c_log.GlobalLogger.Info("初始化本地配置文件 - 成功:", LocalConfig)
 
 }

+ 10 - 10
pjisuv/common/config/oss_cfg.go → aarch64/pjisuv/common/config/oss_config.go

@@ -1,8 +1,8 @@
-package cfg
+package config
 
 import (
-	"cicv-data-closedloop/kinglong/common/cutil"
-	"cicv-data-closedloop/kinglong/common/log"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	"encoding/json"
 	"github.com/aliyun/aliyun-oss-go-sdk/oss"
 	"os"
@@ -19,29 +19,29 @@ var OssClient *oss.Client
 var OssBucket *oss.Bucket
 
 func InitOssConfig() {
-	log.GlobalLogger.Info("初始化OSS客户端对象 - 开始。")
+	c_log.GlobalLogger.Info("初始化OSS客户端对象 - 开始。")
 	// 1 访问 HTTP 服务获取 OSS 配置
-	get, err := cutil.HttpGet(LocalConfig.UrlGetOssConfig)
+	get, err := util.HttpGet(LocalConfig.UrlGetOssConfig)
 	if err != nil {
-		log.GlobalLogger.Error("http获取oss配置时出错:", err)
+		c_log.GlobalLogger.Error("http获取oss配置时出错:", err)
 		os.Exit(-1)
 	}
 	var ossConnectInfo OssConnectInfoStruct
 	err = json.Unmarshal([]byte(get), &ossConnectInfo)
 	if err != nil {
-		log.GlobalLogger.Error("解析json时出错:", err)
+		c_log.GlobalLogger.Error("解析json时出错:", err)
 		os.Exit(-1)
 	}
 
 	OssClient, err = oss.New(ossConnectInfo.Endpoint, ossConnectInfo.AccessKeyId, ossConnectInfo.AccessKeySecret, oss.UseCname(true))
 	if err != nil {
-		log.GlobalLogger.Error("无法创建阿里云client:", err)
+		c_log.GlobalLogger.Error("无法创建阿里云client:", err)
 		os.Exit(-1)
 	}
 	OssBucket, err = OssClient.Bucket(ossConnectInfo.BucketName)
 	if err != nil {
-		log.GlobalLogger.Error("无法创建阿里云bucket:", err)
+		c_log.GlobalLogger.Error("无法创建阿里云bucket:", err)
 		os.Exit(-1)
 	}
-	log.GlobalLogger.Info("初始化OSS客户端对象 - 成功。")
+	c_log.GlobalLogger.Info("初始化OSS客户端对象 - 成功。")
 }

+ 16 - 17
pjisuv/common/config/platform_cfg.go → aarch64/pjisuv/common/config/platform_config.go

@@ -1,9 +1,8 @@
-package cfg
+package config
 
 import (
+	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
-	"cicv-data-closedloop/kinglong/common/cutil"
-	"cicv-data-closedloop/kinglong/common/log"
 	"encoding/json"
 	"time"
 )
@@ -40,19 +39,19 @@ var PlatformConfig platformConfig
 // InitPlatformConfig 初始化数据闭环平台的配置
 func InitPlatformConfig() {
 	var err error
-	log.GlobalLogger.Info("获取数据闭环平台配置 - 开始")
+	c_log.GlobalLogger.Info("获取数据闭环平台配置 - 开始")
 	// 1 如果车辆没有配置任务,则阻塞在这里,不启动任务
 	for {
 		time.Sleep(time.Duration(1))
-		log.GlobalLogger.Info("获取数据闭环平台配置 - 进行中")
+		c_log.GlobalLogger.Info("获取数据闭环平台配置 - 进行中")
 		PlatformConfig, err = getConfig()
 		if err != nil {
-			log.GlobalLogger.Error("获取配置status失败:", err)
+			c_log.GlobalLogger.Error("获取配置status失败:", err)
 			continue
 		}
 		break
 	}
-	log.GlobalLogger.Info("获取数据闭环平台配置 - 成功。")
+	c_log.GlobalLogger.Info("获取数据闭环平台配置 - 成功。")
 }
 
 /*
@@ -70,7 +69,7 @@ func InitPlatformConfig() {
 */
 // GetAccessToken 认证接口,获取access_token
 func GetAccessToken() (string, error) {
-	respJson, err := cutil.HttpPostJsonResponseString(
+	respJson, err := util.HttpPostJsonResponseString(
 		CloudConfig.Platform.UrlDeviceAuth,
 		map[string]string{
 			"equipmentNo": LocalConfig.EquipmentNo,
@@ -82,13 +81,13 @@ func GetAccessToken() (string, error) {
 	}
 	respMap, err := util.JsonStringToMap(respJson)
 	if err != nil {
-		log.GlobalLogger.Error("解析返回结果", respJson, "失败:", err)
+		c_log.GlobalLogger.Error("解析返回结果", respJson, "失败:", err)
 		return "", nil
 	}
 
 	dataMap := respMap["data"].(map[string]interface{})
 	if err != nil {
-		log.GlobalLogger.Error("解析返回结果.data", dataMap, "失败:", err)
+		c_log.GlobalLogger.Error("解析返回结果.data", dataMap, "失败:", err)
 		return "", nil
 	}
 	return dataMap["accessToken"].(string), nil
@@ -112,7 +111,7 @@ func GetStatus(taskConfigId string) (string, error) {
 	if err != nil {
 		return "", err
 	}
-	resp, err := cutil.HttpGetStringAddHeadersResponseString(
+	resp, err := util.HttpGetStringAddHeadersResponseString(
 		CloudConfig.Platform.UrlTaskPoll,
 		map[string]string{
 			"authorization": token,
@@ -124,17 +123,17 @@ func GetStatus(taskConfigId string) (string, error) {
 	)
 
 	if err != nil {
-		log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err)
+		c_log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err)
 		return "", err
 	}
 	respMap, err := util.JsonStringToMap(resp)
 	if err != nil {
-		log.GlobalLogger.Error("解析【返回结果1】", resp, "失败:", err)
+		c_log.GlobalLogger.Error("解析【返回结果1】", resp, "失败:", err)
 		return "", err
 	}
 	dataMap, ok := respMap["data"].(map[string]interface{})
 	if !ok {
-		log.GlobalLogger.Errorf("解析【返回结果.data】的类型不是(map[string]interface{}),【dataMap】=%v", dataMap)
+		c_log.GlobalLogger.Errorf("解析【返回结果.data】的类型不是(map[string]interface{}),【dataMap】=%v", dataMap)
 		return "", err
 	}
 	return dataMap["status"].(string), nil
@@ -147,7 +146,7 @@ func getConfig() (platformConfig, error) {
 	}
 	// 下载插件和获取配置
 	// 2 访问配置获取接口
-	resp, err := cutil.HttpGetStringAddHeadersResponseString(
+	resp, err := util.HttpGetStringAddHeadersResponseString(
 		CloudConfig.Platform.UrlTask,
 		map[string]string{
 			"authorization": token,
@@ -157,13 +156,13 @@ func getConfig() (platformConfig, error) {
 		},
 	)
 	if err != nil {
-		log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err)
+		c_log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err)
 		return platformConfig{}, err
 	}
 	var result response
 	err = json.Unmarshal([]byte(resp), &result)
 	if err != nil {
-		log.GlobalLogger.Error("解析【返回结果】", resp, "失败:", err)
+		c_log.GlobalLogger.Error("解析【返回结果】", resp, "失败:", err)
 		return platformConfig{}, err
 	}
 	return result.Data, nil

+ 24 - 0
aarch64/pjisuv/common/config/ros_config.go

@@ -0,0 +1,24 @@
+package config
+
+import (
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
+	"github.com/bluenviron/goroslib/v2"
+	"os"
+)
+
+var RosNode *goroslib.Node
+
+func InitRosConfig() {
+	c_log.GlobalLogger.Info("初始化RosNode - 开始")
+	var err error
+	RosNode, err = goroslib.NewNode(goroslib.NodeConf{
+		Name:          "node" + util.GetNowTimeCustom(),
+		MasterAddress: CloudConfig.Ros.MasterAddress,
+	})
+	if err != nil {
+		c_log.GlobalLogger.Error("初始化RosNode - 失败:", err)
+		os.Exit(-1)
+	}
+	c_log.GlobalLogger.Info("初始化RosNode - 成功:", CloudConfig.Ros.MasterAddress)
+}

+ 0 - 0
pjisuv/common/global/global.go → aarch64/pjisuv/common/global/global.go


+ 8 - 8
pjisuv/common/init/common_init.go → aarch64/pjisuv/common/init/common_init.go

@@ -1,28 +1,28 @@
 package init
 
 import (
-	"cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/svc"
+	"cicv-data-closedloop/aarch64/pjisuv/common/config"
+	"cicv-data-closedloop/aarch64/pjisuv/common/service"
 )
 
 func Init() {
 
 	// 初始化本地配置文件(第1处配置,在本地文件)
-	cfg.InitLocalConfig()
+	config.InitLocalConfig()
 
 	// 初始化Oss连接信息
-	cfg.InitOssConfig()
+	config.InitOssConfig()
 
 	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
-	cfg.InitCloudConfig()
+	config.InitCloudConfig()
 
-	go cfg.RefreshCloudConfig()
+	go config.RefreshCloudConfig()
 
 	// 初始化数据闭环平台的配置(第3处配置,在数据闭环平台接口)
-	cfg.InitPlatformConfig()
+	config.InitPlatformConfig()
 
 	// 初始化ros节点
-	cfg.InitRosConfig()
+	config.InitRosConfig()
 
 	// 维护data目录缓存的包数量
 	go svc.BagCacheClean()

+ 11 - 11
pjisuv/common/service/disk_clean.go → aarch64/pjisuv/common/service/disk_clean.go

@@ -1,8 +1,8 @@
 package svc
 
 import (
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/global"
+	commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
+	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/kinglong/common/log"
 	"cicv-data-closedloop/kinglong/common/util"
 	masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
@@ -36,17 +36,17 @@ func DiskClean() {
 			log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.DiskUsage, policyToDescription[policy])
 			if policy == "TTL" {
 				// 1 获取时间窗口队列中的第二个
-				if len(global.TimeWindowConsumerQueue) > 2 {
+				if len(entity.TimeWindowConsumerQueue) > 2 {
 					deleteTimeWindow(1)
 				}
 			} else if policy == "STOP" {
 				// 2 获取时间窗口队列中的倒数第一个
-				if len(global.TimeWindowConsumerQueue) > 2 {
-					deleteTimeWindow(len(global.TimeWindowConsumerQueue) - 1)
+				if len(entity.TimeWindowConsumerQueue) > 2 {
+					deleteTimeWindow(len(entity.TimeWindowConsumerQueue) - 1)
 				}
 			} else if policy == "LRU" {
 				// 3 获取优先级最低的时间窗口
-				if len(global.TimeWindowConsumerQueue) > 2 {
+				if len(entity.TimeWindowConsumerQueue) > 2 {
 					indexToRemove := getIndexToRemoveForLRU()
 					if indexToRemove != -1 {
 						deleteTimeWindow(indexToRemove)
@@ -60,7 +60,7 @@ func DiskClean() {
 }
 
 func deleteTimeWindow(indexToRemove int) {
-	timeWindowToRemove := global.TimeWindowConsumerQueue[indexToRemove]
+	timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove]
 	// 删除文件
 	faultTime := timeWindowToRemove.FaultTime
 	dir := util.GetCopyDir(faultTime)
@@ -68,17 +68,17 @@ func deleteTimeWindow(indexToRemove int) {
 	if err != nil {
 		log.GlobalLogger.Error("删除目录", dir, "失败:", err)
 	}
-	global.TimeWindowConsumerQueueMutex.Lock()
+	entity.TimeWindowConsumerQueueMutex.Lock()
 	// 使用切片的特性删除指定位置的元素
-	global.TimeWindowConsumerQueue = append(global.TimeWindowConsumerQueue[:indexToRemove], global.TimeWindowConsumerQueue[indexToRemove+1:]...)
-	global.TimeWindowConsumerQueueMutex.Unlock()
+	entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
+	entity.TimeWindowConsumerQueueMutex.Unlock()
 }
 
 func getIndexToRemoveForLRU() int {
 	lru := commonConfig.PlatformConfig.Lru
 	i := len(lru) - 1
 	for i >= 0 {
-		for i2, window := range global.TimeWindowConsumerQueue {
+		for i2, window := range entity.TimeWindowConsumerQueue {
 			if masterConfig.LabelMapTriggerId[window.FaultTime] == lru[i] {
 				return i2
 			}

+ 15 - 16
pjisuv/common/service/kill_self.go → aarch64/pjisuv/common/service/kill_self.go

@@ -1,10 +1,9 @@
 package svc
 
 import (
-	"cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/kinglong/common/util"
-	commonConfig "cicv-data-closedloop/pji/common/cfg"
+	"cicv-data-closedloop/aarch64/pjisuv/common/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	"net/rpc"
 	"os"
 	"sync"
@@ -35,7 +34,7 @@ type KillService struct{}
 
 // Kill 杀死自身程序,通过通道实现 方法必须满足RPC规范:函数有两个参数,第一个参数是请求,第二个是响应
 func (m *KillService) Kill(args *KillSignal, reply *int) error {
-	log.GlobalLogger.Info("接收到自杀信号:", *args)
+	c_log.GlobalLogger.Info("接收到自杀信号:", *args)
 	// 1 杀死 rosbag record 命令
 	ChannelKillRosRecord <- 1
 	// 2 杀死所有 ros 订阅者
@@ -55,13 +54,13 @@ func (m *KillService) Kill(args *KillSignal, reply *int) error {
 func WaitKillSelf() {
 	killService := new(KillService)
 	if err := rpc.Register(killService); err != nil {
-		log.GlobalLogger.Error("注册rpc服务失败:", err)
+		c_log.GlobalLogger.Error("注册rpc服务失败:", err)
 		return
 	}
 
 	// 等待并处理远程调用请求
 	for {
-		conn, err := cfg.KillSignalListener.Accept()
+		conn, err := config.KillSignalListener.Accept()
 		if err != nil {
 			continue
 		}
@@ -76,24 +75,24 @@ func AddKillTimes(info string) {
 	case "1":
 		close(ChannelKillRosRecord)
 		KillTimes++
-		log.GlobalLogger.Infof("已杀死record打包goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死record打包goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 		ChannelKillDiskClean <- 1
 	case "2":
 		close(ChannelKillDiskClean)
 		KillTimes++
-		log.GlobalLogger.Infof("已杀死bag包数量维护goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死bag包数量维护goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 	case "3":
 		close(ChannelKillWindowProducer)
 		KillTimes++
-		log.GlobalLogger.Infof("已杀死时间窗口生产者,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死时间窗口生产者,当前自杀进度 %v / %v", KillTimes, KillChannel)
 	case "4":
 		close(ChannelKillMove)
 		KillTimes++
-		log.GlobalLogger.Infof("已杀死bag包移动goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死bag包移动goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 	case "5":
 		close(ChannelKillConsume)
 		KillTimes++
-		log.GlobalLogger.Infof("已杀死bag包消费goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
+		c_log.GlobalLogger.Infof("已杀死bag包消费goroutine,当前自杀进度 %v / %v", KillTimes, KillChannel)
 	}
 	MutexKill.Unlock()
 }
@@ -103,14 +102,14 @@ func killDone(restart bool) {
 		time.Sleep(time.Duration(1) * time.Second)
 		if KillChannel == KillTimes {
 			if restart {
-				_, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...)
+				_, err := util.ExecuteWithPath(config.LocalConfig.RestartCmd.Dir, config.LocalConfig.RestartCmd.Name, config.LocalConfig.RestartCmd.Args...)
 				if err != nil {
-					log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
+					c_log.GlobalLogger.Info("启动新程序失败,【path】=", config.LocalConfig.RestartCmd.Dir, "【cmd】=", config.LocalConfig.RestartCmd.Name, config.LocalConfig.RestartCmd.Args, ":", err)
 					os.Exit(-1)
 				}
-				log.GlobalLogger.Info("数据采集任务更新,正常退出当前程序。")
+				c_log.GlobalLogger.Info("数据采集任务更新,正常退出当前程序。")
 			} else {
-				log.GlobalLogger.Info("数据采集任务终止,正常退出当前程序。")
+				c_log.GlobalLogger.Info("数据采集任务终止,正常退出当前程序。")
 			}
 			os.Exit(0)
 		}

+ 0 - 0
pjisuv/common/service/rosbag_clean.go → aarch64/pjisuv/common/service/rosbag_clean.go


+ 0 - 0
pjisuv/common/service/rosbag_record.go → aarch64/pjisuv/common/service/rosbag_record.go


+ 0 - 0
pjisuv/common/service/rosbag_upload.go → aarch64/pjisuv/common/service/rosbag_upload.go


+ 16 - 0
aarch64/pjisuv/common/util/move_bag.go

@@ -0,0 +1,16 @@
+package util
+
+import (
+	"cicv-data-closedloop/common/util"
+	"strings"
+)
+
+func MoveFromDataToCopy(faultTime string, bagDataDir string, sourceBag string) {
+	dir := GetCopyDir(faultTime)
+	_ = util.CreateDir(dir)
+	targetBag := strings.Replace(sourceBag, bagDataDir, dir, 1)
+	var copyCommand []string
+	copyCommand = append(copyCommand, sourceBag)
+	copyCommand = append(copyCommand, targetBag)
+	_, _, _ = Execute("mv", copyCommand...)
+}

+ 0 - 0
pjisuv/common/util/parse_json.go → aarch64/pjisuv/common/util/parse_json.go


+ 0 - 0
pjisuv/common/util/send_tcp.go → aarch64/pjisuv/common/util/send_tcp.go


+ 0 - 0
pjisuv/common/util/util_exec.go → aarch64/pjisuv/common/util/util_exec.go


+ 0 - 24
pjisuv/common/util/util_io.go → aarch64/pjisuv/common/util/util_io.go

@@ -1,34 +1,10 @@
 package util
 
 import (
-	"cicv-data-closedloop/kinglong/common/log"
 	"os"
 	"path/filepath"
-	"sort"
-	"strings"
 )
 
-func ListAbsolutePathWithSuffixAndSort(dir string, suffix string) []string {
-	var result []string
-	if !strings.HasSuffix(dir, "/") {
-		dir = dir + "/"
-	}
-	files, err := os.ReadDir(dir)
-	if err != nil {
-		log.GlobalLogger.Error("读取目录", dir, "报错:", err)
-	}
-	for _, file := range files {
-		if strings.HasSuffix(file.Name(), suffix) {
-			result = append(result, dir+file.Name())
-		}
-	}
-	// 根据文件名进行升序排序
-	sort.Slice(result, func(i, j int) bool {
-		return filepath.Base(result[i]) < filepath.Base(result[j])
-	})
-	return result
-}
-
 // RemoveDir 递归删除目录及其下的所有文件和子目录
 func RemoveDir(dirPath string) error {
 	// 打开目录

+ 40 - 0
aarch64/pjisuv/common/util/utils.go

@@ -0,0 +1,40 @@
+package util
+
+import (
+	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
+	"cicv-data-closedloop/kinglong/common/log"
+	"os/exec"
+	"strconv"
+	"strings"
+)
+
+// GetDiskUsagePercent 获取磁盘使用率
+func GetDiskUsagePercent() float64 {
+	// 执行 df 命令获取磁盘使用情况
+	cmd := exec.Command("df", "--total")
+	output, err := cmd.Output()
+	if err != nil {
+		log.GlobalLogger.Info("执行命令失败:", err)
+		return 0.0
+	}
+
+	// 解析 df 命令输出,计算磁盘占比
+	lines := strings.Split(string(output), "\n")
+	for _, line := range lines[1:] {
+		fields := strings.Fields(line)
+		if len(fields) >= 6 && fields[0] == "total" {
+			//filesystem := fields[0]
+			total, _ := strconv.ParseFloat(strings.TrimSuffix(fields[1], "G"), 64)
+			used, _ := strconv.ParseFloat(strings.TrimSuffix(fields[2], "G"), 64)
+			usedPercent := (used / total) * 100
+
+			//fmt.Printf("文件系统 %s 已使用 %.2f%%\n", filesystem, usedPercent)
+			return usedPercent
+		}
+	}
+	return 0.0
+}
+
+func GetCopyDir(faultTime string) string {
+	return commonConfig.CloudConfig.BagCopyDir + faultTime + "/"
+}

+ 15 - 15
pjisuv/control/main/control.go → aarch64/pjisuv/control/main/control.go

@@ -1,8 +1,8 @@
 package main
 
 import (
+	"cicv-data-closedloop/aarch64/pjisuv/common/config"
 	"cicv-data-closedloop/common/util"
-	"cicv-data-closedloop/kinglong/common/cfg"
 	"cicv-data-closedloop/kinglong/common/log"
 	commonService "cicv-data-closedloop/kinglong/common/svc"
 	"net/rpc"
@@ -16,11 +16,11 @@ func init() {
 	// 初始化日志配置
 	log.InitLogConfig("kinglong-control")
 	// 初始化本地配置文件(第1处配置,在本地文件)
-	cfg.InitLocalConfig()
+	config.InitLocalConfig()
 	// 初始化Oss连接信息
-	cfg.InitOssConfig()
+	config.InitOssConfig()
 	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
-	cfg.InitCloudConfig()
+	config.InitCloudConfig()
 }
 
 func main() {
@@ -29,7 +29,7 @@ func main() {
 	for {
 		time.Sleep(time.Duration(2) * time.Second)
 		// 1 获取当前设备的任务的 status
-		status, err := cfg.GetStatus(cfg.PlatformConfig.TaskConfigId)
+		status, err := config.GetStatus(config.PlatformConfig.TaskConfigId)
 		if err != nil {
 			log.GlobalLogger.Error("获取配置status失败:", err)
 			continue
@@ -47,27 +47,27 @@ func main() {
 			}
 			// 3 发送rpc信号杀死两个服务,并重启程序
 			if lastStatus == "NONE" && status == "CHANGE" {
-				if _, err := util.ExecuteWithPath(cfg.LocalConfig.RestartCmd.Dir, cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args...); err != nil {
-					log.GlobalLogger.Info("启动新程序失败,【path】=", cfg.LocalConfig.RestartCmd.Dir, "【cmd】=", cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args, ":", err)
+				if _, err := util.ExecuteWithPath(config.LocalConfig.RestartCmd.Dir, config.LocalConfig.RestartCmd.Name, config.LocalConfig.RestartCmd.Args...); err != nil {
+					log.GlobalLogger.Info("启动新程序失败,【path】=", config.LocalConfig.RestartCmd.Dir, "【cmd】=", config.LocalConfig.RestartCmd.Name, config.LocalConfig.RestartCmd.Args, ":", err)
 					os.Exit(-1)
 				}
-				log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", cfg.LocalConfig.RestartCmd.Dir, "【cmd】=", cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args)
+				log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", config.LocalConfig.RestartCmd.Dir, "【cmd】=", config.LocalConfig.RestartCmd.Name, config.LocalConfig.RestartCmd.Args)
 				lastStatus = status
 				log.GlobalLogger.Info("获取数据闭环平台最新配置。")
-				cfg.InitPlatformConfig()
+				config.InitPlatformConfig()
 				continue
 			}
 			var killArgs *commonService.KillSignal
 			if lastStatus == "UN_CHANGE" && status == "CHANGE" {
-				killArgs = &commonService.KillSignal{NodeName: cfg.LocalConfig.Node.Name, DropUploadData: cfg.PlatformConfig.DropUploadData, Restart: true}
-				log.GlobalLogger.Info("更新任务,发送rpc重启信号到本地"+cfg.LocalConfig.Node.Name+":", killArgs)
+				killArgs = &commonService.KillSignal{NodeName: config.LocalConfig.Node.Name, DropUploadData: config.PlatformConfig.DropUploadData, Restart: true}
+				log.GlobalLogger.Info("更新任务,发送rpc重启信号到本地"+config.LocalConfig.Node.Name+":", killArgs)
 			}
 			if lastStatus == "UN_CHANGE" && status == "NONE" {
-				killArgs = &commonService.KillSignal{NodeName: cfg.LocalConfig.Node.Name, DropUploadData: cfg.PlatformConfig.DropUploadData, Restart: false}
-				log.GlobalLogger.Info("杀死任务,发送rpc结束信号到本地"+cfg.LocalConfig.Node.Name+":", killArgs)
+				killArgs = &commonService.KillSignal{NodeName: config.LocalConfig.Node.Name, DropUploadData: config.PlatformConfig.DropUploadData, Restart: false}
+				log.GlobalLogger.Info("杀死任务,发送rpc结束信号到本地"+config.LocalConfig.Node.Name+":", killArgs)
 			}
 
-			KillRpcClient, err := rpc.Dial("tcp", cfg.LocalConfig.Node.Ip+":"+cfg.CloudConfig.RpcPort)
+			KillRpcClient, err := rpc.Dial("tcp", config.LocalConfig.Node.Ip+":"+config.CloudConfig.RpcPort)
 			if err != nil {
 				log.GlobalLogger.Error("创建rpc客户端连接master失败:", err)
 				// 此处关闭client会报错
@@ -83,7 +83,7 @@ func main() {
 			}
 			lastStatus = status
 			log.GlobalLogger.Info("获取数据闭环平台最新配置。")
-			cfg.InitPlatformConfig()
+			config.InitPlatformConfig()
 			KillRpcClient.Close()
 		} else {
 			log.GlobalLogger.Error("未知的采集任务状态。【status】=", status)

+ 7 - 7
pjisuv/master/main/master.go → aarch64/pjisuv/master/main/master.go

@@ -1,18 +1,18 @@
 package main
 
 import (
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	commonInit "cicv-data-closedloop/kinglong/common/init"
-	"cicv-data-closedloop/kinglong/common/log"
-	commonService "cicv-data-closedloop/kinglong/common/svc"
-	masterConfig "cicv-data-closedloop/kinglong/master/pkg/cfg"
-	masterService "cicv-data-closedloop/kinglong/master/pkg/svc"
+	commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
+	commonInit "cicv-data-closedloop/aarch64/pjisuv/common/init"
+	commonService "cicv-data-closedloop/aarch64/pjisuv/common/service"
+	masterConfig "cicv-data-closedloop/aarch64/pjisuv/master/package/config"
+	masterService "cicv-data-closedloop/aarch64/pjisuv/master/package/service"
+	"cicv-data-closedloop/common/config/c_log"
 )
 
 func init() {
 	// 初始化日志配置
 	//runtime.GOMAXPROCS(1)
-	log.InitLogConfig("kinglong-master")
+	c_log.InitLog("/mnt/media/sda1/cicv-data-closedloop/", "pjisuv-master")
 	commonInit.Init()
 	// 初始化TCP连接,用于发送时间窗口到从节点
 	masterConfig.InitTcpConfig()

+ 1 - 1
pjisuv/master/pkg/cfg/master_tcp_cfg.go → aarch64/pjisuv/master/package/config/master_tcp_config.go

@@ -1,4 +1,4 @@
-package cfg
+package config
 
 import (
 	"cicv-data-closedloop/kinglong/common/log"

+ 1 - 1
pjisuv/master/pkg/cfg/master_trigger_cfg.go → aarch64/pjisuv/master/package/config/master_trigger_config.go

@@ -1,4 +1,4 @@
-package cfg
+package config
 
 import (
 	"cicv-data-closedloop/kinglong/common/cfg"

+ 0 - 0
pjisuv/master/pkg/svc/move_bag_and_send_window.go → aarch64/pjisuv/master/package/service/move_bag_and_send_window.go


+ 0 - 0
pjisuv/master/pkg/svc/produce_window.go → aarch64/pjisuv/master/package/service/produce_window.go


+ 38 - 0
aarch64/pjisuv/slave/main/slave.go

@@ -0,0 +1,38 @@
+package main
+
+import (
+	commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
+	commonInit "cicv-data-closedloop/aarch64/pjisuv/common/init"
+	commonService "cicv-data-closedloop/aarch64/pjisuv/common/service"
+	slaveConfig "cicv-data-closedloop/aarch64/pjisuv/slave/package/config"
+	slaveService "cicv-data-closedloop/aarch64/pjisuv/slave/package/service"
+	"cicv-data-closedloop/common/config/c_log"
+)
+
+// init 初始化函数
+func init() {
+	//runtime.GOMAXPROCS(1)
+	c_log.InitLog("/mnt/media/sda1/cicv-data-closedloop/", "pji-slave")
+	commonInit.Init()
+	slaveConfig.InitTcpListener()
+	commonConfig.InitKillSignalListener(commonConfig.CloudConfig.Hosts[1].Ip)
+	// 等待重启,接收到重启信号,会把信号分发给以下channel
+	go commonService.WaitKillSelf()
+}
+
+// main 主函数
+func main() {
+
+	// 1 负责打包数据到data目录
+	go commonService.BagRecord(commonConfig.CloudConfig.Hosts[1].Name)
+	// 2 负责监控故障,并修改timeWindow
+	go slaveService.PrepareTimeWindowProducerQueue()
+	// 3
+	go slaveService.RunTimeWindowProducerQueue()
+	// 4 排队运行时间窗口
+	go commonService.RunTimeWindowConsumerQueue(commonConfig.CloudConfig.Hosts[1].Name)
+
+	// 阻塞主线程,等待其他线程执行。
+	select {}
+
+}

+ 0 - 0
pjisuv/slave/pkg/cfg/slave_tcp_config.go → aarch64/pjisuv/slave/package/config/slave_tcp_config.go


+ 0 - 0
pjisuv/slave/pkg/svc/accept_window.go → aarch64/pjisuv/slave/package/service/accept_window.go


+ 0 - 0
pjisuv/slave/pkg/svc/move_bag.go → aarch64/pjisuv/slave/package/service/move_bag.go


+ 46 - 0
common/domain/d_service.go

@@ -0,0 +1,46 @@
+package domain
+
+import (
+	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/entity"
+	"cicv-data-closedloop/common/util"
+	"strings"
+	"time"
+)
+
+func MoveFromDataToCopy(faultTime string, bagDataDir string, sourceBag string, bagCopyDir string) {
+	dir := GetCopyDir(bagCopyDir, faultTime)
+	_ = util.CreateDir(dir)
+	targetBag := strings.Replace(sourceBag, bagDataDir, dir, 1)
+	var copyCommand []string
+	copyCommand = append(copyCommand, sourceBag)
+	copyCommand = append(copyCommand, targetBag)
+	_, _, _ = util.Execute("mv", copyCommand...)
+}
+
+func GetCopyDir(bagDataDir string, faultTime string) string {
+	return bagDataDir + faultTime + "/"
+}
+
+// SupplyCopyBags 如果 Copy目录下的包不够,则补充一些
+func SupplyCopyBags(bagDataDir string, bagCopyDir string, currentTimeWindow entity.TimeWindow) {
+	// 如果bag包没有达到length,补充几个
+	copyBags, _ := util.ListAbsolutePathWithSuffixAndSort(GetCopyDir(bagCopyDir, currentTimeWindow.FaultTime), ".bag")
+	copyBagsLength := len(copyBags)
+	if copyBagsLength < currentTimeWindow.Length {
+		time.Sleep(time.Duration(copyBagsLength) * time.Second)
+		dataBags, _ := util.ListAbsolutePathWithSuffixAndSort(bagDataDir, ".bag")
+		gap := currentTimeWindow.Length - copyBagsLength
+		c_log.GlobalLogger.Info("故障 ", currentTimeWindow.FaultTime, "需要补充 ", gap, " 个 bag 包")
+		for _, bag := range dataBags {
+			bagTime := util.GetBagTime(bag)
+			if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.FaultTime) {
+				MoveFromDataToCopy(currentTimeWindow.FaultTime, bagDataDir, bag, bagCopyDir)
+				gap = gap - 1
+				if gap == 0 {
+					break
+				}
+			}
+		}
+	}
+}

+ 83 - 0
common/entity/time_window.go

@@ -0,0 +1,83 @@
+package entity
+
+import (
+	"sync"
+	"time"
+)
+
+var (
+	TimeWindowProducerQueue      []TimeWindow
+	TimeWindowProducerQueueMutex sync.RWMutex
+
+	TimeWindowConsumerQueue      []TimeWindow
+	TimeWindowConsumerQueueMutex sync.RWMutex
+
+	Subscriber0Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber0TimeMutex sync.Mutex
+	Subscriber2Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber2TimeMutex sync.Mutex
+	Subscriber3Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	Subscriber3TimeMutex sync.Mutex
+
+	TcpSendTime      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
+	TcpSendTimeMutex sync.Mutex
+)
+
+type TimeWindow struct {
+	FaultTime       string   `json:"FaultTime"`
+	TimeWindowBegin string   `json:"TimeWindowBegin"`
+	TimeWindowEnd   string   `json:"TimeWindowEnd"`
+	Labels          []string `json:"Labels"`
+	TriggerIds      []string `json:"TriggerIds"`
+	Length          int      `json:"Length"`
+	CanUpload       string   `json:"CanUpload"`
+	MasterTopics    []string `json:"MasterTopics"`
+	SlaveTopics     []string `json:"SlaveTopics"`
+}
+
+func RefreshTcpSendTime() {
+	TcpSendTimeMutex.Lock()
+	TcpSendTime = time.Now()
+	TcpSendTimeMutex.Unlock()
+}
+
+func AddTimeWindowToTimeWindowProducerQueue(window TimeWindow) {
+	TimeWindowProducerQueueMutex.RLock()
+	{
+		TimeWindowProducerQueue = append(TimeWindowProducerQueue, window)
+	}
+	TimeWindowProducerQueueMutex.RUnlock()
+}
+
+func AddTimeWindowToTimeWindowConsumerQueue(window TimeWindow) {
+	TimeWindowConsumerQueueMutex.RLock()
+	{
+		TimeWindowConsumerQueue = append(TimeWindowConsumerQueue, window)
+	}
+	TimeWindowConsumerQueueMutex.RUnlock()
+}
+
+func RemoveHeadOfdTimeWindowProducerQueue() {
+	TimeWindowProducerQueueMutex.RLock()
+	{
+		TimeWindowProducerQueue = TimeWindowProducerQueue[1:]
+	}
+	TimeWindowProducerQueueMutex.RUnlock()
+}
+
+func RemoveHeaOfdTimeWindowConsumerQueue() {
+	TimeWindowConsumerQueueMutex.RLock()
+	{
+		TimeWindowConsumerQueue = TimeWindowConsumerQueue[1:]
+	}
+	TimeWindowConsumerQueueMutex.RUnlock()
+}
+
+// GetLastTimeWindow 获取最后一个时间窗口
+func GetLastTimeWindow() *TimeWindow {
+	var lastTimeWindow *TimeWindow // 获取最后一个时间窗口
+	if len(TimeWindowProducerQueue) > 0 {
+		lastTimeWindow = &TimeWindowProducerQueue[len(TimeWindowProducerQueue)-1]
+	}
+	return lastTimeWindow
+}

+ 54 - 0
common/util/u_io.go

@@ -4,6 +4,8 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"sort"
+	"strings"
 )
 
 func CreateDir(directory string) error {
@@ -105,3 +107,55 @@ func ReadFile(filePath string) (string, error) {
 	}
 	return string(content), err
 }
+
+func ListAbsolutePathWithSuffixAndSort(dir string, suffix string) ([]string, error) {
+	var result []string
+	if !strings.HasSuffix(dir, "/") {
+		dir = dir + "/"
+	}
+	files, err := os.ReadDir(dir)
+	if err != nil {
+		return nil, err
+	}
+	for _, file := range files {
+		if strings.HasSuffix(file.Name(), suffix) {
+			result = append(result, dir+file.Name())
+		}
+	}
+	// 根据文件名进行升序排序
+	sort.Slice(result, func(i, j int) bool {
+		return filepath.Base(result[i]) < filepath.Base(result[j])
+	})
+	return result, nil
+}
+
+func DeleteFile(path string) error {
+	// 检查文件是否存在
+	if _, err := os.Stat(path); err == nil {
+		// 文件存在,执行删除操作
+		err = os.Remove(path)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+func ListAbsolutePathAndSort(dir string) ([]string, error) {
+	var result []string
+	if !strings.HasSuffix(dir, "/") {
+		dir = dir + "/"
+	}
+	files, err := os.ReadDir(dir)
+	if err != nil {
+		return result, err
+	}
+	for _, file := range files {
+		result = append(result, dir+file.Name())
+	}
+
+	// 根据文件名进行升序排序
+	sort.Slice(result, func(i, j int) bool {
+		return filepath.Base(result[i]) < filepath.Base(result[j])
+	})
+	return result, nil
+}

+ 0 - 0
common/util/util_oss.go → common/util/u_oss.go


+ 11 - 0
common/util/u_slice.go

@@ -0,0 +1,11 @@
+package util
+
+// AppendIfNotExists 向切片中追加元素,如果元素已存在则不添加
+func AppendIfNotExists(slice []string, element string) []string {
+	for _, item := range slice {
+		if item == element {
+			return slice // 元素已存在,直接返回原切片
+		}
+	}
+	return append(slice, element) // 元素不存在,追加到切片末尾
+}

+ 43 - 1
common/util/u_time.go

@@ -1,9 +1,51 @@
 package util
 
-import "time"
+import (
+	"strconv"
+	"strings"
+	"time"
+)
 
 func GetNowTimeCustom() string {
 	currentTime := time.Now()
 	formattedTime := currentTime.Format("2006-01-02-15-04-05")
 	return formattedTime
 }
+func TimeCustom1GreaterEqualThanTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
+	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
+	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
+	return timeInt1 >= timeInt2
+}
+func GetBagTime(bagName string) string {
+	s1 := strings.Split(bagName, "_")[0]
+	s1Split := strings.Split(s1, "/")
+	s2 := s1Split[len(s1Split)-1]
+	return s2
+}
+func TimeCustomChange(originalTimeStr string, number int) string {
+	var newTimeStr string
+	layout := "2006-01-02-15-04-05"
+	originalTime, err := time.Parse(layout, originalTimeStr)
+	if err != nil {
+		return newTimeStr
+	}
+	newTime := originalTime.Add(time.Duration(number) * time.Second)
+	return newTime.Format(layout)
+}
+func CalculateDifferenceOfTimeCustom(timeCustom1 string, timeCustom2 string) int {
+	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
+	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
+	return timeInt2 - timeInt1 + 1
+
+}
+func TimeCustom1GreaterTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
+	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
+	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
+	return timeInt1 > timeInt2
+}
+
+func TimeCustom1LessEqualThanTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
+	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
+	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
+	return timeInt1 <= timeInt2
+}

+ 2 - 3
kinglong/common/svc/kill_self.go

@@ -4,7 +4,6 @@ import (
 	"cicv-data-closedloop/kinglong/common/cfg"
 	"cicv-data-closedloop/kinglong/common/log"
 	"cicv-data-closedloop/kinglong/common/util"
-	commonConfig "cicv-data-closedloop/pji/common/cfg"
 	"net/rpc"
 	"os"
 	"sync"
@@ -103,9 +102,9 @@ func killDone(restart bool) {
 		time.Sleep(time.Duration(1) * time.Second)
 		if KillChannel == KillTimes {
 			if restart {
-				_, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...)
+				_, err := util.ExecuteWithPath(cfg.LocalConfig.RestartCmd.Dir, cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args...)
 				if err != nil {
-					log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
+					log.GlobalLogger.Info("启动新程序失败,【path】=", cfg.LocalConfig.RestartCmd.Dir, "【cmd】=", cfg.LocalConfig.RestartCmd.Name, cfg.LocalConfig.RestartCmd.Args, ":", err)
 					os.Exit(-1)
 				}
 				log.GlobalLogger.Info("数据采集任务更新,正常退出当前程序。")

+ 0 - 21
pjisuv/common/config/kill_rpc_server_cfg.go

@@ -1,21 +0,0 @@
-package cfg
-
-import (
-	"cicv-data-closedloop/kinglong/common/log"
-	"net"
-	"os"
-)
-
-var KillSignalListener net.Listener
-
-func InitKillSignalListener(serverIp string) {
-	var err error
-	log.GlobalLogger.Info("初始化RPC端口监听Kill信号 - 开始。")
-	socket := serverIp + ":" + CloudConfig.RpcPort
-	KillSignalListener, err = net.Listen("tcp", socket)
-	if err != nil {
-		log.GlobalLogger.Error("监听rpc端口失败:", err)
-		os.Exit(-1)
-	}
-	log.GlobalLogger.Info("初始化RPC端口监听Kill信号 - 成功:", socket)
-}

+ 0 - 24
pjisuv/common/config/ros_cfg.go

@@ -1,24 +0,0 @@
-package cfg
-
-import (
-	"cicv-data-closedloop/kinglong/common/cutil"
-	"cicv-data-closedloop/kinglong/common/log"
-	"github.com/bluenviron/goroslib/v2"
-	"os"
-)
-
-var RosNode *goroslib.Node
-
-func InitRosConfig() {
-	log.GlobalLogger.Info("初始化RosNode - 开始")
-	var err error
-	RosNode, err = goroslib.NewNode(goroslib.NodeConf{
-		Name:          "node" + cutil.GetNowTimeCustom(),
-		MasterAddress: CloudConfig.Ros.MasterAddress,
-	})
-	if err != nil {
-		log.GlobalLogger.Error("初始化RosNode - 失败:", err)
-		os.Exit(-1)
-	}
-	log.GlobalLogger.Info("初始化RosNode - 成功:", CloudConfig.Ros.MasterAddress)
-}

+ 0 - 13
pjisuv/common/entity/time_window.go

@@ -1,13 +0,0 @@
-package ent
-
-type TimeWindow struct {
-	FaultTime       string   `json:"FaultTime"`
-	TimeWindowBegin string   `json:"TimeWindowBegin"`
-	TimeWindowEnd   string   `json:"TimeWindowEnd"`
-	Labels          []string `json:"Labels"`
-	TriggerIds      []string `json:"TriggerIds"`
-	Length          int      `json:"Length"`
-	CanUpload       string   `json:"CanUpload"`
-	MasterTopics    []string `json:"MasterTopics"`
-	SlaveTopics     []string `json:"SlaveTopics"`
-}

+ 0 - 64
pjisuv/common/log/log_cfg.go

@@ -1,64 +0,0 @@
-package log
-
-import (
-	"cicv-data-closedloop/common/util"
-	"fmt"
-	"github.com/sirupsen/logrus"
-	"os"
-	"path/filepath"
-	"runtime"
-	"time"
-)
-
-var GlobalLogger *logrus.Logger
-
-//var MonitorLogger *logrus.Logger
-
-// InitLogConfig 初始化日志配置
-func InitLogConfig(prefix string) {
-	initGlobalLogger(prefix)
-	//initMonitorLogger()
-}
-
-// initGlobalLogger 初始化日志配置
-func initGlobalLogger(prefix string) {
-	time.Sleep(time.Duration(1) * time.Second)
-	logPath := "./log/" + prefix + "-" + time.Now().Format("2006-01-02-15-04-05") + ".log"
-	err := util.CreateParentDir(logPath)
-	// 创建、追加、读写,777,所有权限
-	f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm)
-	if err != nil {
-		os.Exit(-1)
-	}
-	GlobalLogger = logrus.New()
-	GlobalLogger.SetOutput(f)
-	GlobalLogger.SetReportCaller(true) // 开启行号显示
-	GlobalLogger.SetFormatter(&logrus.JSONFormatter{
-		CallerPrettyfier: func(frame *runtime.Frame) (string, string) {
-			fileName := filepath.Base(frame.File)
-			return "", fmt.Sprintf("%s:%d", fileName, frame.Line)
-		},
-	})
-	GlobalLogger.Info("初始化GlobalLogger - 成功")
-
-}
-
-//// initMonitorLogger 初始化日志配置
-//func initMonitorLogger() {
-//	time.Sleep(time.Duration(1) * time.Second)
-//	// 创建、追加、读写,777,所有权限
-//	f, err := os.OpenFile("monitor-"+time.Now().Format("2006-01-02-15-04-05")+".log", os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm)
-//	if err != nil {
-//		os.Exit(-1)
-//	}
-//	MonitorLogger = logrus.New()
-//	MonitorLogger.SetOutput(f)
-//	MonitorLogger.SetReportCaller(true) // 开启行号显示
-//	MonitorLogger.SetFormatter(&logrus.JSONFormatter{
-//		CallerPrettyfier: func(frame *runtime.Frame) (string, string) {
-//			fileName := filepath.Base(frame.File)
-//			return "", fmt.Sprintf("%s:%d", fileName, frame.Line)
-//		},
-//	})
-//	MonitorLogger.Info("初始化MonitorLogger - 成功")
-//}

+ 0 - 17
pjisuv/common/util/move_bag.go

@@ -1,17 +0,0 @@
-package util
-
-import (
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/cutil"
-	"strings"
-)
-
-func MoveFromDataToCopy(faultTime string, sourceBag string) {
-	dir := GetCopyDir(faultTime)
-	cutil.CreateDir(dir)
-	targetBag := strings.Replace(sourceBag, commonConfig.CloudConfig.BagDataDir, dir, 1)
-	var copyCommand []string
-	copyCommand = append(copyCommand, sourceBag)
-	copyCommand = append(copyCommand, targetBag)
-	Execute("mv", copyCommand...)
-}

+ 0 - 232
pjisuv/common/util/utils.go

@@ -1,232 +0,0 @@
-package util
-
-import (
-	commonConfig "cicv-data-closedloop/kinglong/common/cfg"
-	"cicv-data-closedloop/kinglong/common/ent"
-	"cicv-data-closedloop/kinglong/common/global"
-	"cicv-data-closedloop/kinglong/common/log"
-	"fmt"
-	"os"
-	"os/exec"
-	"path/filepath"
-	"sort"
-	"strconv"
-	"strings"
-	"time"
-)
-
-// AppendIfNotExists 向切片中追加元素,如果元素已存在则不添加
-func AppendIfNotExists(slice []string, element string) []string {
-	for _, item := range slice {
-		if item == element {
-			return slice // 元素已存在,直接返回原切片
-		}
-	}
-	return append(slice, element) // 元素不存在,追加到切片末尾
-}
-
-func AddTimeWindowToTimeWindowProducerQueue(window ent.TimeWindow) {
-	global.TimeWindowProducerQueueMutex.RLock()
-	{
-		global.TimeWindowProducerQueue = append(global.TimeWindowProducerQueue, window)
-	}
-	global.TimeWindowProducerQueueMutex.RUnlock()
-}
-
-func AddTimeWindowToTimeWindowConsumerQueue(window ent.TimeWindow) {
-	global.TimeWindowConsumerQueueMutex.RLock()
-	{
-		global.TimeWindowConsumerQueue = append(global.TimeWindowConsumerQueue, window)
-	}
-	global.TimeWindowConsumerQueueMutex.RUnlock()
-}
-
-func RemoveHeadOfdTimeWindowProducerQueue() {
-	global.TimeWindowProducerQueueMutex.RLock()
-	{
-		global.TimeWindowProducerQueue = global.TimeWindowProducerQueue[1:]
-	}
-	global.TimeWindowProducerQueueMutex.RUnlock()
-}
-
-func RemoveHeaOfdTimeWindowConsumerQueue() {
-	global.TimeWindowConsumerQueueMutex.RLock()
-	{
-		global.TimeWindowConsumerQueue = global.TimeWindowConsumerQueue[1:]
-	}
-	global.TimeWindowConsumerQueueMutex.RUnlock()
-}
-
-func GetBagTime(bagName string) string {
-	s1 := strings.Split(bagName, "_")[0]
-	s1Split := strings.Split(s1, "/")
-	s2 := s1Split[len(s1Split)-1]
-	return s2
-}
-
-func GetNowTimeCustom() string {
-	currentTime := time.Now()
-	formattedTime := currentTime.Format("2006-01-02-15-04-05")
-	return formattedTime
-}
-
-func TimeCustomChange(originalTimeStr string, number int) string {
-	var newTimeStr string
-	// 解析时间字符串
-	layout := "2006-01-02-15-04-05"
-	originalTime, err := time.Parse(layout, originalTimeStr)
-	if err != nil {
-		log.GlobalLogger.Info("无法解析时间字符串:", err)
-		return newTimeStr
-	}
-
-	// 减少1秒
-	newTime := originalTime.Add(time.Duration(number) * time.Second)
-
-	// 格式化新的时间为指定字符串格式
-	return newTime.Format(layout)
-}
-
-func CalculateDifferenceOfTimeCustom(timeCustom1 string, timeCustom2 string) int {
-	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
-	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
-	return timeInt2 - timeInt1 + 1
-
-}
-func TimeCustom1GreaterTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
-	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
-	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
-	return timeInt1 > timeInt2
-}
-
-func TimeCustom1GreaterEqualThanTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
-	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
-	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
-	return timeInt1 >= timeInt2
-}
-
-func TimeCustom1LessEqualThanTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
-	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
-	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
-	return timeInt1 <= timeInt2
-}
-
-// GetDiskUsagePercent 获取磁盘使用率
-func GetDiskUsagePercent() float64 {
-	// 执行 df 命令获取磁盘使用情况
-	cmd := exec.Command("df", "--total")
-	output, err := cmd.Output()
-	if err != nil {
-		log.GlobalLogger.Info("执行命令失败:", err)
-		return 0.0
-	}
-
-	// 解析 df 命令输出,计算磁盘占比
-	lines := strings.Split(string(output), "\n")
-	for _, line := range lines[1:] {
-		fields := strings.Fields(line)
-		if len(fields) >= 6 && fields[0] == "total" {
-			//filesystem := fields[0]
-			total, _ := strconv.ParseFloat(strings.TrimSuffix(fields[1], "G"), 64)
-			used, _ := strconv.ParseFloat(strings.TrimSuffix(fields[2], "G"), 64)
-			usedPercent := (used / total) * 100
-
-			//fmt.Printf("文件系统 %s 已使用 %.2f%%\n", filesystem, usedPercent)
-			return usedPercent
-		}
-	}
-	return 0.0
-}
-
-func ListAbsolutePathAndSort(dir string) []string {
-	var result []string
-	if !strings.HasSuffix(dir, "/") {
-		dir = dir + "/"
-	}
-	files, err := os.ReadDir(dir)
-	if err != nil {
-		log.GlobalLogger.Info("获取文件列表失败:", err)
-		return result
-	}
-	for _, file := range files {
-		result = append(result, dir+file.Name())
-	}
-
-	// 根据文件名进行升序排序
-	sort.Slice(result, func(i, j int) bool {
-		return filepath.Base(result[i]) < filepath.Base(result[j])
-	})
-	return result
-}
-
-func GetCopyDir(faultTime string) string {
-	return commonConfig.CloudConfig.BagCopyDir + faultTime + "/"
-}
-
-func MergeSlice(slice1 []string, slice2 []string) []string {
-
-	// 遍历第二个切片中的元素,并去重追加到结果切片1中
-	for _, element := range slice2 {
-		found := false
-		for _, item := range slice1 {
-			if element == item {
-				found = true
-				break
-			}
-		}
-		if !found {
-			slice1 = append(slice1, element)
-		}
-	}
-	return slice1
-}
-
-func DeleteFile(path string) {
-	// 检查文件是否存在
-	if _, err := os.Stat(path); err == nil {
-		// 文件存在,执行删除操作
-		err := os.Remove(path)
-		if err != nil {
-			fmt.Printf("删除文件时发生错误:%s\n", err)
-			return
-		}
-	}
-}
-
-// GetLastTimeWindow 获取最后一个时间窗口
-func GetLastTimeWindow() *ent.TimeWindow {
-	var lastTimeWindow *ent.TimeWindow // 获取最后一个时间窗口
-	if len(global.TimeWindowProducerQueue) > 0 {
-		lastTimeWindow = &global.TimeWindowProducerQueue[len(global.TimeWindowProducerQueue)-1]
-	}
-	return lastTimeWindow
-}
-
-func RefreshTcpSendTime() {
-	global.TcpSendTimeMutex.Lock()
-	global.TcpSendTime = time.Now()
-	global.TcpSendTimeMutex.Unlock()
-}
-
-// SupplyCopyBags 如果 Copy目录下的包不够,则补充一些
-func SupplyCopyBags(currentTimeWindow ent.TimeWindow) {
-	// 如果bag包没有达到length,补充几个
-	copyBags := ListAbsolutePathWithSuffixAndSort(GetCopyDir(currentTimeWindow.FaultTime), ".bag")
-	copyBagsLength := len(copyBags)
-	if copyBagsLength < currentTimeWindow.Length {
-		time.Sleep(time.Duration(copyBagsLength) * time.Second)
-		dataBags := ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
-		gap := currentTimeWindow.Length - copyBagsLength
-		log.GlobalLogger.Info("故障 ", currentTimeWindow.FaultTime, "需要补充 ", gap, " 个 bag 包")
-		for _, bag := range dataBags {
-			bagTime := GetBagTime(bag)
-			if TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.FaultTime) {
-				MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
-				gap = gap - 1
-				if gap == 0 {
-					break
-				}
-			}
-		}
-	}
-}

+ 0 - 38
pjisuv/slave/main/slave.go

@@ -1,38 +0,0 @@
-package main
-
-import (
-	cfg2 "cicv-data-closedloop/kinglong/common/cfg"
-	commonInit "cicv-data-closedloop/kinglong/common/init"
-	"cicv-data-closedloop/kinglong/common/log"
-	svc3 "cicv-data-closedloop/kinglong/common/svc"
-	slaveConfig "cicv-data-closedloop/kinglong/slave/pkg/cfg"
-	svc4 "cicv-data-closedloop/kinglong/slave/pkg/svc"
-)
-
-// init 初始化函数
-func init() {
-	//runtime.GOMAXPROCS(1)
-	log.InitLogConfig("kinglong-slave")
-	commonInit.Init()
-	slaveConfig.InitTcpListener()
-	cfg2.InitKillSignalListener(cfg2.CloudConfig.Hosts[1].Ip)
-	// 等待重启,接收到重启信号,会把信号分发给以下channel
-	go svc3.WaitKillSelf()
-}
-
-// main 主函数
-func main() {
-
-	// 1 负责打包数据到data目录
-	go svc3.BagRecord(cfg2.CloudConfig.Hosts[1].Name)
-	// 2 负责监控故障,并修改timeWindow
-	go svc4.PrepareTimeWindowProducerQueue()
-	// 3
-	go svc4.RunTimeWindowProducerQueue()
-	// 4 排队运行时间窗口
-	go svc3.RunTimeWindowConsumerQueue(cfg2.CloudConfig.Hosts[1].Name)
-
-	// 阻塞主线程,等待其他线程执行。
-	select {}
-
-}