孟令鑫 1 سال پیش
والد
کامیت
a92fcea3bd

+ 16 - 13
aarch64/kinglong/common/config/c_cloud.go

@@ -9,43 +9,46 @@ import (
 	"time"
 )
 
-type Platform struct {
+type platform struct {
 	UrlDeviceAuth string `yaml:"url-device-auth"`
 	UrlTaskPoll   string `yaml:"url-task-poll"`
 	UrlTask       string `yaml:"url-task"`
 }
 
-type Host struct {
+type host struct {
 	Name   string   `yaml:"name"`
 	Ip     string   `yaml:"ip"`
 	Topics []string `yaml:"topics"`
 }
-type Ros struct {
+type ros struct {
 	MasterAddress string   `yaml:"master-address"`
 	Nodes         []string `yaml:"nodes"`
 }
 
+type disk struct {
+	Name string `yaml:"name"`
+	Used uint64 `yaml:"used"`
+}
+type trigger struct {
+	Label  string   `yaml:"label"`
+	Topics []string `yaml:"topics"`
+}
 type cloudConfig struct {
 	FullCollect           bool      `yaml:"full-collect"`
 	ConfigRefreshInterval int       `yaml:"config-refresh-interval"` // 配置刷新时间间隔
 	BagNumber             int       `yaml:"bag-number"`
 	TimeWindowSendGap     int       `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
-	DiskUsage             float64   `yaml:"disk-usage"`
 	TimeToLabelJsonPath   string    `yaml:"time-to-label-json-path"`
 	BagDataDir            string    `yaml:"bag-data-dir"`
 	BagCopyDir            string    `yaml:"bag-copy-dir"`
 	TriggersDir           string    `yaml:"triggers-dir"`
 	TcpPort               string    `yaml:"tcp-port"`
 	RpcPort               string    `yaml:"rpc-port"`
-	Triggers              []Trigger `yaml:"triggers"`
-	Hosts                 []Host    `yaml:"hosts"`
-	Ros                   Ros       `yaml:"ros"`
-	Platform              Platform  `yaml:"platform"`
-}
-
-type Trigger struct {
-	Label  string   `yaml:"label"`
-	Topics []string `yaml:"topics"`
+	Triggers              []trigger `yaml:"triggers"`
+	Hosts                 []host    `yaml:"hosts"`
+	Ros                   ros       `yaml:"ros"`
+	Platform              platform  `yaml:"platform"`
+	Disk                  disk      `yaml:"disk"`
 }
 
 var (

+ 3 - 1
aarch64/kinglong/common/config/yaml/cloud-config-v2253.yaml

@@ -7,7 +7,9 @@ platform:
 full-collect: true
 bag-number: 120
 config-refresh-interval: 60
-disk-usage: 90
+disk:
+  name: /dev/vdb # 磁盘名称
+  used: 5242880 # 磁盘占用阈值,单位bytes
 bag-data-dir: /mnt/media/sda1/rosbag-handle/data/
 bag-copy-dir: /mnt/media/sda1/rosbag-handle/copy/
 triggers-dir: /mnt/media/sda1/rosbag-handle/triggers/

+ 3 - 1
aarch64/kinglong/common/config/yaml/cloud-config-v2304.yaml

@@ -7,7 +7,9 @@ platform:
 full-collect: true
 bag-number: 120
 config-refresh-interval: 60
-disk-usage: 90
+disk:
+  name: /dev/vdb # 磁盘名称
+  used: 5242880 # 磁盘占用阈值,单位bytes
 bag-data-dir: /mnt/media/sda1/rosbag-handle/data/
 bag-copy-dir: /mnt/media/sda1/rosbag-handle/copy/
 triggers-dir: /mnt/media/sda1/rosbag-handle/triggers/

+ 9 - 9
aarch64/kinglong/common/service/disk_clean.go

@@ -30,11 +30,10 @@ func DiskClean() {
 			"LRU":  "保留高优先级",
 		}
 		// 1 获取磁盘占用
-		percent, _ := util.GetDiskUsagePercent()
-		if percent > commonConfig.CloudConfig.DiskUsage {
-			// 2 获取策略
+		diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name)
+		if diskUsed > commonConfig.CloudConfig.Disk.Used {
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
-			c_log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.DiskUsage, policyToDescription[policy])
+			c_log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.Disk.Used, policyToDescription[policy])
 			if policy == "TTL" {
 				// 1 获取时间窗口队列中的第二个
 				if len(entity.TimeWindowConsumerQueue) > 2 {
@@ -62,17 +61,18 @@ func DiskClean() {
 
 func deleteTimeWindow(indexToRemove int) {
 	timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove]
-	// 删除文件
+	// 1 删除队列中的窗口。使用切片的特性删除指定位置的元素
+	entity.TimeWindowConsumerQueueMutex.Lock()
+	entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
+	entity.TimeWindowConsumerQueueMutex.Unlock()
+	// 2 删除该窗口对应的文件目录。
 	faultTime := timeWindowToRemove.FaultTime
 	dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, faultTime)
 	err := util.RemoveDir(dir)
 	if err != nil {
 		c_log.GlobalLogger.Error("删除目录", dir, "失败:", err)
 	}
-	entity.TimeWindowConsumerQueueMutex.Lock()
-	// 使用切片的特性删除指定位置的元素
-	entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
-	entity.TimeWindowConsumerQueueMutex.Unlock()
+
 }
 
 func getIndexToRemoveForLRU() int {

+ 17 - 13
aarch64/pji/common/config/c_cloud.go

@@ -9,41 +9,45 @@ import (
 	"time"
 )
 
-type Platform struct {
+type platform struct {
 	UrlDeviceAuth string `yaml:"url-device-auth"`
 	UrlTaskPoll   string `yaml:"url-task-poll"`
 	UrlTask       string `yaml:"url-task"`
 }
 
-type Host struct {
+type host struct {
 	Name   string   `yaml:"name"`
 	Ip     string   `yaml:"ip"`
 	Topics []string `yaml:"topics"`
 }
-type Ros struct {
+type ros struct {
 	MasterAddress string   `yaml:"master-address"`
 	Nodes         []string `yaml:"nodes"`
 }
 
+type disk struct {
+	Name string `yaml:"name"`
+	Used uint64 `yaml:"used"`
+}
+type trigger struct {
+	Label  string   `yaml:"label"`
+	Topics []string `yaml:"topics"`
+}
+
 type cloudConfig struct {
 	FullCollect           bool      `yaml:"full-collect"`
 	ConfigRefreshInterval int       `yaml:"config-refresh-interval"` // 配置刷新时间间隔
 	BagNumber             int       `yaml:"bag-number"`
 	TimeWindowSendGap     int       `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
-	DiskUsage             float64   `yaml:"disk-usage"`
 	BagDataDir            string    `yaml:"bag-data-dir"`
 	BagCopyDir            string    `yaml:"bag-copy-dir"`
 	TriggersDir           string    `yaml:"triggers-dir"`
 	RpcPort               string    `yaml:"rpc-port"`
-	Triggers              []Trigger `yaml:"triggers"`
-	Hosts                 []Host    `yaml:"hosts"`
-	Ros                   Ros       `yaml:"ros"`
-	Platform              Platform  `yaml:"platform"`
-}
-
-type Trigger struct {
-	Label  string   `yaml:"label"`
-	Topics []string `yaml:"topics"`
+	Triggers              []trigger `yaml:"triggers"`
+	Hosts                 []host    `yaml:"hosts"`
+	Ros                   ros       `yaml:"ros"`
+	Platform              platform  `yaml:"platform"`
+	Disk                  disk      `yaml:"disk"`
 }
 
 var (

+ 8 - 6
aarch64/pji/common/config/yaml/cloud-config.yaml

@@ -6,7 +6,9 @@ platform:
 full-collect: false
 bag-number: 10
 config-refresh-interval: 60
-disk-usage: 90
+disk:
+  name: /dev/vdb # 磁盘名称
+  used: 5242880 # 磁盘占用阈值,单位bytes
 bag-data-dir: /root/rosbag-handle/data/
 bag-copy-dir: /root/rosbag-handle/copy/
 triggers-dir: /root/rosbag-handle/triggers/
@@ -47,12 +49,12 @@ hosts:
     topics:
       - /camera/color/image_raw
       - /camera/depth/points
-#      - /diagnostics
-#      - /locate_info
+      #      - /diagnostics
+      #      - /locate_info
       - /obstacle_detection
-#      - /odom
-#      - /move_base/global_costmap/costmap
-#      - /move_base/global_costmap/costmap_updates
+      #      - /odom
+      #      - /move_base/global_costmap/costmap
+      #      - /move_base/global_costmap/costmap_updates
       - /scan_map_icp_amcl_node/scan_point_transformed
 triggers:
   - label: detectfault

+ 0 - 33
aarch64/pji/common/init/common_init.go

@@ -1,33 +0,0 @@
-package init
-
-import (
-	"cicv-data-closedloop/aarch64/pji/common/config"
-	"cicv-data-closedloop/aarch64/pji/common/service"
-)
-
-func Init() {
-
-	// 初始化本地配置文件(第1处配置,在本地文件)
-	config.InitLocalConfig()
-
-	// 初始化Oss连接信息
-	config.InitOssConfig()
-
-	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
-	config.InitCloudConfig()
-
-	go config.RefreshCloudConfig()
-
-	// 初始化数据闭环平台的配置(第3处配置,在数据闭环平台接口)
-	config.InitPlatformConfig()
-
-	// 初始化ros节点
-	config.InitRosConfig()
-
-	// 维护data目录缓存的包数量
-	go service.BagCacheClean()
-
-	// 磁盘占用过高时根据缓存策略处理copy目录
-	go service.DiskClean()
-
-}

+ 8 - 8
aarch64/pji/common/service/disk_clean.go

@@ -27,10 +27,10 @@ func DiskClean() {
 	for {
 		time.Sleep(1000 * time.Millisecond)
 		// 1 获取磁盘占用
-		percent, _ := util.GetDiskUsagePercent()
-		if percent > commonConfig.CloudConfig.DiskUsage {
+		diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name)
+		if diskUsed > commonConfig.CloudConfig.Disk.Used {
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
-			c_log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.DiskUsage, policyToDescription[policy])
+			c_log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.Disk.Used, policyToDescription[policy])
 			// 2 获取策略
 			if policy == "TTL" {
 				// 1 获取时间窗口队列中的第二个
@@ -60,17 +60,17 @@ func DiskClean() {
 
 func deleteTimeWindow(indexToRemove int) {
 	timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove]
-	// 删除文件
+	// 1 删除队列中的窗口。使用切片的特性删除指定位置的元素
+	entity.TimeWindowConsumerQueueMutex.Lock()
+	entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
+	entity.TimeWindowConsumerQueueMutex.Unlock()
+	// 2 删除该窗口对应的文件目录。
 	faultTime := timeWindowToRemove.FaultTime
 	dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, faultTime)
 	err := util.RemoveDir(dir)
 	if err != nil {
 		c_log.GlobalLogger.Error("删除目录", dir, "失败:", err)
 	}
-	entity.TimeWindowConsumerQueueMutex.Lock()
-	// 使用切片的特性删除指定位置的元素
-	entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
-	entity.TimeWindowConsumerQueueMutex.Unlock()
 }
 
 func getIndexToRemoveForLRU() int {

+ 15 - 3
aarch64/pji/master/main/master.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	commonConfig "cicv-data-closedloop/aarch64/pji/common/config"
-	commonInit "cicv-data-closedloop/aarch64/pji/common/init"
 	commonService "cicv-data-closedloop/aarch64/pji/common/service"
 	masterConfig "cicv-data-closedloop/aarch64/pji/master/package/config"
 	masterService "cicv-data-closedloop/aarch64/pji/master/package/service"
@@ -15,10 +14,23 @@ func init() {
 	runtime.GOMAXPROCS(1)
 	// 初始化日志配置
 	c_log.InitLog("/root/rosbag-handle/log/", "pji-master")
-	commonInit.Init()
+	// 初始化本地配置文件(第1处配置,在本地文件)
+	commonConfig.InitLocalConfig()
+	// 初始化Oss连接信息
+	commonConfig.InitOssConfig()
+	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
+	commonConfig.InitCloudConfig()
+	go commonConfig.RefreshCloudConfig()
+	// 初始化数据闭环平台的配置(第3处配置,在数据闭环平台接口)
+	commonConfig.InitPlatformConfig()
+	// 初始化ros节点
+	commonConfig.InitRosConfig()
+	// 维护data目录缓存的包数量
+	go commonService.BagCacheClean()
+	// 磁盘占用过高时根据缓存策略处理copy目录
+	go commonService.DiskClean()
 	masterConfig.InitTriggerConfig()
 	commonConfig.InitKillSignalListener(commonConfig.CloudConfig.Hosts[0].Ip)
-
 	// 等待重启,接收到重启信号,会把信号分发给以下channel
 	go commonService.WaitKillSelf()
 	masterConfig.InitNacos()

+ 17 - 13
aarch64/pjisuv/common/config/c_cloud.go

@@ -9,43 +9,47 @@ import (
 	"time"
 )
 
-type Platform struct {
+type platform struct {
 	UrlDeviceAuth string `yaml:"url-device-auth"`
 	UrlTaskPoll   string `yaml:"url-task-poll"`
 	UrlTask       string `yaml:"url-task"`
 }
 
-type Host struct {
+type host struct {
 	Name   string   `yaml:"name"`
 	Ip     string   `yaml:"ip"`
 	Topics []string `yaml:"topics"`
 }
-type Ros struct {
+type ros struct {
 	MasterAddress string   `yaml:"master-address"`
 	Nodes         []string `yaml:"nodes"`
 }
+type disk struct {
+	Name string `yaml:"name"`
+	Used uint64 `yaml:"used"`
+}
+
+type trigger struct {
+	Label  string   `yaml:"label"`
+	Topics []string `yaml:"topics"`
+}
 
 type cloudConfig struct {
 	FullCollect           bool      `yaml:"full-collect"`
 	ConfigRefreshInterval int       `yaml:"config-refresh-interval"` // 配置刷新时间间隔
 	BagNumber             int       `yaml:"bag-number"`
 	TimeWindowSendGap     int       `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
-	DiskUsage             float64   `yaml:"disk-usage"`
 	TimeToLabelJsonPath   string    `yaml:"time-to-label-json-path"`
 	BagDataDir            string    `yaml:"bag-data-dir"`
 	BagCopyDir            string    `yaml:"bag-copy-dir"`
 	TriggersDir           string    `yaml:"triggers-dir"`
 	TcpPort               string    `yaml:"tcp-port"`
 	RpcPort               string    `yaml:"rpc-port"`
-	Triggers              []Trigger `yaml:"triggers"`
-	Hosts                 []Host    `yaml:"hosts"`
-	Ros                   Ros       `yaml:"ros"`
-	Platform              Platform  `yaml:"platform"`
-}
-
-type Trigger struct {
-	Label  string   `yaml:"label"`
-	Topics []string `yaml:"topics"`
+	Triggers              []trigger `yaml:"triggers"`
+	Hosts                 []host    `yaml:"hosts"`
+	Ros                   ros       `yaml:"ros"`
+	Platform              platform  `yaml:"platform"`
+	Disk                  disk      `yaml:"disk"`
 }
 
 var (

+ 3 - 1
aarch64/pjisuv/common/config/yaml/cloud-config.yaml

@@ -6,7 +6,9 @@ platform:
 full-collect: true
 bag-number: 120
 config-refresh-interval: 60
-disk-usage: 90
+disk:
+  name: /dev/vdb # 磁盘名称
+  used: 5242880 # 磁盘占用阈值,单位bytes
 bag-data-dir: /mnt/media/sda1/cicv-data-closedloop/data/
 bag-copy-dir: /mnt/media/sda1/cicv-data-closedloop/copy/
 time-to-label-json-path: /mnt/media/sda1/cicv-data-closedloop/timeToLabel.json

+ 8 - 9
aarch64/pjisuv/common/service/disk_clean.go

@@ -30,11 +30,10 @@ func DiskClean() {
 			"LRU":  "保留高优先级",
 		}
 		// 1 获取磁盘占用
-		percent, _ := util.GetDiskUsagePercent()
-		if percent > commonConfig.CloudConfig.DiskUsage {
-			// 2 获取策略
+		diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name)
+		if diskUsed > commonConfig.CloudConfig.Disk.Used {
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
-			c_log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.DiskUsage, policyToDescription[policy])
+			c_log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.Disk.Used, policyToDescription[policy])
 			if policy == "TTL" {
 				// 1 获取时间窗口队列中的第二个
 				if len(entity.TimeWindowConsumerQueue) > 2 {
@@ -62,17 +61,17 @@ func DiskClean() {
 
 func deleteTimeWindow(indexToRemove int) {
 	timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove]
-	// 删除文件
+	// 1 删除队列中的窗口。使用切片的特性删除指定位置的元素
+	entity.TimeWindowConsumerQueueMutex.Lock()
+	entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
+	entity.TimeWindowConsumerQueueMutex.Unlock()
+	// 2 删除该窗口对应的文件目录。
 	faultTime := timeWindowToRemove.FaultTime
 	dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, faultTime)
 	err := util.RemoveDir(dir)
 	if err != nil {
 		c_log.GlobalLogger.Error("删除目录", dir, "失败:", err)
 	}
-	entity.TimeWindowConsumerQueueMutex.Lock()
-	// 使用切片的特性删除指定位置的元素
-	entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
-	entity.TimeWindowConsumerQueueMutex.Unlock()
 }
 
 func getIndexToRemoveForLRU() int {

+ 19 - 0
common/util/u_disk.go

@@ -6,6 +6,25 @@ import (
 	"strings"
 )
 
+// GetDiskUsed 解析 df 命令的输出
+// df -B1 /dev/vdb
+// Filesystem        1B-blocks        Used    Available Use% Mounted on
+// /dev/vdb       527371075584 16390344704 484120408064   4% /mnt/disk001
+func GetDiskUsed(filesystem string) (uint64, error) {
+	cmd := exec.Command("df", "-B1", filesystem)
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		return 0, err
+	}
+	lines := strings.Split(string(output), "\n")
+	fields := strings.Fields(lines[1])
+	parseUint, err := strconv.ParseUint(fields[2], 10, 64)
+	if err != nil {
+		return 0, err
+	}
+	return parseUint, nil
+}
+
 // GetDiskUsagePercent 获取磁盘使用率
 func GetDiskUsagePercent() (float64, error) {
 	// 执行 df 命令获取磁盘使用情况