孟令鑫 1 年間 前
コミット
d4976061c1

+ 1 - 1
aarch64/pjisuv/common/service/rosbag_upload.go

@@ -42,7 +42,7 @@ outLoop:
 		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
 		// 1 获取即将处理的窗口
 		currentTimeWindow := entity.TimeWindowConsumerQueue[0]
-		entity.RemoveHeaOfdTimeWindowConsumerQueue()
+		entity.RemoveHeadOfTimeWindowConsumerQueue()
 		c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 
 		// 2 获取目录

+ 1 - 1
aarch64/pjisuv/master/package/service/move_bag_and_send_window.go

@@ -60,7 +60,7 @@ func RunTimeWindowProducerQueue() {
 			domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
 			entity.RefreshTcpSendTime()
 			go sendTimeWindowByTcp(currentTimeWindow)
-			entity.RemoveHeadOfdTimeWindowProducerQueue()
+			entity.RemoveHeadOfTimeWindowProducerQueue()
 			entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
 			// 获取copy目录下的字典json,key为触发时间,value为label
 			timeToLabelJson, _ := util.ReadFile(commonConfig.CloudConfig.TimeToLabelJsonPath)

+ 1 - 1
aarch64/pjisuv/slave/package/service/move_bag.go

@@ -32,7 +32,7 @@ func RunTimeWindowProducerQueue() {
 		if len(entity.TimeWindowProducerQueue) > 0 {
 			currentTimeWindow := entity.TimeWindowProducerQueue[0]
 			// 将时间窗口移出准备队列
-			entity.RemoveHeadOfdTimeWindowProducerQueue()
+			entity.RemoveHeadOfTimeWindowProducerQueue()
 			if currentTimeWindow.CanUpload == "yes" {
 				c_log.GlobalLogger.Info("从节点接收到可上传的timeWindow")
 			}

+ 2 - 2
common/entity/time_window.go

@@ -58,7 +58,7 @@ func AddTimeWindowToTimeWindowConsumerQueue(window TimeWindow) {
 	TimeWindowConsumerQueueMutex.RUnlock()
 }
 
-func RemoveHeadOfdTimeWindowProducerQueue() {
+func RemoveHeadOfTimeWindowProducerQueue() {
 	TimeWindowProducerQueueMutex.RLock()
 	{
 		TimeWindowProducerQueue = TimeWindowProducerQueue[1:]
@@ -66,7 +66,7 @@ func RemoveHeadOfdTimeWindowProducerQueue() {
 	TimeWindowProducerQueueMutex.RUnlock()
 }
 
-func RemoveHeaOfdTimeWindowConsumerQueue() {
+func RemoveHeadOfTimeWindowConsumerQueue() {
 	TimeWindowConsumerQueueMutex.RLock()
 	{
 		TimeWindowConsumerQueue = TimeWindowConsumerQueue[1:]

+ 0 - 13
pji/common/ent/time_window.go

@@ -1,13 +0,0 @@
-package ent
-
-type TimeWindow struct {
-	FaultTime       string   `json:"FaultTime"`
-	TimeWindowBegin string   `json:"TimeWindowBegin"`
-	TimeWindowEnd   string   `json:"TimeWindowEnd"`
-	Labels          []string `json:"Labels"`
-	TriggerIds      []string `json:"TriggerIds"`
-	Length          int      `json:"Length"`
-	CanUpload       string   `json:"CanUpload"`
-	MasterTopics    []string `json:"MasterTopics"`
-	SlaveTopics     []string `json:"SlaveTopics"`
-}

+ 0 - 25
pji/common/global/global.go

@@ -1,25 +0,0 @@
-package global
-
-import (
-	"cicv-data-closedloop/pji/common/ent"
-	"sync"
-	"time"
-)
-
-var (
-	TimeWindowProducerQueue      []ent.TimeWindow
-	TimeWindowProducerQueueMutex sync.RWMutex
-
-	TimeWindowConsumerQueue      []ent.TimeWindow
-	TimeWindowConsumerQueueMutex sync.RWMutex
-
-	Subscriber0Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber0TimeMutex sync.Mutex
-	Subscriber2Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber2TimeMutex sync.Mutex
-	Subscriber3Time      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	Subscriber3TimeMutex sync.Mutex
-
-	TcpSendTime      = time.Date(2023, time.November, 23, 10, 30, 0, 0, time.UTC)
-	TcpSendTimeMutex sync.Mutex
-)

+ 14 - 13
pji/common/svc/disk_clean.go

@@ -2,10 +2,11 @@ package svc
 
 import (
 	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/domain"
+	"cicv-data-closedloop/common/entity"
+	"cicv-data-closedloop/common/util"
 	"cicv-data-closedloop/kinglong/common/cfg"
 	commonConfig "cicv-data-closedloop/pji/common/cfg"
-	"cicv-data-closedloop/pji/common/global"
-	"cicv-data-closedloop/pji/common/util"
 	masterConfig "cicv-data-closedloop/pji/master/package/cfg"
 	"time"
 )
@@ -27,24 +28,24 @@ func DiskClean() {
 	for {
 		time.Sleep(1000 * time.Millisecond)
 		// 1 获取磁盘占用
-		percent := util.GetDiskUsagePercent()
+		percent, _ := util.GetDiskUsagePercent()
 		if percent > commonConfig.CloudConfig.DiskUsage {
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
 			c_log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.DiskUsage, policyToDescription[policy])
 			// 2 获取策略
 			if policy == "TTL" {
 				// 1 获取时间窗口队列中的第二个
-				if len(global.TimeWindowConsumerQueue) > 2 {
+				if len(entity.TimeWindowConsumerQueue) > 2 {
 					deleteTimeWindow(1)
 				}
 			} else if policy == "STOP" {
 				// 2 获取时间窗口队列中的倒数第一个
-				if len(global.TimeWindowConsumerQueue) > 2 {
-					deleteTimeWindow(len(global.TimeWindowConsumerQueue) - 1)
+				if len(entity.TimeWindowConsumerQueue) > 2 {
+					deleteTimeWindow(len(entity.TimeWindowConsumerQueue) - 1)
 				}
 			} else if policy == "LRU" {
 				// 3 获取优先级最低的时间窗口
-				if len(global.TimeWindowConsumerQueue) > 2 {
+				if len(entity.TimeWindowConsumerQueue) > 2 {
 					indexToRemove := getIndexToRemoveForLRU()
 					if indexToRemove != -1 {
 						deleteTimeWindow(indexToRemove)
@@ -59,25 +60,25 @@ func DiskClean() {
 }
 
 func deleteTimeWindow(indexToRemove int) {
-	timeWindowToRemove := global.TimeWindowConsumerQueue[indexToRemove]
+	timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove]
 	// 删除文件
 	faultTime := timeWindowToRemove.FaultTime
-	dir := util.GetCopyDir(faultTime)
+	dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, faultTime)
 	err := util.RemoveDir(dir)
 	if err != nil {
 		c_log.GlobalLogger.Error("删除目录", dir, "失败:", err)
 	}
-	global.TimeWindowConsumerQueueMutex.Lock()
+	entity.TimeWindowConsumerQueueMutex.Lock()
 	// 使用切片的特性删除指定位置的元素
-	global.TimeWindowConsumerQueue = append(global.TimeWindowConsumerQueue[:indexToRemove], global.TimeWindowConsumerQueue[indexToRemove+1:]...)
-	global.TimeWindowConsumerQueueMutex.Unlock()
+	entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
+	entity.TimeWindowConsumerQueueMutex.Unlock()
 }
 
 func getIndexToRemoveForLRU() int {
 	lru := cfg.PlatformConfig.Lru
 	i := len(lru) - 1
 	for i >= 0 {
-		for i2, window := range global.TimeWindowConsumerQueue {
+		for i2, window := range entity.TimeWindowConsumerQueue {
 			for _, label := range window.Labels {
 				if masterConfig.LabelMapTriggerId[label] == lru[i] {
 					return i2

+ 1 - 1
pji/common/svc/kill_self.go

@@ -2,8 +2,8 @@ package svc
 
 import (
 	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	commonConfig "cicv-data-closedloop/pji/common/cfg"
-	"cicv-data-closedloop/pji/common/util"
 	"net/rpc"
 	"os"
 	"sync"

+ 2 - 2
pji/common/svc/rosbag_clean.go

@@ -2,8 +2,8 @@ package svc
 
 import (
 	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/util"
 	"cicv-data-closedloop/pji/common/cfg"
-	"cicv-data-closedloop/pji/common/util"
 	"time"
 )
 
@@ -29,7 +29,7 @@ func BagCacheClean() {
 		if len(bags) > cfg.CloudConfig.BagNumber {
 			diff := len(bags) - cfg.CloudConfig.BagNumber
 			for i := 0; i < diff; i++ {
-				util.DeleteFile(bags[i])
+				_ = util.DeleteFile(bags[i])
 			}
 		}
 	}

+ 11 - 11
pji/common/svc/rosbag_upload.go

@@ -2,10 +2,10 @@ package svc
 
 import (
 	"cicv-data-closedloop/common/config/c_log"
-	commonUtil "cicv-data-closedloop/common/util"
+	"cicv-data-closedloop/common/domain"
+	"cicv-data-closedloop/common/entity"
+	"cicv-data-closedloop/common/util"
 	commonConfig "cicv-data-closedloop/pji/common/cfg"
-	"cicv-data-closedloop/pji/common/global"
-	"cicv-data-closedloop/pji/common/util"
 	masterConfig "cicv-data-closedloop/pji/master/package/cfg"
 	"fmt"
 	"os"
@@ -23,7 +23,7 @@ outLoop:
 		case signal := <-ChannelKillConsume:
 			if signal == 1 {
 				ChannelKillConsume <- 1
-				if len(global.TimeWindowConsumerQueue) == 0 {
+				if len(entity.TimeWindowConsumerQueue) == 0 {
 					AddKillTimes("5")
 					return
 				}
@@ -36,17 +36,17 @@ outLoop:
 		// 每一秒扫一次
 		time.Sleep(time.Duration(1) * time.Second)
 
-		waitLength := len(global.TimeWindowConsumerQueue)
+		waitLength := len(entity.TimeWindowConsumerQueue)
 		if waitLength == 0 {
 			continue outLoop
 		}
-		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(global.TimeWindowConsumerQueue))
+		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
 		// 1 获取即将处理的窗口
-		currentTimeWindow := global.TimeWindowConsumerQueue[0]
-		util.RemoveHeaOfdTimeWindowConsumerQueue()
+		currentTimeWindow := entity.TimeWindowConsumerQueue[0]
+		entity.RemoveHeadOfTimeWindowConsumerQueue()
 		c_log.GlobalLogger.Infof("开始处理窗口,【Lable】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 		// 2 获取目录
-		dir := util.GetCopyDir(currentTimeWindow.FaultTime)
+		dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, currentTimeWindow.FaultTime)
 		bags, _ := util.ListAbsolutePathWithSuffixAndSort(dir, ".bag")
 		bagNumber := len(bags)
 		if bagNumber > currentTimeWindow.Length {
@@ -70,7 +70,7 @@ outLoop:
 				oldName := bag
 				newName := bag + "_filter"
 				filterCommand := []string{"filter", oldName, newName, "\"" + strings.Join(topicsFilterSlice, " or ") + "\""}
-				_, output, err := commonUtil.ExecuteWithEnvSync(os.Environ(), "rosbag", filterCommand...)
+				_, output, err := util.ExecuteWithEnvSync(os.Environ(), "rosbag", filterCommand...)
 				c_log.GlobalLogger.Info("正在过滤中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
 				if err != nil {
 					c_log.GlobalLogger.Errorf("filter命令执行出错【命令】=%v,【输出】=%v,【err】=%v", filterCommand, output, err)
@@ -92,7 +92,7 @@ outLoop:
 			oldName := bag
 			compressCommand := []string{"compress", "--bz2", oldName}
 			c_log.GlobalLogger.Info("正在压缩中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。")
-			if _, output, err := commonUtil.ExecuteWithEnvSync(os.Environ(), "rosbag", compressCommand...); err != nil {
+			if _, output, err := util.ExecuteWithEnvSync(os.Environ(), "rosbag", compressCommand...); err != nil {
 				c_log.GlobalLogger.Errorf("compress命令执行出错【命令】=%v,【输出】=%v,【err】=%v", compressCommand, output, err)
 				continue
 			}

+ 0 - 17
pji/common/util/move_bag.go

@@ -1,17 +0,0 @@
-package util
-
-import (
-	commonConfig "cicv-data-closedloop/pji/common/cfg"
-	"cicv-data-closedloop/pji/common/cutil"
-	"strings"
-)
-
-func MoveFromDataToCopy(faultTime string, sourceBag string) {
-	dir := GetCopyDir(faultTime)
-	cutil.CreateDir(dir)
-	targetBag := strings.Replace(sourceBag, commonConfig.CloudConfig.BagDataDir, dir, 1)
-	var copyCommand []string
-	copyCommand = append(copyCommand, sourceBag)
-	copyCommand = append(copyCommand, targetBag)
-	Execute("mv", copyCommand...)
-}

+ 0 - 29
pji/common/util/util_exec.go

@@ -1,29 +0,0 @@
-package util
-
-import (
-	"os/exec"
-)
-
-func ExecuteWithPath(path string, name string, arg ...string) (*exec.Cmd, error) {
-	// 创建一个Cmd对象,表示要执行的命令
-	cmd := exec.Command(name, arg...)
-	// 指定目录
-	cmd.Dir = path
-	err := cmd.Start()
-	if err != nil {
-		return nil, err
-	} else {
-		// 执行命令并等待它完成d
-		return cmd, nil
-	}
-
-}
-
-func Execute(name string, arg ...string) (*exec.Cmd, string, error) {
-	cmd := exec.Command(name, arg...)
-	combinedOutput, err := cmd.CombinedOutput()
-	if err != nil {
-		return nil, "", err
-	}
-	return cmd, string(combinedOutput), nil
-}

+ 0 - 66
pji/common/util/util_io.go

@@ -1,66 +0,0 @@
-package util
-
-import (
-	"os"
-	"path/filepath"
-	"sort"
-	"strings"
-)
-
-func ListAbsolutePathWithSuffixAndSort(dir string, suffix string) ([]string, error) {
-	var result []string
-	if !strings.HasSuffix(dir, "/") {
-		dir = dir + "/"
-	}
-	files, err := os.ReadDir(dir)
-	if err != nil {
-		return nil, err
-	}
-	for _, file := range files {
-		if strings.HasSuffix(file.Name(), suffix) {
-			result = append(result, dir+file.Name())
-		}
-	}
-	// 根据文件名进行升序排序
-	sort.Slice(result, func(i, j int) bool {
-		return filepath.Base(result[i]) < filepath.Base(result[j])
-	})
-	return result, nil
-}
-
-// RemoveDir 递归删除目录及其下的所有文件和子目录
-func RemoveDir(dirPath string) error {
-	// 打开目录
-	dir, err := os.Open(dirPath)
-	if err != nil {
-		return err
-	}
-	defer dir.Close()
-	// 读取目录下的文件和子目录
-	fileInfos, err := dir.Readdir(-1)
-	if err != nil {
-		return err
-	}
-	// 遍历文件和子目录
-	for _, fileInfo := range fileInfos {
-		path := filepath.Join(dirPath, fileInfo.Name())
-
-		if fileInfo.IsDir() {
-			// 如果是子目录,递归调用removeDir删除子目录及其下的文件和子目录
-			RemoveDir(path)
-
-		} else {
-			// 如果是文件,直接删除文件
-			err = os.Remove(path)
-			if err != nil {
-				return err
-			}
-		}
-	}
-	// 删除目录本身
-	err = os.Remove(dirPath)
-	if err != nil {
-		return err
-	}
-	return nil
-}

+ 0 - 13
pji/common/util/util_json.go

@@ -1,13 +0,0 @@
-package util
-
-import (
-	"encoding/json"
-)
-
-func MapToJsonString(inputMap map[string]interface{}) (string, error) {
-	jsonBytes, err := json.Marshal(inputMap)
-	if err != nil {
-		return "", err
-	}
-	return string(jsonBytes), nil
-}

+ 0 - 224
pji/common/util/utils.go

@@ -1,224 +0,0 @@
-package util
-
-import (
-	"cicv-data-closedloop/kinglong/common/log"
-	"cicv-data-closedloop/pji/common/cfg"
-	"cicv-data-closedloop/pji/common/ent"
-	"cicv-data-closedloop/pji/common/global"
-	"fmt"
-	"os"
-	"os/exec"
-	"path/filepath"
-	"sort"
-	"strconv"
-	"strings"
-	"time"
-)
-
-// AppendIfNotExists 向切片中追加元素,如果元素已存在则不添加
-func AppendIfNotExists(slice []string, element string) []string {
-	for _, item := range slice {
-		if item == element {
-			return slice // 元素已存在,直接返回原切片
-		}
-	}
-	return append(slice, element) // 元素不存在,追加到切片末尾
-}
-
-func AddTimeWindowToTimeWindowProducerQueue(window ent.TimeWindow) {
-	global.TimeWindowProducerQueueMutex.RLock()
-	{
-		global.TimeWindowProducerQueue = append(global.TimeWindowProducerQueue, window)
-	}
-	global.TimeWindowProducerQueueMutex.RUnlock()
-}
-
-func AddTimeWindowToTimeWindowConsumerQueue(window ent.TimeWindow) {
-	global.TimeWindowConsumerQueueMutex.RLock()
-	{
-		global.TimeWindowConsumerQueue = append(global.TimeWindowConsumerQueue, window)
-	}
-	global.TimeWindowConsumerQueueMutex.RUnlock()
-}
-
-func RemoveHeaOfdTimeWindowProducerQueue() {
-	global.TimeWindowProducerQueueMutex.RLock()
-	{
-		global.TimeWindowProducerQueue = global.TimeWindowProducerQueue[1:]
-	}
-	global.TimeWindowProducerQueueMutex.RUnlock()
-}
-
-func RemoveHeaOfdTimeWindowConsumerQueue() {
-	global.TimeWindowConsumerQueueMutex.RLock()
-	{
-		global.TimeWindowConsumerQueue = global.TimeWindowConsumerQueue[1:]
-	}
-	global.TimeWindowConsumerQueueMutex.RUnlock()
-}
-
-func GetBagTime(bagName string) string {
-	s1 := strings.Split(bagName, "_")[0]
-	s1Split := strings.Split(s1, "/")
-	s2 := s1Split[len(s1Split)-1]
-	return s2
-}
-
-func TimeCustomChange(originalTimeStr string, number int) string {
-	var newTimeStr string
-	// 解析时间字符串
-	layout := "2006-01-02-15-04-05"
-	originalTime, err := time.Parse(layout, originalTimeStr)
-	if err != nil {
-		log.GlobalLogger.Info("无法解析时间字符串:", err)
-		return newTimeStr
-	}
-
-	// 减少1秒
-	newTime := originalTime.Add(time.Duration(number) * time.Second)
-
-	// 格式化新的时间为指定字符串格式
-	return newTime.Format(layout)
-}
-
-func CalculateDifferenceOfTimeCustom(timeCustom1 string, timeCustom2 string) int {
-	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
-	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
-	return timeInt2 - timeInt1 + 1
-
-}
-func TimeCustom1GreaterTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
-	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
-	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
-	return timeInt1 > timeInt2
-}
-
-func TimeCustom1GreaterEqualThanTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
-	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
-	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
-	return timeInt1 >= timeInt2
-}
-
-func TimeCustom1LessEqualThanTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
-	timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
-	timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
-	return timeInt1 <= timeInt2
-}
-
-func Hex(num int) string {
-	return fmt.Sprintf("0x%x", num)
-}
-
-// GetDiskUsagePercent 获取磁盘使用率
-func GetDiskUsagePercent() float64 {
-	// 执行 df 命令获取磁盘使用情况
-	cmd := exec.Command("df", "--total")
-	output, err := cmd.Output()
-	if err != nil {
-		log.GlobalLogger.Info("执行命令失败:", err)
-		return 0.0
-	}
-
-	// 解析 df 命令输出,计算磁盘占比
-	lines := strings.Split(string(output), "\n")
-	for _, line := range lines[1:] {
-		fields := strings.Fields(line)
-		if len(fields) >= 6 && fields[0] == "total" {
-			//filesystem := fields[0]
-			total, _ := strconv.ParseFloat(strings.TrimSuffix(fields[1], "G"), 64)
-			used, _ := strconv.ParseFloat(strings.TrimSuffix(fields[2], "G"), 64)
-			usedPercent := (used / total) * 100
-
-			//fmt.Printf("文件系统 %s 已使用 %.2f%%\n", filesystem, usedPercent)
-			return usedPercent
-		}
-	}
-	return 0.0
-}
-
-func ListAbsolutePathAndSort(dir string) []string {
-	var result []string
-	if !strings.HasSuffix(dir, "/") {
-		dir = dir + "/"
-	}
-	files, err := os.ReadDir(dir)
-	if err != nil {
-		log.GlobalLogger.Info("获取文件列表失败:", err)
-		return result
-	}
-	for _, file := range files {
-		result = append(result, dir+file.Name())
-	}
-
-	// 根据文件名进行升序排序
-	sort.Slice(result, func(i, j int) bool {
-		return filepath.Base(result[i]) < filepath.Base(result[j])
-	})
-	return result
-}
-
-func GetCopyDir(faultTime string) string {
-	return cfg.CloudConfig.BagCopyDir + faultTime + "/"
-}
-
-func MergeSlice(slice1 []string, slice2 []string) []string {
-
-	// 遍历第二个切片中的元素,并去重追加到结果切片1中
-	for _, element := range slice2 {
-		found := false
-		for _, item := range slice1 {
-			if element == item {
-				found = true
-				break
-			}
-		}
-		if !found {
-			slice1 = append(slice1, element)
-		}
-	}
-	return slice1
-}
-
-func DeleteFile(path string) {
-	// 检查文件是否存在
-	if _, err := os.Stat(path); err == nil {
-		// 文件存在,执行删除操作
-		err := os.Remove(path)
-		if err != nil {
-			fmt.Printf("删除文件时发生错误:%s\n", err)
-			return
-		}
-	}
-}
-
-// GetLastTimeWindow 获取最后一个时间窗口
-func GetLastTimeWindow() *ent.TimeWindow {
-	var lastTimeWindow *ent.TimeWindow // 获取最后一个时间窗口
-	if len(global.TimeWindowProducerQueue) > 0 {
-		lastTimeWindow = &global.TimeWindowProducerQueue[len(global.TimeWindowProducerQueue)-1]
-	}
-	return lastTimeWindow
-}
-
-// SupplyCopyBags 如果 Copy目录下的包不够,则补充一些
-func SupplyCopyBags(currentTimeWindow ent.TimeWindow) {
-	// 如果bag包没有达到length,补充几个
-	copyBags, _ := ListAbsolutePathWithSuffixAndSort(GetCopyDir(currentTimeWindow.FaultTime), ".bag")
-	copyBagsLength := len(copyBags)
-	if copyBagsLength < currentTimeWindow.Length {
-		time.Sleep(time.Duration(copyBagsLength) * time.Second)
-		dataBags, _ := ListAbsolutePathWithSuffixAndSort(cfg.CloudConfig.BagDataDir, ".bag")
-		gap := currentTimeWindow.Length - copyBagsLength
-		log.GlobalLogger.Info("故障 ", currentTimeWindow.FaultTime, "需要补充 ", gap, " 个 bag 包")
-		for _, bag := range dataBags {
-			bagTime := GetBagTime(bag)
-			if TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.FaultTime) {
-				MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
-				gap = gap - 1
-				if gap == 0 {
-					break
-				}
-			}
-		}
-	}
-}

+ 15 - 14
pji/master/package/svc/move_bag_and_send_window.go

@@ -2,10 +2,11 @@ package svc
 
 import (
 	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/domain"
+	"cicv-data-closedloop/common/entity"
+	"cicv-data-closedloop/common/util"
 	commonConfig "cicv-data-closedloop/pji/common/cfg"
-	"cicv-data-closedloop/pji/common/global"
 	commonService "cicv-data-closedloop/pji/common/svc"
-	util2 "cicv-data-closedloop/pji/common/util"
 	"time"
 )
 
@@ -18,7 +19,7 @@ func RunTimeWindowProducerQueue() {
 		case signal := <-commonService.ChannelKillMove:
 			if signal == 1 {
 				commonService.ChannelKillMove <- 1
-				if len(global.TimeWindowProducerQueue) == 0 {
+				if len(entity.TimeWindowProducerQueue) == 0 {
 					commonService.AddKillTimes("4")
 					return
 				}
@@ -31,22 +32,22 @@ func RunTimeWindowProducerQueue() {
 
 		// 处理
 		time.Sleep(time.Duration(1) * time.Second)
-		if len(global.TimeWindowProducerQueue) > 0 {
-			bags, _ := util2.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
-			currentTimeWindow := global.TimeWindowProducerQueue[0]
+		if len(entity.TimeWindowProducerQueue) > 0 {
+			bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagDataDir, ".bag")
+			currentTimeWindow := entity.TimeWindowProducerQueue[0]
 			move := false
 			bigger := false
 			for _, bag := range bags {
-				bagTime := util2.GetBagTime(bag)
+				bagTime := util.GetBagTime(bag)
 				// 2 如果bag不小于timeWindowBegin不大于timeWindowEnd,则移动
-				compare1 := util2.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
-				compare2 := util2.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
+				compare1 := util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin)
+				compare2 := util.TimeCustom1LessEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowEnd)
 				if compare1 && compare2 {
 					// 将bag包移动到Copy目录
-					util2.MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
+					domain.MoveFromDataToCopy(currentTimeWindow.FaultTime, commonConfig.CloudConfig.BagDataDir, bag, commonConfig.CloudConfig.BagCopyDir)
 					move = true
 				} else {
-					if util2.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
+					if util.TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.TimeWindowBegin) {
 						// 必须已经生成了窗口之后的包才算窗口结束了
 						bigger = true
 						break
@@ -55,11 +56,11 @@ func RunTimeWindowProducerQueue() {
 			}
 			// 如果没有包可以供当前窗口移动,且已经生成了更新的包,则当前窗口已经可以上传
 			if !move && bigger {
-				util2.SupplyCopyBags(currentTimeWindow)
+				domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
 				// 将时间窗口移出准备队列
-				util2.RemoveHeaOfdTimeWindowProducerQueue()
+				entity.RemoveHeadOfTimeWindowProducerQueue()
 				// 将时间窗口加入运行队列
-				util2.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
+				entity.AddTimeWindowToTimeWindowConsumerQueue(currentTimeWindow)
 				continue
 			}
 		}

+ 18 - 24
pji/master/package/svc/produce_window.go

@@ -2,12 +2,10 @@ package svc
 
 import (
 	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/entity"
 	commonUtil "cicv-data-closedloop/common/util"
 	commonConfig "cicv-data-closedloop/pji/common/cfg"
-	commonEntity "cicv-data-closedloop/pji/common/ent"
-	"cicv-data-closedloop/pji/common/global"
 	commonService "cicv-data-closedloop/pji/common/svc"
-	"cicv-data-closedloop/pji/common/util"
 	masterConfig "cicv-data-closedloop/pji/master/package/cfg"
 	"github.com/bluenviron/goroslib/v2"
 	"github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
@@ -38,7 +36,7 @@ func PrepareTimeWindowProducerQueue() {
 					if time.Since(subscribersTimes[i]).Seconds() > 1 {
 						subscribersMutexes[i].Lock()
 						faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
-						lastTimeWindow := util.GetLastTimeWindow()       // 获取最后一个时间窗口
+						lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
 						var faultLabel string
 						for _, f := range masterConfig.RuleOfObstacleDetection {
 							faultLabel = f(data)
@@ -70,45 +68,45 @@ func PrepareTimeWindowProducerQueue() {
 	}
 }
 
-func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *commonEntity.TimeWindow) {
+func saveTimeWindow(faultLabel string, faultHappenTime string, lastTimeWindow *entity.TimeWindow) {
 	masterTopics, slaveTopics := getTopicsOfNode(faultLabel)
-	if lastTimeWindow == nil || util.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
+	if lastTimeWindow == nil || commonUtil.TimeCustom1GreaterTimeCustom2(faultHappenTime, lastTimeWindow.TimeWindowEnd) {
 		// 2-1 如果是不在旧故障窗口内,添加一个新窗口
-		newTimeWindow := commonEntity.TimeWindow{
+		newTimeWindow := entity.TimeWindow{
 			FaultTime:       faultHappenTime,
-			TimeWindowBegin: util.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
-			TimeWindowEnd:   util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
+			TimeWindowBegin: commonUtil.TimeCustomChange(faultHappenTime, -commonConfig.PlatformConfig.TaskBeforeTime),
+			TimeWindowEnd:   commonUtil.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime),
 			Length:          commonConfig.PlatformConfig.TaskBeforeTime + commonConfig.PlatformConfig.TaskAfterTime + 1,
 			Labels:          []string{faultLabel},
 			MasterTopics:    masterTopics,
 			SlaveTopics:     slaveTopics,
 		}
 		c_log.GlobalLogger.Infof("不在旧故障窗口内,向生产者队列添加一个新窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", newTimeWindow.Labels, newTimeWindow.FaultTime, newTimeWindow.Length)
-		util.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
+		entity.AddTimeWindowToTimeWindowProducerQueue(newTimeWindow)
 	} else {
 		// 2-2 如果在旧故障窗口内
-		global.TimeWindowProducerQueueMutex.RLock()
-		defer global.TimeWindowProducerQueueMutex.RUnlock()
+		entity.TimeWindowProducerQueueMutex.RLock()
+		defer entity.TimeWindowProducerQueueMutex.RUnlock()
 		// 2-2-1 更新故障窗口end时间
-		maxEnd := util.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
-		expectEnd := util.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
-		if util.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
+		maxEnd := commonUtil.TimeCustomChange(lastTimeWindow.TimeWindowBegin, commonConfig.PlatformConfig.TaskMaxTime)
+		expectEnd := commonUtil.TimeCustomChange(faultHappenTime, commonConfig.PlatformConfig.TaskAfterTime)
+		if commonUtil.TimeCustom1GreaterTimeCustom2(expectEnd, maxEnd) {
 			lastTimeWindow.TimeWindowEnd = maxEnd
 			lastTimeWindow.Length = commonConfig.PlatformConfig.TaskMaxTime
 		} else {
-			if util.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
+			if commonUtil.TimeCustom1GreaterTimeCustom2(expectEnd, lastTimeWindow.TimeWindowEnd) {
 				lastTimeWindow.TimeWindowEnd = expectEnd
-				lastTimeWindow.Length = util.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
+				lastTimeWindow.Length = commonUtil.CalculateDifferenceOfTimeCustom(lastTimeWindow.TimeWindowBegin, expectEnd)
 			}
 		}
 		// 2-2-2 更新label
 		labels := lastTimeWindow.Labels
-		lastTimeWindow.Labels = util.AppendIfNotExists(labels, faultLabel)
+		lastTimeWindow.Labels = commonUtil.AppendIfNotExists(labels, faultLabel)
 		// 2-2-3 更新 topic
 		sourceMasterTopics := lastTimeWindow.MasterTopics
-		lastTimeWindow.MasterTopics = util.MergeSlice(sourceMasterTopics, masterTopics)
+		lastTimeWindow.MasterTopics = commonUtil.MergeSlice(sourceMasterTopics, masterTopics)
 		sourceSlaveTopics := lastTimeWindow.SlaveTopics
-		lastTimeWindow.SlaveTopics = util.MergeSlice(sourceSlaveTopics, slaveTopics)
+		lastTimeWindow.SlaveTopics = commonUtil.MergeSlice(sourceSlaveTopics, slaveTopics)
 		c_log.GlobalLogger.Infof("在旧故障窗口内,更新生产者队列最新的窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", lastTimeWindow.Labels, lastTimeWindow.FaultTime, lastTimeWindow.Length)
 	}
 }
@@ -123,7 +121,3 @@ func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []st
 	}
 	return faultCodeTopics, nil
 }
-
-func initCallbackFunc() {
-
-}