Переглянути джерело

Merge remote-tracking branch 'origin/master'

LingxinMeng 5 місяців тому
батько
коміт
bcef97688e
29 змінених файлів з 1345 додано та 288 видалено
  1. 67 33
      aarch64/pjibot_delivery/common/config/c_cloud.go
  2. 1 0
      aarch64/pjibot_delivery/common/config/c_local.go
  3. 17 0
      aarch64/pjibot_delivery/common/config/c_platform.go
  4. 225 0
      aarch64/pjibot_delivery/common/config/c_websocket.go
  5. 3 1
      aarch64/pjibot_delivery/common/service/disk_clean.go
  6. 24 0
      aarch64/pjibot_delivery/common/service/rosbag_upload.go
  7. 216 67
      aarch64/pjibot_delivery/control/main.go
  8. 45 0
      aarch64/pjibot_delivery/control/pkg/judge_cloud.go
  9. 44 0
      aarch64/pjibot_delivery/control/pkg/judge_local.go
  10. 1 0
      aarch64/pjibot_delivery/master/package/service/move_bag_and_send_window.go
  11. 97 53
      aarch64/pjibot_delivery/master/package/service/produce_window.go
  12. 10 0
      aarch64/pjibot_delivery/配送机器人默认配置文件-cloud-config.yaml
  13. 2 0
      aarch64/pjibot_delivery/配送机器人默认配置文件-local-config.yaml
  14. 25 21
      aarch64/pjibot_guide/common/config/c_cloud.go
  15. 9 9
      aarch64/pjibot_guide/common/config/c_websocket.go
  16. 60 16
      aarch64/pjibot_guide/control/main.go
  17. 39 34
      aarch64/pjibot_patrol/common/config/c_cloud.go
  18. 1 0
      aarch64/pjibot_patrol/common/config/c_local.go
  19. 225 0
      aarch64/pjibot_patrol/common/config/c_websocket.go
  20. 3 1
      aarch64/pjibot_patrol/common/service/disk_clean.go
  21. 24 0
      aarch64/pjibot_patrol/common/service/rosbag_upload.go
  22. 3 0
      aarch64/pjibot_patrol/control/main.go
  23. 1 0
      aarch64/pjibot_patrol/master/package/service/move_bag_and_send_window.go
  24. 97 53
      aarch64/pjibot_patrol/master/package/service/produce_window.go
  25. 5 0
      aarch64/pjibot_patrol/巡检机器人默认配置文件-cloud-config.yaml
  26. 2 0
      aarch64/pjibot_patrol/巡检机器人默认配置文件-local-config.yaml
  27. 0 0
      tools/pji/pji_api/main/main.go
  28. 99 0
      tools/pji/pji_time/main/main.go
  29. 0 0
      tools/pji/pji_websocket/main/main.go

+ 67 - 33
aarch64/pjibot_delivery/common/config/c_cloud.go

@@ -52,6 +52,10 @@ type trigger struct {
 	Topics []string `yaml:"topics"`
 }
 
+type CollectNumPlusStruct struct {
+	Url string `yaml:"url"`
+}
+
 type CollectLimitStruct struct {
 	Url   string `yaml:"url"`
 	Flag  int    `yaml:"flag"`
@@ -61,43 +65,54 @@ type CollectLimitStruct struct {
 	Year  int    `yaml:"year"`
 }
 
-type cloudConfig struct {
-	CollectLimit          CollectLimitStruct `yaml:"collect-limit"`
-	HasOneMsgTopic        bool               `yaml:"has-one-msg-topic"` // 是否存在只发单帧的话题
-	FullCollect           bool               `yaml:"full-collect"`
-	ConfigRefreshInterval int                `yaml:"config-refresh-interval"` // 配置刷新时间间隔
-	BagNumber             int                `yaml:"bag-number"`
-	TimeWindowSendGap     int                `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
-	BagDataDir            string             `yaml:"bag-data-dir"`
-	BagCopyDir            string             `yaml:"bag-copy-dir"`
-	TriggersDir           string             `yaml:"triggers-dir"`
-	RpcPort               string             `yaml:"rpc-port"`
-	Triggers              []trigger          `yaml:"triggers"`
-	Hosts                 []hostStruct       `yaml:"hosts"`
-	Ros                   ros                `yaml:"ros"`
-	Platform              platform           `yaml:"platform"`
-	Disk                  disk               `yaml:"disk"`
-	Monitor               MonitorStruct      `yaml:"monitor"`
+type CollectWindowStruct struct {
+	Flag      int      `yaml:"flag"`
+	Days      []string `yaml:"days,omitempty"`
+	Start     string   `yaml:"start_time"`
+	End       string   `yaml:"end_time"`
+	StartTime time.Time
+	EndTime   time.Time
 }
 
-// Request 结构体定义
-type Request struct {
-	Type      string      `json:"type"`
-	UUID      string      `json:"uuid"`
-	CommandID string      `json:"commandId"`
-	Parameter interface{} `json:"parameter"`
+type cloudConfig struct {
+	CollectLimit          CollectLimitStruct   `yaml:"collect-limit"`
+	CollectWindow         CollectWindowStruct  `yaml:"collect-window"`
+	CollectNumPlus        CollectNumPlusStruct `yaml:"collect-num-plus"`
+	HasOneMsgTopic        bool                 `yaml:"has-one-msg-topic"` // 是否存在只发单帧的话题
+	FullCollect           bool                 `yaml:"full-collect"`
+	ConfigRefreshInterval int                  `yaml:"config-refresh-interval"` // 配置刷新时间间隔
+	BagNumber             int                  `yaml:"bag-number"`
+	TimeWindowSendGap     int                  `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
+	BagDataDir            string               `yaml:"bag-data-dir"`
+	BagCopyDir            string               `yaml:"bag-copy-dir"`
+	TriggersDir           string               `yaml:"triggers-dir"`
+	RpcPort               string               `yaml:"rpc-port"`
+	Triggers              []trigger            `yaml:"triggers"`
+	Hosts                 []hostStruct         `yaml:"hosts"`
+	Ros                   ros                  `yaml:"ros"`
+	Platform              platform             `yaml:"platform"`
+	Disk                  disk                 `yaml:"disk"`
+	Monitor               MonitorStruct        `yaml:"monitor"`
 }
 
-// Response 结构体定义
-type Response struct {
-	CommandID string            `json:"commandId"`
-	ErrorCode string            `json:"errorCode"`
-	Results   map[string]string `json:"results"`
-	Status    string            `json:"status"`
-	Time      int64             `json:"time"`
-	Type      string            `json:"type"`
-	UUID      string            `json:"uuid"`
-}
+//// Request 结构体定义
+//type Request struct {
+//	Type      string      `json:"type"`
+//	UUID      string      `json:"uuid"`
+//	CommandID string      `json:"commandId"`
+//	Parameter interface{} `json:"parameter"`
+//}
+//
+//// Response 结构体定义
+//type Response struct {
+//	CommandID string            `json:"commandId"`
+//	ErrorCode string            `json:"errorCode"`
+//	Results   map[string]string `json:"results"`
+//	Status    string            `json:"status"`
+//	Time      int64             `json:"time"`
+//	Type      string            `json:"type"`
+//	UUID      string            `json:"uuid"`
+//}
 
 var (
 	CloudConfig      cloudConfig
@@ -165,6 +180,25 @@ func InitCloudConfig() {
 		c_log.GlobalLogger.Error("程序崩溃,配置文件 ", LocalConfig.CloudConfigLocalPath, " 解析失败:", err)
 		os.Exit(-1)
 	}
+	cw := time.Now()
+	loc := cw.Location()
+	// 单独解析采集时间
+	startTime, err := time.ParseInLocation("15:04", newCloudConfig.CollectWindow.Start, loc)
+	if err != nil {
+		c_log.GlobalLogger.Error("云端配置文件解析采集时间【startTime】失败 ", err, "取默认值【00:00】")
+		newCloudConfig.CollectWindow.StartTime, _ = time.ParseInLocation("15:04", "00:00", loc)
+	}
+	newCloudConfig.CollectWindow.StartTime = startTime
+	endTime, err := time.ParseInLocation("15:04", newCloudConfig.CollectWindow.End, loc)
+	if err != nil {
+		c_log.GlobalLogger.Error("云端配置文件解析采集时间【endTime】失败 ", err, "取默认值【23:59】")
+		newCloudConfig.CollectWindow.EndTime, _ = time.ParseInLocation("15:04", "23:59", loc)
+	}
+	newCloudConfig.CollectWindow.EndTime = endTime
+	if len(newCloudConfig.CollectWindow.Days) == 0 {
+		// 默认设置为每天
+		newCloudConfig.CollectWindow.Days = []string{"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"}
+	}
 
 	// 5 ------- 校验 yaml -------
 	if checkCloudConfig(newCloudConfig) {

+ 1 - 0
aarch64/pjibot_delivery/common/config/c_local.go

@@ -24,6 +24,7 @@ type localConfig struct {
 	OssBasePrefix        string     `yaml:"oss-base-prefix"`         // 云端配置文件的位置
 	CloudConfigFilename  string     `yaml:"cloud-config-filename"`   // 云端配置文件名称
 	CloudConfigLocalPath string     `yaml:"cloud-config-local-path"` // 将 oss 的配置文件下载到本地的位置
+	LocalWebsocketPort   string     `yaml:"local-websocket-port"`    // websocket端口号
 	RestartCmd           restartCmd `yaml:"restart-cmd"`             // 重启命令
 	EquipmentNo          string     // 当前设备的编号
 	SecretKey            string     // 当前设备的密钥

+ 17 - 0
aarch64/pjibot_delivery/common/config/c_platform.go

@@ -193,3 +193,20 @@ func checkPlatformConfig() bool {
 	}
 	return true
 }
+
+func CheckPlatformConfigStatus(maxRetryCount int) bool {
+	var err error
+	for i := 0; i < maxRetryCount; i++ {
+		time.Sleep(time.Duration(2) * time.Second)
+		// 判断是否有配置,第一次访问状态应该为:CHANGE(一共三种状态 CHANGE|UNCHANGE|NONE)
+		PlatformConfig, err = getConfig()
+		if err != nil {
+			c_log.GlobalLogger.Error("获取配置status失败:", err)
+			continue
+		}
+		if checkPlatformConfig() {
+			return true
+		}
+	}
+	return false
+}

+ 225 - 0
aarch64/pjibot_delivery/common/config/c_websocket.go

@@ -0,0 +1,225 @@
+package config
+
+import (
+	"cicv-data-closedloop/common/config/c_log"
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/websocket"
+	"net/url"
+	"time"
+)
+
+var (
+	WsConn                 *websocket.Conn
+	reconnectionInProgress bool
+)
+
+// Request 结构体定义
+type Request struct {
+	Type      string      `json:"type"`
+	UUID      string      `json:"uuid"`
+	CommandID string      `json:"commandId"`
+	Parameter interface{} `json:"parameter"`
+}
+
+// Request1 结构体定义
+type Request1 struct {
+	Type      string      `json:"type"`
+	CommandID string      `json:"commandId"`
+	Parameter interface{} `json:"parameter"`
+}
+
+// Response 结构体定义
+type Response struct {
+	CommandID string            `json:"commandId"`
+	ErrorCode string            `json:"errorCode"`
+	Results   map[string]string `json:"results"`
+	Status    string            `json:"status"`
+	Time      int64             `json:"time"`
+	Type      string            `json:"type"`
+	UUID      string            `json:"uuid"`
+}
+
+// StatusMessage 状态消息 结构体定义
+type StatusMessage struct {
+	Type  string      `json:"type"`
+	Topic string      `json:"topic"`
+	Time  int64       `json:"time"`
+	Data  interface{} `json:"data"`
+}
+
+func keepAlive() {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		//c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
+	}
+
+	for {
+		select {
+		case <-ticker.C:
+			err := WsConn.WriteMessage(websocket.TextMessage, requestJSON)
+			if err != nil {
+				//c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
+				WsConn.Close()
+				//c_log.GlobalLogger.Info("重试连接websocket...")
+				ConnectWebsocket() // 重新连接
+				continue
+			}
+			//c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")
+		}
+	}
+}
+
+func SendWebsocketHeartbeat(conn *websocket.Conn, maxRetries int) (bool, error) {
+
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	// 将请求JSON编码为字节
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		return false, fmt.Errorf("marshal request: %w", err)
+	}
+
+	// 发送WebSocket消息
+	err = conn.WriteMessage(websocket.TextMessage, requestJSON)
+	if err != nil {
+		return false, fmt.Errorf("write: %w", err)
+	}
+
+	count := 0
+	for {
+		if count > maxRetries {
+			return false, fmt.Errorf("保持websocket连接活跃,读取websocket消息超过最大重试次数。")
+		}
+		time.Sleep(1 * time.Second)
+		_, message, err := conn.ReadMessage()
+		if err != nil {
+			c_log.GlobalLogger.Error("保持websocket连接活跃,读取websocket消息 - 失败 ", err, " 继续读取消息。")
+			return false, err
+			//continue
+		}
+
+		var response Response
+		err = json.Unmarshal(message, &response)
+		c_log.GlobalLogger.Info("response ", response)
+		if err == nil && response.Type == "response" {
+			c_log.GlobalLogger.Info("response1 ", response)
+			return true, err
+		}
+		count++
+	}
+}
+
+func sendRequestAndAwaitResponse(ws *websocket.Conn) ([]byte, error) {
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
+		return nil, err
+	}
+
+	err = ws.WriteMessage(websocket.TextMessage, requestJSON)
+	if err != nil {
+		c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
+		return nil, err
+	}
+	c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")
+
+	// 使用channel等待响应
+	responseChan := make(chan []byte)
+	go handleMessages(ws, responseChan)
+
+	select {
+	case response := <-responseChan:
+		c_log.GlobalLogger.Error("保持websocket连接活跃,等待心跳响应 - 成功。")
+		return response, nil
+	case <-time.After(60 * time.Second): // 设置超时时间
+		c_log.GlobalLogger.Error("保持websocket连接活跃,等待心跳响应 - 超时。")
+		close(responseChan)
+		return nil, fmt.Errorf("保持websocket连接活跃,等待心跳响应 - 超时。")
+	}
+}
+
+func handleMessages(ws *websocket.Conn, responseChan chan<- []byte) {
+	for {
+		time.Sleep(100 * time.Millisecond)
+		_, message, err := ws.ReadMessage()
+		if err != nil {
+			c_log.GlobalLogger.Error("保持websocket连接活跃,读取websocket消息 - 失败 ", err)
+			return
+		}
+
+		var response Response
+		if err := json.Unmarshal(message, &response); err == nil && response.Type == "response" {
+			responseChan <- message
+			close(responseChan)
+			return
+		}
+	}
+}
+
+func ConnectWebsocket() {
+	for {
+		// 防止重复调用
+		if reconnectionInProgress {
+			return
+		}
+		reconnectionInProgress = true
+		//c_log.GlobalLogger.Info("初始化Websocket连接 - 开始。")
+		serverURL := LocalConfig.Node.Ip + ":" + LocalConfig.LocalWebsocketPort
+		path := "/"
+
+		// 构建WebSocket连接URL
+		u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
+		//c_log.GlobalLogger.Info("URL:", u.String())
+
+		// 创建一个Dialer实例,用于建立WebSocket连接
+		dialer := websocket.Dialer{
+			ReadBufferSize:  1024,
+			WriteBufferSize: 1024,
+			// 可选:设置超时等
+			HandshakeTimeout: 5 * time.Minute,
+		}
+
+		// 建立WebSocket连接
+		coon, _, err := dialer.Dial(u.String(), nil)
+		if err != nil {
+			fmt.Println("err:", err)
+			//c_log.GlobalLogger.Error("初始化Websocket连接 - 失败。")
+			time.Sleep(5 * time.Second)
+			reconnectionInProgress = false
+			//c_log.GlobalLogger.Info("重试连接websocket...")
+			continue
+		}
+
+		WsConn = coon
+		//c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。")
+		// 连接成功,退出循环
+		reconnectionInProgress = false
+		break
+	}
+}
+
+func InitWebsocketConfig() {
+	ConnectWebsocket()
+	// 保持连接活跃
+	go keepAlive()
+}

+ 3 - 1
aarch64/pjibot_delivery/common/service/disk_clean.go

@@ -27,7 +27,9 @@ func DiskClean() {
 	for {
 		time.Sleep(1000 * time.Millisecond)
 		// 1 获取磁盘占用
-		diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name)
+		//diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name) // 获取整个磁盘空间
+
+		diskUsed, _ := util.GetDirectoryDiskUsed(commonConfig.CloudConfig.Disk.Path) // 获取指定目录空间
 		if diskUsed > commonConfig.CloudConfig.Disk.Used {
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
 			c_log.GlobalLogger.Errorf("磁盘占用 %v 超过 %v,触发删除规则 %v", diskUsed, commonConfig.CloudConfig.Disk.Used, policyToDescription[policy])

+ 24 - 0
aarch64/pjibot_delivery/common/service/rosbag_upload.go

@@ -7,6 +7,8 @@ import (
 	"cicv-data-closedloop/common/domain"
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	commonUtil "cicv-data-closedloop/common/util"
+	"encoding/json"
 	"fmt"
 	"os"
 	"strings"
@@ -148,6 +150,8 @@ outLoop:
 			c_log.GlobalLogger.Error("上传 callback.json 文件失败:", err)
 		}
 
+		// 数据库中采集数量加一
+		collectNumPlus()
 		// 删除本地所有已上传的bag文件
 		c_log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
@@ -157,6 +161,26 @@ outLoop:
 		if len(entity.TimeWindowConsumerQueue) == 0 {
 			c_log.GlobalLogger.Infof("已处理所有窗口,重启 record 命令。")
 			ChannelKillRosRecord <- 2
+			entity.ProcessingFlag = false
 		}
 	}
 }
+
+func collectNumPlus() {
+	responseString, err := commonUtil.HttpPostJsonWithHeaders(
+		commonConfig.CloudConfig.CollectNumPlus.Url,
+		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+		map[string]string{
+			"snCode": commonConfig.LocalConfig.SecretKey,
+		},
+	)
+	if err != nil {
+		c_log.GlobalLogger.Error("发送http请求修改采集数量失败:", err)
+	}
+	// 解析JSON字符串到Response结构体
+	var resp entity.Response
+	err = json.Unmarshal([]byte(responseString), &resp)
+	if err != nil {
+		c_log.GlobalLogger.Error("解析修改采集数量结果失败:", err)
+	}
+}

+ 216 - 67
aarch64/pjibot_delivery/control/main.go

@@ -4,15 +4,38 @@ import (
 	commonConfig "cicv-data-closedloop/aarch64/pjibot_delivery/common/config"
 	commonService "cicv-data-closedloop/aarch64/pjibot_delivery/common/service"
 	"cicv-data-closedloop/aarch64/pjibot_delivery/common/variable"
+	"cicv-data-closedloop/aarch64/pjibot_delivery/control/pkg"
 	"cicv-data-closedloop/common/config/c_log"
+	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	commonUtil "cicv-data-closedloop/common/util"
+	"encoding/json"
+	"fmt"
 	"net/rpc"
 	"os"
 	"runtime"
 	"time"
 )
 
-var applicationName = "pji-control"
+var (
+	applicationName  = "pji-control"
+	localStatus      = "idle"
+	cloudStatus      = "NONE"
+	lastLocalStatus  = "idle"
+	lastCloudStatus  = "NONE"
+	timeAllowedFlag  = false
+	limitReachedFlag = false
+	localTurnLength  = 1  // s,本地状态刷新时间
+	cloudTurnLength  = 60 // s,云端状态刷新时间
+	renewTurnLength  = 3  // s,续约状态刷新时间
+	waitStopLength   = 1  // min,停止master前等待时间
+	checkLimitLength = 1  // min, 查询是否达到采集限制等待时间
+	launchedFlag     = false
+	renewedFlag      = false
+	renewTimer       *time.Timer // 续约定时器
+	RenewDur         = 5         // min, 续约时间
+	maxRetryCount    = 10        // 查询配置最大重试次数
+)
 
 func init() {
 	runtime.GOMAXPROCS(1)
@@ -24,87 +47,213 @@ func init() {
 	commonConfig.InitOssConfig()
 	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
 	commonConfig.InitCloudConfig()
+	// 初始化websocket配置
+	commonConfig.InitWebsocketConfig()
 }
 
-func main() {
-	init := true
-	turnLength := 60
-	lastStatus := "NONE"
-	//  轮询任务接口判断是否有更新
-	for {
-		c_log.GlobalLogger.Errorf("一轮次扫描时间【%v】秒:", turnLength)
-		if init {
-			time.Sleep(time.Duration(1) * time.Second)
-			init = false
-		} else {
-			time.Sleep(time.Duration(turnLength) * time.Second)
+func isTimeAllowed(currentTime time.Time) bool {
+	loc := currentTime.Location()
+	cw := commonConfig.CloudConfig.CollectWindow
+	if cw.Flag == 0 { // 关闭固定时间段采集, 则都返回true
+		return true
+	}
+	if len(cw.Days) > 0 { // 如果指定了周几
+		currentDay := currentTime.Weekday().String()
+		included := false
+		for _, day := range cw.Days {
+			if day == currentDay {
+				included = true
+				break
+			}
 		}
-		// 1 获取当前设备的任务的 status
-		status, err := commonConfig.GetStatus(commonConfig.PlatformConfig.TaskConfigId)
-		if err != nil {
-			c_log.GlobalLogger.Error("获取配置status失败:", err)
-			continue
+		if !included {
+			return false
 		}
-		// 2 判断 status
-		// UN_CHANGE 没有新的任务,无需更改
-		// CHANGE 有新的任务,需要杀死旧的任务并重启
-		// NONE 设备没有配置任务,需要杀死旧的任务
-		if status == "UN_CHANGE" {
-			lastStatus = "UN_CHANGE"
-			continue
-		} else if status == "CHANGE" || status == "NONE" {
-			if lastStatus == "CHANGE" && status == "CHANGE" { // 供更新使用
-				commonConfig.InitPlatformConfig()
-				continue
-			}
-			if lastStatus == "NONE" && status == "NONE" {
+	}
+
+	start := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), cw.StartTime.Hour(), cw.StartTime.Minute(), cw.StartTime.Second(), 0, loc)
+	end := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), cw.EndTime.Hour(), cw.EndTime.Minute(), cw.EndTime.Second(), 0, loc)
+
+	if start.After(end) { // 如果时段跨天
+		end = end.AddDate(0, 0, 1)
+	}
+	return !currentTime.Before(start) && currentTime.Before(end)
+}
+
+func checkCollectLimit() {
+	for {
+		time.Sleep(time.Duration(checkLimitLength) * time.Minute)
+		// 如果开启了采集频率限制,则云端判断采集数量是否超过限额
+		if commonConfig.CloudConfig.CollectLimit.Flag == 1 {
+			//c_log.GlobalLogger.Error("当前设备已开启数采频率限制,需判断采集数量是否达到限额。")
+			responseString, err := commonUtil.HttpPostJsonWithHeaders(
+				commonConfig.CloudConfig.CollectLimit.Url,
+				map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+				map[string]string{
+					"snCode":            commonConfig.LocalConfig.SecretKey,
+					"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+					"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+					"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+					"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+				},
+			)
+			if err != nil {
+				c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
 				continue
 			}
-			// 3 发送rpc信号杀死采集程序
-			if lastStatus == "NONE" && status == "CHANGE" {
-				if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
-					c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
-					os.Exit(-1)
-				}
-				c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
-				lastStatus = status
-				c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
-				commonConfig.InitPlatformConfig()
+			// 解析JSON字符串到Response结构体
+			var resp entity.Response
+			err = json.Unmarshal([]byte(responseString), &resp)
+			if err != nil {
+				c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
 				continue
 			}
-			var killArgs commonService.KillSignal
-			if lastStatus == "UN_CHANGE" && status == "CHANGE" {
-				killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: true}
-				c_log.GlobalLogger.Info("更新任务,发送rpc重启信号:", killArgs)
-			}
-			if lastStatus == "UN_CHANGE" && status == "NONE" {
-				killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
-				c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
+			if resp.Code != 200 { // 不是200 代表采集数量已超过限额不允许采集
+				//c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
+				limitReachedFlag = true
 			}
+		}
+		limitReachedFlag = false
+	}
+}
 
-			KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
-			if err != nil {
-				// 此处如果连接失败说明采集程序已经停止了
-				lastStatus = "NONE"
-				c_log.GlobalLogger.Error("采集程序已经停止:", err)
-				continue
+func initRenew() {
+	c_log.GlobalLogger.Info("启动定时器 - 开始。")
+	if renewTimer != nil {
+		renewTimer.Stop()
+	}
+	renewedFlag = true
+	renewTimer = time.AfterFunc(time.Duration(RenewDur)*time.Minute, func() {
+		renewedFlag = false
+	})
+	c_log.GlobalLogger.Infof("定时时间【%v】分钟 - 成功。", RenewDur)
+}
+
+func renew() {
+	for {
+		time.Sleep(time.Duration(renewTurnLength) * time.Second)
+		if localStatus == "running" && launchedFlag && !renewedFlag { // 设备处于运行状态,数采程序已启动,且尚未续约
+			c_log.GlobalLogger.Info("设备仍处于运行状态,续约 - 开始。")
+			if renewTimer != nil {
+				renewTimer.Stop()
 			}
+			renewedFlag = true
+			renewTimer = time.AfterFunc(time.Duration(RenewDur)*time.Minute, func() {
+				renewedFlag = false
+			})
+			c_log.GlobalLogger.Infof("续约时间【%v】分钟 - 成功。", RenewDur)
+		}
+	}
+}
+
+func startMasterNode() {
+	c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
+
+	if commonConfig.CheckPlatformConfigStatus(maxRetryCount) {
+		c_log.GlobalLogger.Info("查询到数据闭环平台有配置任务。")
+		commonConfig.InitPlatformConfig()
+
+		if _, err := util.ExecuteWithPath(commonConfig.LocalConfig.RestartCmd.Dir, commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args...); err != nil {
+			c_log.GlobalLogger.Info("启动新程序失败,【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args, ":", err)
+			os.Exit(-1)
+		}
+		c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
+
+		initRenew()
+		launchedFlag = true
+
+		c_log.GlobalLogger.Info("数采程序启动 - 成功。")
+	} else {
+		c_log.GlobalLogger.Error("查询到数据闭环平台没有配置任务,不启动数采程序。")
+	}
+}
+
+func stopMasterNode() {
+	// 发送rpc信号杀死采集程序
+	var killArgs commonService.KillSignal
+	killArgs = commonService.KillSignal{NodeName: "master", DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
+	c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号:", killArgs)
+	KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
+	if err != nil {
+		// 此处如果连接失败说明采集程序已经停止了
+		lastCloudStatus = "NONE"
+		c_log.GlobalLogger.Error("采集程序已经停止:", err)
+		return
+	}
+
+	reply := 0
+	if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
+		c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
+	}
+
+	c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
+	commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
+	if err = KillRpcClient.Close(); err != nil {
+		// 不做处理
+	}
+
+	launchedFlag = false
+	c_log.GlobalLogger.Info("数采程序关闭 - 成功。")
+}
+
+func main() {
+	// 更新本地任务状态
+	go pkg.GetLocalStatus(&localStatus, &lastLocalStatus, localTurnLength)
+	// 更新云端任务状态
+	go pkg.GetCloudStatus(&cloudStatus, &lastCloudStatus, cloudTurnLength)
+
+	// 定期检查本地任务状态,执行续约,避免短时间内多次启停
+	go renew()
+
+	// 检查是否达到限额
+	go checkCollectLimit()
+
+	// 云端任务状态负责更新配置
+	go pkg.GetCloudConfig(&cloudStatus, &lastCloudStatus, cloudTurnLength)
+
+	for {
+		if launchedFlag { // 当前已启动master节点
+			time.Sleep(time.Duration(cloudTurnLength) * time.Second)
+		} else {
+			time.Sleep(time.Duration(localTurnLength) * time.Second)
+		}
+
+		fmt.Println("localStatus: ", localStatus, "lastLocalStatus: ", lastLocalStatus)
+		fmt.Println("cloudStatus: ", cloudStatus, "lastCloudStatus: ", lastCloudStatus)
+
+		// 采集时间限制
+		currentTime := time.Now()
+		timeAllowedFlag = isTimeAllowed(currentTime)
+
+		// 采集频率限制
+
+		// 综合判断 cloudStatus 和 localStatus
+		// cloudStatus
+		// UN_CHANGE 没有新的任务,无需更改
+		// CHANGE 有新的任务,需要杀死旧的数采任务并重启
+		// NONE 设备没有配置任务,需要杀死旧的数采任务
+		// localStatus
+		// idle 空闲状态,此状态下不启动数采任务
+		// running 繁忙状态,此状态需要启动数采任务
+		// error 错误状态,此状态下不启动数采任务
 
-			reply := 0
-			if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
-				c_log.GlobalLogger.Error("发送 rpc 请求到 master 报错:", err)
-				// 这里可能会报错unexpected EOF但是不影响,先注释 close 和 continue
-				//KillRpcClient.Close()
-				//continue
+		// 启停master
+		if localStatus == "running" && timeAllowedFlag && !limitReachedFlag {
+			if !launchedFlag { // 目前未启动数采程序
+				c_log.GlobalLogger.Info("数采程序启动 - 开始。")
+				startMasterNode()
 			}
-			lastStatus = status
-			c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
-			commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
-			if err = KillRpcClient.Close(); err != nil {
-				// 不做处理
+		} else if localStatus == "idle" || !timeAllowedFlag || !limitReachedFlag {
+			if !renewedFlag && launchedFlag && len(entity.TimeWindowConsumerQueue) == 0 {
+				time.Sleep(time.Duration(waitStopLength) * time.Minute)
+				c_log.GlobalLogger.Info("数采程序关闭 - 开始。")
+				stopMasterNode()
 			}
+		} else if localStatus == "error" {
+			c_log.GlobalLogger.Error("设备运行状态出错,停止数采程序。")
+			stopMasterNode()
 		} else {
-			c_log.GlobalLogger.Error("未知的采集任务状态。【status】=", status)
+			c_log.GlobalLogger.Error("未知的设备运行状态。【status】=", localStatus)
 		}
 	}
 }

+ 45 - 0
aarch64/pjibot_delivery/control/pkg/judge_cloud.go

@@ -0,0 +1,45 @@
+package pkg
+
+import (
+	commonConfig "cicv-data-closedloop/aarch64/pjibot_delivery/common/config"
+	"cicv-data-closedloop/common/config/c_log"
+	"time"
+)
+
+var (
+	maxRetryCount = 10
+)
+
+// UN_CHANGE 没有新的任务
+// CHANGE 有新的任务
+// NONE 设备没有配置任务
+func GetCloudStatus(cloudStatus *string, lastCloudStatus *string, turnLength int) {
+	// 轮询云端任务状态
+	for {
+		time.Sleep(time.Duration(turnLength) * time.Second)
+
+		*lastCloudStatus = *cloudStatus
+		taskStatus, err := commonConfig.GetStatus(commonConfig.PlatformConfig.TaskConfigId)
+		if err != nil {
+			c_log.GlobalLogger.Error("获取云端配置status失败:", err)
+			continue
+		}
+		if taskStatus == "" || taskStatus == " " {
+			taskStatus = "NONE"
+		}
+		*cloudStatus = taskStatus
+	}
+}
+
+func GetCloudConfig(cloudStatus *string, lastCloudStatus *string, turnLength int) {
+	for {
+		time.Sleep(time.Duration(turnLength) * time.Second)
+		if *cloudStatus == "CHANGE" {
+			c_log.GlobalLogger.Error("cloudStatus:", *cloudStatus)
+			if commonConfig.CheckPlatformConfigStatus(maxRetryCount) {
+				c_log.GlobalLogger.Info("查询到数据闭环平台有配置任务。")
+				commonConfig.InitPlatformConfig()
+			}
+		}
+	}
+}

+ 44 - 0
aarch64/pjibot_delivery/control/pkg/judge_local.go

@@ -0,0 +1,44 @@
+package pkg
+
+import (
+	"cicv-data-closedloop/aarch64/pjibot_delivery/common/config"
+	"encoding/json"
+	"fmt"
+	"log"
+	"time"
+)
+
+// idle 空闲状态,此状态下机器人可进行任务下发
+// running 繁忙状态,此状态机器人不接受新任务
+// error 错误状态(硬件,不能正常工作的)
+func GetLocalStatus(localStatus *string, lastLocalStatus *string, turnLength int) {
+	defer config.WsConn.Close()
+	// 轮询本地任务状态
+	for {
+		time.Sleep(time.Duration(turnLength) * time.Second)
+
+		*lastLocalStatus = *localStatus
+		_, msg, err := config.WsConn.ReadMessage()
+		if err != nil {
+			log.Println("Error in receive:", err)
+			continue
+		}
+		//log.Printf("Received: %s\n", msg)
+
+		// 将响应字节解码为JSON
+		var statusMessage config.StatusMessage
+		err = json.Unmarshal(msg, &statusMessage)
+		if err != nil {
+			log.Println("Error in json:", err)
+			continue
+		}
+
+		if statusMessage.Type == "push" && statusMessage.Topic == "robotStatus" {
+			//fmt.Println("statusMessage:", statusMessage)
+			data := statusMessage.Data.(map[string]interface{})
+			//fmt.Println("statusMessage.Data", data)
+			fmt.Println("statusMessage.Data[\"status\"]", data["status"])
+			*localStatus = data["status"].(string)
+		}
+	}
+}

+ 1 - 0
aarch64/pjibot_delivery/master/package/service/move_bag_and_send_window.go

@@ -59,6 +59,7 @@ func RunTimeWindowProducerQueue() {
 				time.Sleep(time.Duration(2) * time.Second)
 				c_log.GlobalLogger.Info("采集数据,发送record命令进程关闭信号。")
 				commonService.ChannelKillRosRecord <- 3
+				entity.ProcessingFlag = true
 				domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
 				// 将时间窗口移出准备队列
 				entity.RemoveHeadOfTimeWindowProducerQueue()

+ 97 - 53
aarch64/pjibot_delivery/master/package/service/produce_window.go

@@ -43,9 +43,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *diagnostic_msgs.DiagnosticArray) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -55,6 +52,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfDiagnostics {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -76,9 +76,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *sensor_msgs.Imu) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -88,6 +85,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfImu {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -111,9 +111,6 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *pjibot_delivery_msgs.LocateInfo) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
-							if !canCollect() {
-								return
-							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -121,6 +118,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfLocateInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -142,9 +142,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *std_msgs.UInt8) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -154,6 +151,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfObstacleDetection {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -177,9 +177,6 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *nav_msgs.Odometry) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
-							if !canCollect() {
-								return
-							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -187,6 +184,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfOdom {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -210,9 +210,6 @@ func PrepareTimeWindowProducerQueue() {
 					Callback: func(data *pjibot_delivery_msgs.SysInfo) {
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
-							if !canCollect() {
-								return
-							}
 							subscribersMutexes[i].Lock()
 							faultHappenTime := commonUtil.GetNowTimeCustom() // 获取当前故障发生时间
 							lastTimeWindow := entity.GetLastTimeWindow()     // 获取最后一个时间窗口
@@ -220,6 +217,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfSysInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -241,9 +241,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *geometry_msgs.PoseStamped) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -253,6 +250,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfRobotPose {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -274,9 +274,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_delivery_msgs.TaskFeedbackInfo) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -286,6 +283,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfTaskFeedbackInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -307,9 +307,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -319,6 +316,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfWheelOdom {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -414,34 +414,78 @@ func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []st
 
 // 判断采集包数量是否超过限额
 func canCollect() bool {
-	responseString, err := commonUtil.HttpPostJsonWithHeaders(
-		commonConfig.CloudConfig.CollectLimit.Url,
-		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
-		map[string]string{
-			"snCode":            commonConfig.LocalConfig.SecretKey,
-			"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
-			"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
-			"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
-			"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
-		},
-	)
-	if err != nil {
-		c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
-		return false
-	}
-	// 解析JSON字符串到Response结构体
-	var resp entity.Response
-	err = json.Unmarshal([]byte(responseString), &resp)
-	if err != nil {
-		c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
-		return false
-	}
-	if resp.Code != 200 { // 不是200 代表不允许采集
-		return false
+	// 如果开启了采集频率限制,则云端判断采集数量是否超过限额
+	if commonConfig.CloudConfig.CollectLimit.Flag == 1 {
+		c_log.GlobalLogger.Error("当前设备已开启数采频率限制,需判断采集数量是否达到限额。")
+		responseString, err := commonUtil.HttpPostJsonWithHeaders(
+			commonConfig.CloudConfig.CollectLimit.Url,
+			map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+			map[string]string{
+				"snCode":            commonConfig.LocalConfig.SecretKey,
+				"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+				"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+				"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+				"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+			},
+		)
+		if err != nil {
+			c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+			return false
+		}
+		// 解析JSON字符串到Response结构体
+		var resp entity.Response
+		err = json.Unmarshal([]byte(responseString), &resp)
+		if err != nil {
+			c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+			return false
+		}
+		if resp.Code != 200 { // 不是200 代表不允许采集
+			c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
+			return false
+		}
+	} else {
+		c_log.GlobalLogger.Error("当前设备未开启数采频率限制,无需判断采集数量是否达到限额。")
 	}
-	if len(entity.TimeWindowConsumerQueue) != 0 {
-		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+
+	// 本地判断是否存在正在处理的数据
+	if entity.ProcessingFlag {
+		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。")
 		return false
 	}
+
+	c_log.GlobalLogger.Info("允许采集。")
 	return true
 }
+
+//func canCollect() bool {
+//	responseString, err := commonUtil.HttpPostJsonWithHeaders(
+//		commonConfig.CloudConfig.CollectLimit.Url,
+//		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+//		map[string]string{
+//			"snCode":            commonConfig.LocalConfig.SecretKey,
+//			"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+//			"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+//			"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+//			"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+//		},
+//	)
+//	if err != nil {
+//		c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+//		return false
+//	}
+//	// 解析JSON字符串到Response结构体
+//	var resp entity.Response
+//	err = json.Unmarshal([]byte(responseString), &resp)
+//	if err != nil {
+//		c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+//		return false
+//	}
+//	if resp.Code != 200 { // 不是200 代表不允许采集
+//		return false
+//	}
+//	if len(entity.TimeWindowConsumerQueue) != 0 {
+//		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+//		return false
+//	}
+//	return true
+//}

+ 10 - 0
aarch64/pjibot_delivery/配送机器人默认配置文件-cloud-config.yaml

@@ -1,10 +1,18 @@
 ---
 collect-limit:
   url: http://36.110.106.142:12341/web_server/collect_limit/can_collect
+  flag: 1 # 数采频率限制标志 0 - 关闭数采频率限制  1 - 开启数采频率限制
   day: 3
   week: 7
   month: 30
   year: 365
+collect-window:
+  flag: 1 # 时间段限制标志 0 - 关闭固定时间段采集 1 - 开启固定时间段采集
+  days: [] # 可选字段:"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday",空值默认为每天
+  start_time: "09:00"
+  end_time: "17:00"
+collect-num-plus:
+  url: http://36.110.106.142:12341/web_server/collect_limit/plus_collect_num
 monitor:
   url: http://36.110.106.142:12341/web_server/monitor/insert
 platform:
@@ -17,6 +25,8 @@ config-refresh-interval: 60
 disk:
   name: /dev/nvme0n1p1 # 磁盘名称
   used: 108000000000 # 磁盘占用阈值,单位bytes
+  path:
+    - /home/pji/cicv-data-closedloop
 has-one-msg: false
 bag-data-dir: /home/pji/cicv-data-closedloop/data/
 bag-copy-dir: /home/pji/cicv-data-closedloop/copy/

+ 2 - 0
aarch64/pjibot_delivery/配送机器人默认配置文件-local-config.yaml

@@ -11,6 +11,8 @@ oss-base-prefix: pjibot-delivery/
 cloud-config-filename: cloud-config.yaml
 # 将oss上的配置文件下载到本地的路径
 cloud-config-local-path: /home/pji/cicv-data-closedloop/config/cloud-config.yaml
+# websocket端口号
+local-websocket-port: 9002
 restart-cmd:
   dir: "/home/pji/cicv-data-closedloop/"
   name: "sh"

+ 25 - 21
aarch64/pjibot_guide/common/config/c_cloud.go

@@ -58,6 +58,10 @@ type CollectLimitStruct struct {
 	Year  int    `yaml:"year"`
 }
 
+type CollectNumPlusStruct struct {
+	Url string `yaml:"url"`
+}
+
 type DataDirStruct struct {
 	Src    string   `yaml:"src"`
 	SrcSub []string `yaml:"src-sub"`
@@ -65,27 +69,27 @@ type DataDirStruct struct {
 }
 
 type cloudConfig struct {
-	CollectLimit          CollectLimitStruct `yaml:"collect-limit"`
-	CollectNumPlus        CollectLimitStruct `yaml:"collect-num-plus"`
-	DataDir               DataDirStruct      `yaml:"data-dir"`
-	MapBufFiles           []string           `yaml:"map-buf-files"`
-	FullCollect           bool               `yaml:"full-collect"`
-	ConfigRefreshInterval int                `yaml:"config-refresh-interval"` // 配置刷新时间间隔
-	BagNumber             int                `yaml:"bag-number"`
-	TimeWindowSendGap     int                `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
-	MapBagPath            string             `yaml:"map-bag-path"`
-	TfstaticBagPath       string             `yaml:"tfstatic-bag-path"`
-	CostmapBagPath        string             `yaml:"costmap-bag-path"`
-	BagDataDir            string             `yaml:"bag-data-dir"`
-	BagCopyDir            string             `yaml:"bag-copy-dir"`
-	TriggersDir           string             `yaml:"triggers-dir"`
-	RpcPort               string             `yaml:"rpc-port"`
-	Triggers              []TriggerStruct    `yaml:"triggers"`
-	Hosts                 []HostStruct       `yaml:"hosts"`
-	Ros                   RosStruct          `yaml:"ros"`
-	Platform              PlatformStruct     `yaml:"platform"`
-	Disk                  DiskStruct         `yaml:"disk"`
-	Monitor               MonitorStruct      `yaml:"monitor"`
+	CollectLimit          CollectLimitStruct   `yaml:"collect-limit"`
+	CollectNumPlus        CollectNumPlusStruct `yaml:"collect-num-plus"`
+	DataDir               DataDirStruct        `yaml:"data-dir"`
+	MapBufFiles           []string             `yaml:"map-buf-files"`
+	FullCollect           bool                 `yaml:"full-collect"`
+	ConfigRefreshInterval int                  `yaml:"config-refresh-interval"` // 配置刷新时间间隔
+	BagNumber             int                  `yaml:"bag-number"`
+	TimeWindowSendGap     int                  `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
+	MapBagPath            string               `yaml:"map-bag-path"`
+	TfstaticBagPath       string               `yaml:"tfstatic-bag-path"`
+	CostmapBagPath        string               `yaml:"costmap-bag-path"`
+	BagDataDir            string               `yaml:"bag-data-dir"`
+	BagCopyDir            string               `yaml:"bag-copy-dir"`
+	TriggersDir           string               `yaml:"triggers-dir"`
+	RpcPort               string               `yaml:"rpc-port"`
+	Triggers              []TriggerStruct      `yaml:"triggers"`
+	Hosts                 []HostStruct         `yaml:"hosts"`
+	Ros                   RosStruct            `yaml:"ros"`
+	Platform              PlatformStruct       `yaml:"platform"`
+	Disk                  DiskStruct           `yaml:"disk"`
+	Monitor               MonitorStruct        `yaml:"monitor"`
 }
 
 //// Request 结构体定义

+ 9 - 9
aarch64/pjibot_guide/common/config/c_websocket.go

@@ -60,7 +60,7 @@ func keepAlive() {
 
 	requestJSON, err := json.Marshal(request)
 	if err != nil {
-		c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
+		//c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
 	}
 
 	for {
@@ -68,13 +68,13 @@ func keepAlive() {
 		case <-ticker.C:
 			err := WsConn.WriteMessage(websocket.TextMessage, requestJSON)
 			if err != nil {
-				c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
+				//c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
 				WsConn.Close()
-				c_log.GlobalLogger.Info("重试连接websocket...")
+				//c_log.GlobalLogger.Info("重试连接websocket...")
 				ConnectWebsocket() // 重新连接
 				continue
 			}
-			c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")
+			//c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")
 		}
 	}
 }
@@ -183,13 +183,13 @@ func ConnectWebsocket() {
 			return
 		}
 		reconnectionInProgress = true
-		c_log.GlobalLogger.Info("初始化Websocket连接 - 开始。")
+		//c_log.GlobalLogger.Info("初始化Websocket连接 - 开始。")
 		serverURL := LocalConfig.Node.Ip + ":" + LocalConfig.LocalWebsocketPort
 		path := "/"
 
 		// 构建WebSocket连接URL
 		u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
-		c_log.GlobalLogger.Info("URL:", u.String())
+		//c_log.GlobalLogger.Info("URL:", u.String())
 
 		// 创建一个Dialer实例,用于建立WebSocket连接
 		dialer := websocket.Dialer{
@@ -203,15 +203,15 @@ func ConnectWebsocket() {
 		coon, _, err := dialer.Dial(u.String(), nil)
 		if err != nil {
 			fmt.Println("err:", err)
-			c_log.GlobalLogger.Error("初始化Websocket连接 - 失败。")
+			//c_log.GlobalLogger.Error("初始化Websocket连接 - 失败。")
 			time.Sleep(5 * time.Second)
 			reconnectionInProgress = false
-			c_log.GlobalLogger.Info("重试连接websocket...")
+			//c_log.GlobalLogger.Info("重试连接websocket...")
 			continue
 		}
 
 		WsConn = coon
-		c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。")
+		//c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。")
 		// 连接成功,退出循环
 		reconnectionInProgress = false
 		break

+ 60 - 16
aarch64/pjibot_guide/control/main.go

@@ -8,6 +8,8 @@ import (
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	commonUtil "cicv-data-closedloop/common/util"
+	"encoding/json"
 	"fmt"
 	"net/rpc"
 	"os"
@@ -16,20 +18,22 @@ import (
 )
 
 var (
-	applicationName = "pji-control"
-	localStatus     = "idle"
-	cloudStatus     = "NONE"
-	lastLocalStatus = "idle"
-	lastCloudStatus = "NONE"
-	localTurnLength = 1  // s,本地状态刷新时间
-	cloudTurnLength = 60 // s,云端状态刷新时间
-	renewTurnLength = 3  // s,续约状态刷新时间
-	waitStopLength  = 1  // min,停止master前等待时间
-	launchedFlag    = false
-	renewedFlag     = false
-	renewTimer      *time.Timer // 续约定时器
-	RenewDur        = 5         // min, 续约时间
-	maxRetryCount   = 10        // 查询配置最大重试次数
+	applicationName  = "pji-control"
+	localStatus      = "idle"
+	cloudStatus      = "NONE"
+	lastLocalStatus  = "idle"
+	lastCloudStatus  = "NONE"
+	limitReachedFlag = false
+	localTurnLength  = 1  // s,本地状态刷新时间
+	cloudTurnLength  = 60 // s,云端状态刷新时间
+	renewTurnLength  = 3  // s,续约状态刷新时间
+	waitStopLength   = 1  // min,停止master前等待时间
+	checkLimitLength = 1  // min, 查询是否达到采集限制等待时间
+	launchedFlag     = false
+	renewedFlag      = false
+	renewTimer       *time.Timer // 续约定时器
+	RenewDur         = 5         // min, 续约时间
+	maxRetryCount    = 10        // 查询配置最大重试次数
 )
 
 func init() {
@@ -48,6 +52,43 @@ func init() {
 	commonConfig.InitWebsocketConfig()
 }
 
+func checkCollectLimit() {
+	for {
+		time.Sleep(time.Duration(checkLimitLength) * time.Minute)
+		// 如果开启了采集频率限制,则云端判断采集数量是否超过限额
+		if commonConfig.CloudConfig.CollectLimit.Flag == 1 {
+			//c_log.GlobalLogger.Error("当前设备已开启数采频率限制,需判断采集数量是否达到限额。")
+			responseString, err := commonUtil.HttpPostJsonWithHeaders(
+				commonConfig.CloudConfig.CollectLimit.Url,
+				map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+				map[string]string{
+					"snCode":            commonConfig.LocalConfig.SecretKey,
+					"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+					"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+					"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+					"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+				},
+			)
+			if err != nil {
+				c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+				continue
+			}
+			// 解析JSON字符串到Response结构体
+			var resp entity.Response
+			err = json.Unmarshal([]byte(responseString), &resp)
+			if err != nil {
+				c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+				continue
+			}
+			if resp.Code != 200 { // 不是200 代表采集数量已超过限额不允许采集
+				//c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
+				limitReachedFlag = true
+			}
+		}
+		limitReachedFlag = false
+	}
+}
+
 func initRenew() {
 	c_log.GlobalLogger.Info("启动定时器 - 开始。")
 	if renewTimer != nil {
@@ -137,6 +178,9 @@ func main() {
 	// 定期检查本地任务状态,执行续约,避免短时间内多次启停
 	go renew()
 
+	// 检查是否达到限额
+	go checkCollectLimit()
+
 	// 云端任务状态负责更新配置
 	go pkg.GetCloudConfig(&cloudStatus, &lastCloudStatus, cloudTurnLength)
 
@@ -161,12 +205,12 @@ func main() {
 		// error 错误状态,此状态下不启动数采任务
 
 		// 本地任务状态负责启停master
-		if localStatus == "running" {
+		if localStatus == "running" && !limitReachedFlag {
 			if !launchedFlag { // 目前未启动数采程序
 				c_log.GlobalLogger.Info("数采程序启动 - 开始。")
 				startMasterNode()
 			}
-		} else if localStatus == "idle" {
+		} else if localStatus == "idle" || limitReachedFlag {
 			if !renewedFlag && launchedFlag && len(entity.TimeWindowConsumerQueue) == 0 {
 				time.Sleep(time.Duration(waitStopLength) * time.Minute)
 				c_log.GlobalLogger.Info("设备不在运行状态且没有待处理的数据,数采程序关闭 - 开始。")

+ 39 - 34
aarch64/pjibot_patrol/common/config/c_cloud.go

@@ -52,6 +52,10 @@ type trigger struct {
 	Topics []string `yaml:"topics"`
 }
 
+type CollectNumPlusStruct struct {
+	Url string `yaml:"url"`
+}
+
 type CollectLimitStruct struct {
 	Url   string `yaml:"url"`
 	Flag  int    `yaml:"flag"`
@@ -62,42 +66,43 @@ type CollectLimitStruct struct {
 }
 
 type cloudConfig struct {
-	CollectLimit          CollectLimitStruct `yaml:"collect-limit"`
-	HasOneMsgTopic        bool               `yaml:"has-one-msg-topic"` // 是否存在只发单帧的话题
-	FullCollect           bool               `yaml:"full-collect"`
-	ConfigRefreshInterval int                `yaml:"config-refresh-interval"` // 配置刷新时间间隔
-	BagNumber             int                `yaml:"bag-number"`
-	TimeWindowSendGap     int                `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
-	BagDataDir            string             `yaml:"bag-data-dir"`
-	BagCopyDir            string             `yaml:"bag-copy-dir"`
-	TriggersDir           string             `yaml:"triggers-dir"`
-	RpcPort               string             `yaml:"rpc-port"`
-	Triggers              []trigger          `yaml:"triggers"`
-	Hosts                 []hostStruct       `yaml:"hosts"`
-	Ros                   ros                `yaml:"ros"`
-	Platform              platform           `yaml:"platform"`
-	Disk                  disk               `yaml:"disk"`
-	Monitor               MonitorStruct      `yaml:"monitor"`
+	CollectLimit          CollectLimitStruct   `yaml:"collect-limit"`
+	CollectNumPlus        CollectNumPlusStruct `yaml:"collect-num-plus"`
+	HasOneMsgTopic        bool                 `yaml:"has-one-msg-topic"` // 是否存在只发单帧的话题
+	FullCollect           bool                 `yaml:"full-collect"`
+	ConfigRefreshInterval int                  `yaml:"config-refresh-interval"` // 配置刷新时间间隔
+	BagNumber             int                  `yaml:"bag-number"`
+	TimeWindowSendGap     int                  `yaml:"time-window-send-gap"` // 主节点向从节点发送窗口的最小时间间隔
+	BagDataDir            string               `yaml:"bag-data-dir"`
+	BagCopyDir            string               `yaml:"bag-copy-dir"`
+	TriggersDir           string               `yaml:"triggers-dir"`
+	RpcPort               string               `yaml:"rpc-port"`
+	Triggers              []trigger            `yaml:"triggers"`
+	Hosts                 []hostStruct         `yaml:"hosts"`
+	Ros                   ros                  `yaml:"ros"`
+	Platform              platform             `yaml:"platform"`
+	Disk                  disk                 `yaml:"disk"`
+	Monitor               MonitorStruct        `yaml:"monitor"`
 }
 
-// Request 结构体定义
-type Request struct {
-	Type      string      `json:"type"`
-	UUID      string      `json:"uuid"`
-	CommandID string      `json:"commandId"`
-	Parameter interface{} `json:"parameter"`
-}
-
-// Response 结构体定义
-type Response struct {
-	CommandID string            `json:"commandId"`
-	ErrorCode string            `json:"errorCode"`
-	Results   map[string]string `json:"results"`
-	Status    string            `json:"status"`
-	Time      int64             `json:"time"`
-	Type      string            `json:"type"`
-	UUID      string            `json:"uuid"`
-}
+//// Request 结构体定义
+//type Request struct {
+//	Type      string      `json:"type"`
+//	UUID      string      `json:"uuid"`
+//	CommandID string      `json:"commandId"`
+//	Parameter interface{} `json:"parameter"`
+//}
+//
+//// Response 结构体定义
+//type Response struct {
+//	CommandID string            `json:"commandId"`
+//	ErrorCode string            `json:"errorCode"`
+//	Results   map[string]string `json:"results"`
+//	Status    string            `json:"status"`
+//	Time      int64             `json:"time"`
+//	Type      string            `json:"type"`
+//	UUID      string            `json:"uuid"`
+//}
 
 var (
 	CloudConfig      cloudConfig

+ 1 - 0
aarch64/pjibot_patrol/common/config/c_local.go

@@ -24,6 +24,7 @@ type localConfig struct {
 	OssBasePrefix        string     `yaml:"oss-base-prefix"`         // 云端配置文件的位置
 	CloudConfigFilename  string     `yaml:"cloud-config-filename"`   // 云端配置文件名称
 	CloudConfigLocalPath string     `yaml:"cloud-config-local-path"` // 将 oss 的配置文件下载到本地的位置
+	LocalWebsocketPort   string     `yaml:"local-websocket-port"`    // websocket端口号
 	RestartCmd           restartCmd `yaml:"restart-cmd"`             // 重启命令
 	EquipmentNo          string     // 当前设备的编号
 	SecretKey            string     // 当前设备的密钥

+ 225 - 0
aarch64/pjibot_patrol/common/config/c_websocket.go

@@ -0,0 +1,225 @@
+package config
+
+import (
+	"cicv-data-closedloop/common/config/c_log"
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/websocket"
+	"net/url"
+	"time"
+)
+
+var (
+	WsConn                 *websocket.Conn
+	reconnectionInProgress bool
+)
+
+// Request 结构体定义
+type Request struct {
+	Type      string      `json:"type"`
+	UUID      string      `json:"uuid"`
+	CommandID string      `json:"commandId"`
+	Parameter interface{} `json:"parameter"`
+}
+
+// Request1 结构体定义
+type Request1 struct {
+	Type      string      `json:"type"`
+	CommandID string      `json:"commandId"`
+	Parameter interface{} `json:"parameter"`
+}
+
+// Response 结构体定义
+type Response struct {
+	CommandID string            `json:"commandId"`
+	ErrorCode string            `json:"errorCode"`
+	Results   map[string]string `json:"results"`
+	Status    string            `json:"status"`
+	Time      int64             `json:"time"`
+	Type      string            `json:"type"`
+	UUID      string            `json:"uuid"`
+}
+
+// StatusMessage 状态消息 结构体定义
+type StatusMessage struct {
+	Type  string      `json:"type"`
+	Topic string      `json:"topic"`
+	Time  int64       `json:"time"`
+	Data  interface{} `json:"data"`
+}
+
+func keepAlive() {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		//c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
+	}
+
+	for {
+		select {
+		case <-ticker.C:
+			err := WsConn.WriteMessage(websocket.TextMessage, requestJSON)
+			if err != nil {
+				//c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
+				WsConn.Close()
+				//c_log.GlobalLogger.Info("重试连接websocket...")
+				ConnectWebsocket() // 重新连接
+				continue
+			}
+			//c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")
+		}
+	}
+}
+
+func SendWebsocketHeartbeat(conn *websocket.Conn, maxRetries int) (bool, error) {
+
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	// 将请求JSON编码为字节
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		return false, fmt.Errorf("marshal request: %w", err)
+	}
+
+	// 发送WebSocket消息
+	err = conn.WriteMessage(websocket.TextMessage, requestJSON)
+	if err != nil {
+		return false, fmt.Errorf("write: %w", err)
+	}
+
+	count := 0
+	for {
+		if count > maxRetries {
+			return false, fmt.Errorf("保持websocket连接活跃,读取websocket消息超过最大重试次数。")
+		}
+		time.Sleep(1 * time.Second)
+		_, message, err := conn.ReadMessage()
+		if err != nil {
+			c_log.GlobalLogger.Error("保持websocket连接活跃,读取websocket消息 - 失败 ", err, " 继续读取消息。")
+			return false, err
+			//continue
+		}
+
+		var response Response
+		err = json.Unmarshal(message, &response)
+		c_log.GlobalLogger.Info("response ", response)
+		if err == nil && response.Type == "response" {
+			c_log.GlobalLogger.Info("response1 ", response)
+			return true, err
+		}
+		count++
+	}
+}
+
+func sendRequestAndAwaitResponse(ws *websocket.Conn) ([]byte, error) {
+	request := Request1{
+		Type:      "request",
+		CommandID: "heart",
+		Parameter: nil,
+	}
+
+	requestJSON, err := json.Marshal(request)
+	if err != nil {
+		c_log.GlobalLogger.Error("保持websocket连接活跃,解析requestJSON - 失败。", err)
+		return nil, err
+	}
+
+	err = ws.WriteMessage(websocket.TextMessage, requestJSON)
+	if err != nil {
+		c_log.GlobalLogger.Error("保持websocket连接活跃,发送心跳请求 - 失败。", err)
+		return nil, err
+	}
+	c_log.GlobalLogger.Info("保持websocket连接活跃,发送心跳请求 - 成功。")
+
+	// 使用channel等待响应
+	responseChan := make(chan []byte)
+	go handleMessages(ws, responseChan)
+
+	select {
+	case response := <-responseChan:
+		c_log.GlobalLogger.Error("保持websocket连接活跃,等待心跳响应 - 成功。")
+		return response, nil
+	case <-time.After(60 * time.Second): // 设置超时时间
+		c_log.GlobalLogger.Error("保持websocket连接活跃,等待心跳响应 - 超时。")
+		close(responseChan)
+		return nil, fmt.Errorf("保持websocket连接活跃,等待心跳响应 - 超时。")
+	}
+}
+
+func handleMessages(ws *websocket.Conn, responseChan chan<- []byte) {
+	for {
+		time.Sleep(100 * time.Millisecond)
+		_, message, err := ws.ReadMessage()
+		if err != nil {
+			c_log.GlobalLogger.Error("保持websocket连接活跃,读取websocket消息 - 失败 ", err)
+			return
+		}
+
+		var response Response
+		if err := json.Unmarshal(message, &response); err == nil && response.Type == "response" {
+			responseChan <- message
+			close(responseChan)
+			return
+		}
+	}
+}
+
+func ConnectWebsocket() {
+	for {
+		// 防止重复调用
+		if reconnectionInProgress {
+			return
+		}
+		reconnectionInProgress = true
+		//c_log.GlobalLogger.Info("初始化Websocket连接 - 开始。")
+		serverURL := LocalConfig.Node.Ip + ":" + LocalConfig.LocalWebsocketPort
+		path := "/"
+
+		// 构建WebSocket连接URL
+		u := url.URL{Scheme: "ws", Host: serverURL, Path: path}
+		//c_log.GlobalLogger.Info("URL:", u.String())
+
+		// 创建一个Dialer实例,用于建立WebSocket连接
+		dialer := websocket.Dialer{
+			ReadBufferSize:  1024,
+			WriteBufferSize: 1024,
+			// 可选:设置超时等
+			HandshakeTimeout: 5 * time.Minute,
+		}
+
+		// 建立WebSocket连接
+		coon, _, err := dialer.Dial(u.String(), nil)
+		if err != nil {
+			fmt.Println("err:", err)
+			//c_log.GlobalLogger.Error("初始化Websocket连接 - 失败。")
+			time.Sleep(5 * time.Second)
+			reconnectionInProgress = false
+			//c_log.GlobalLogger.Info("重试连接websocket...")
+			continue
+		}
+
+		WsConn = coon
+		//c_log.GlobalLogger.Info("初始化Websocket连接 - 成功。")
+		// 连接成功,退出循环
+		reconnectionInProgress = false
+		break
+	}
+}
+
+func InitWebsocketConfig() {
+	ConnectWebsocket()
+	// 保持连接活跃
+	go keepAlive()
+}

+ 3 - 1
aarch64/pjibot_patrol/common/service/disk_clean.go

@@ -27,7 +27,9 @@ func DiskClean() {
 	for {
 		time.Sleep(1000 * time.Millisecond)
 		// 1 获取磁盘占用
-		diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name)
+		//diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name) // 获取整个磁盘空间
+
+		diskUsed, _ := util.GetDirectoryDiskUsed(commonConfig.CloudConfig.Disk.Path) // 获取指定目录空间
 		if diskUsed > commonConfig.CloudConfig.Disk.Used {
 			policy := commonConfig.PlatformConfig.TaskCachePolicy
 			c_log.GlobalLogger.Errorf("磁盘占用 %v 超过 %v,触发删除规则 %v", diskUsed, commonConfig.CloudConfig.Disk.Used, policyToDescription[policy])

+ 24 - 0
aarch64/pjibot_patrol/common/service/rosbag_upload.go

@@ -7,6 +7,8 @@ import (
 	"cicv-data-closedloop/common/domain"
 	"cicv-data-closedloop/common/entity"
 	"cicv-data-closedloop/common/util"
+	commonUtil "cicv-data-closedloop/common/util"
+	"encoding/json"
 	"fmt"
 	"os"
 	"strings"
@@ -148,6 +150,8 @@ outLoop:
 			c_log.GlobalLogger.Error("上传 callback.json 文件失败:", err)
 		}
 
+		// 数据库中采集数量加一
+		collectNumPlus()
 		// 删除本地所有已上传的bag文件
 		c_log.GlobalLogger.Infof("结束处理窗口,【Lable】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.FaultTime, currentTimeWindow.Length)
 		c_log.GlobalLogger.Infof("待处理窗口个数为:%v", len(entity.TimeWindowConsumerQueue))
@@ -157,7 +161,27 @@ outLoop:
 		if len(entity.TimeWindowConsumerQueue) == 0 {
 			c_log.GlobalLogger.Infof("已处理所有窗口,重启 record 命令。")
 			ChannelKillRosRecord <- 2
+			entity.ProcessingFlag = false
 		}
 
 	}
 }
+
+func collectNumPlus() {
+	responseString, err := commonUtil.HttpPostJsonWithHeaders(
+		commonConfig.CloudConfig.CollectNumPlus.Url,
+		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+		map[string]string{
+			"snCode": commonConfig.LocalConfig.SecretKey,
+		},
+	)
+	if err != nil {
+		c_log.GlobalLogger.Error("发送http请求修改采集数量失败:", err)
+	}
+	// 解析JSON字符串到Response结构体
+	var resp entity.Response
+	err = json.Unmarshal([]byte(responseString), &resp)
+	if err != nil {
+		c_log.GlobalLogger.Error("解析修改采集数量结果失败:", err)
+	}
+}

+ 3 - 0
aarch64/pjibot_patrol/control/main.go

@@ -25,6 +25,9 @@ func init() {
 	// 初始化业务逻辑配置信息,配置文件在oss上(第2处配置,在oss文件)
 	commonConfig.InitCloudConfig()
 	// 初始化rpc客户端,用于杀死旧的采集程序
+
+	// 初始化websocket配置
+	commonConfig.InitWebsocketConfig()
 }
 
 func main() {

+ 1 - 0
aarch64/pjibot_patrol/master/package/service/move_bag_and_send_window.go

@@ -59,6 +59,7 @@ func RunTimeWindowProducerQueue() {
 				time.Sleep(time.Duration(2) * time.Second)
 				c_log.GlobalLogger.Info("采集数据,发送record命令进程关闭信号。")
 				commonService.ChannelKillRosRecord <- 3
+				entity.ProcessingFlag = true
 				domain.SupplyCopyBags(commonConfig.CloudConfig.BagDataDir, commonConfig.CloudConfig.BagCopyDir, currentTimeWindow)
 				// 将时间窗口移出准备队列
 				entity.RemoveHeadOfTimeWindowProducerQueue()

+ 97 - 53
aarch64/pjibot_patrol/master/package/service/produce_window.go

@@ -43,9 +43,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *diagnostic_msgs.DiagnosticArray) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -55,6 +52,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfDiagnostics {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -76,9 +76,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *sensor_msgs.Imu) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -88,6 +85,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfImu {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -109,9 +109,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.LocateInfo) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -121,6 +118,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfLocateInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -142,9 +142,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *std_msgs.UInt8) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -154,6 +151,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfObstacleDetection {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -175,9 +175,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -187,6 +184,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfOdom {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -208,9 +208,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.SysInfo) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -220,6 +217,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfSysInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -241,9 +241,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *geometry_msgs.PoseStamped) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -253,6 +250,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfRobotPose {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -274,9 +274,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *pjibot_patrol_msgs.TaskFeedbackInfo) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -286,6 +283,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfTaskFeedbackInfo {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -307,9 +307,6 @@ func PrepareTimeWindowProducerQueue() {
 					Node:  commonConfig.RosNode,
 					Topic: topic,
 					Callback: func(data *nav_msgs.Odometry) {
-						if !canCollect() {
-							return
-						}
 						subscribersTimeMutexes[i].Lock()
 						if time.Since(subscribersTimes[i]).Seconds() > triggerInterval {
 							subscribersMutexes[i].Lock()
@@ -319,6 +316,9 @@ func PrepareTimeWindowProducerQueue() {
 							for _, f := range masterConfig.RuleOfWheelOdom {
 								faultLabel = f(data)
 								if faultLabel != "" {
+									if !canCollect() {
+										break
+									}
 									subscribersTimes[i] = time.Now()
 									c_log.GlobalLogger.Errorf("触发事件【%v】,开始采集。", faultLabel)
 									saveTimeWindow(faultLabel, faultHappenTime, lastTimeWindow)
@@ -414,34 +414,78 @@ func getTopicsOfNode(faultLabel string) (masterTopics []string, slaveTopics []st
 
 // 判断采集包数量是否超过限额
 func canCollect() bool {
-	responseString, err := commonUtil.HttpPostJsonWithHeaders(
-		commonConfig.CloudConfig.CollectLimit.Url,
-		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
-		map[string]string{
-			"snCode":            commonConfig.LocalConfig.SecretKey,
-			"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
-			"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
-			"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
-			"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
-		},
-	)
-	if err != nil {
-		c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
-		return false
-	}
-	// 解析JSON字符串到Response结构体
-	var resp entity.Response
-	err = json.Unmarshal([]byte(responseString), &resp)
-	if err != nil {
-		c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
-		return false
-	}
-	if resp.Code != 200 { // 不是200 代表不允许采集
-		return false
+	// 如果开启了采集频率限制,则云端判断采集数量是否超过限额
+	if commonConfig.CloudConfig.CollectLimit.Flag == 1 {
+		c_log.GlobalLogger.Error("当前设备已开启数采频率限制,需判断采集数量是否达到限额。")
+		responseString, err := commonUtil.HttpPostJsonWithHeaders(
+			commonConfig.CloudConfig.CollectLimit.Url,
+			map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+			map[string]string{
+				"snCode":            commonConfig.LocalConfig.SecretKey,
+				"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+				"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+				"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+				"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+			},
+		)
+		if err != nil {
+			c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+			return false
+		}
+		// 解析JSON字符串到Response结构体
+		var resp entity.Response
+		err = json.Unmarshal([]byte(responseString), &resp)
+		if err != nil {
+			c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+			return false
+		}
+		if resp.Code != 200 { // 不是200 代表不允许采集
+			c_log.GlobalLogger.Info("采集数量已超过限额,当前周期内不再采集。", resp.Code)
+			return false
+		}
+	} else {
+		c_log.GlobalLogger.Error("当前设备未开启数采频率限制,无需判断采集数量是否达到限额。")
 	}
-	if len(entity.TimeWindowConsumerQueue) != 0 {
-		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+
+	// 本地判断是否存在正在处理的数据
+	if entity.ProcessingFlag {
+		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。")
 		return false
 	}
+
+	c_log.GlobalLogger.Info("允许采集。")
 	return true
 }
+
+//func canCollect() bool {
+//	responseString, err := commonUtil.HttpPostJsonWithHeaders(
+//		commonConfig.CloudConfig.CollectLimit.Url,
+//		map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+//		map[string]string{
+//			"snCode":            commonConfig.LocalConfig.SecretKey,
+//			"collectLimitDay":   util.ToString(commonConfig.CloudConfig.CollectLimit.Day),
+//			"collectLimitWeek":  util.ToString(commonConfig.CloudConfig.CollectLimit.Week),
+//			"collectLimitMonth": util.ToString(commonConfig.CloudConfig.CollectLimit.Month),
+//			"collectLimitYear":  util.ToString(commonConfig.CloudConfig.CollectLimit.Year),
+//		},
+//	)
+//	if err != nil {
+//		c_log.GlobalLogger.Error("发送http请求获取是否允许采集失败:", err)
+//		return false
+//	}
+//	// 解析JSON字符串到Response结构体
+//	var resp entity.Response
+//	err = json.Unmarshal([]byte(responseString), &resp)
+//	if err != nil {
+//		c_log.GlobalLogger.Error("解析是否允许采集接口返回结果失败:", err)
+//		return false
+//	}
+//	if resp.Code != 200 { // 不是200 代表不允许采集
+//		return false
+//	}
+//	if len(entity.TimeWindowConsumerQueue) != 0 {
+//		c_log.GlobalLogger.Info("存在正在处理的数据,此次不再采集。", resp.Code)
+//		return false
+//	}
+//	return true
+//}

+ 5 - 0
aarch64/pjibot_patrol/巡检机器人默认配置文件-cloud-config.yaml

@@ -1,10 +1,13 @@
 ---
 collect-limit:
   url: http://36.110.106.142:12341/web_server/collect_limit/can_collect
+  flag: 1 # 数采频率限制标志 0 - 关闭数采频率限制  1 - 开启数采频率限制
   day: 3
   week: 7
   month: 30
   year: 365
+collect-num-plus:
+  url: http://36.110.106.142:12341/web_server/collect_limit/plus_collect_num
 monitor:
   url: http://36.110.106.142:12341/web_server/monitor/insert
 platform:
@@ -17,6 +20,8 @@ config-refresh-interval: 60
 disk:
   name: /dev/nvme0n1p1 # 磁盘名称
   used: 108000000000 # 磁盘占用阈值,单位bytes
+  path:
+    - /home/pji/cicv-data-closedloop
 has-one-msg: false
 bag-data-dir: /home/pji/cicv-data-closedloop/data/
 bag-copy-dir: /home/pji/cicv-data-closedloop/copy/

+ 2 - 0
aarch64/pjibot_patrol/巡检机器人默认配置文件-local-config.yaml

@@ -12,6 +12,8 @@ oss-base-prefix: pjibot-patrol/
 cloud-config-filename: cloud-config.yaml
 # 将oss上的配置文件下载到本地的路径
 cloud-config-local-path: /home/pji/cicv-data-closedloop/config/cloud-config.yaml
+# websocket端口号
+local-websocket-port: 9002
 restart-cmd:
   dir: "/home/pji/cicv-data-closedloop/"
   name: "sh"

+ 0 - 0
tools/pji_api/main/main.go → tools/pji/pji_api/main/main.go


+ 99 - 0
tools/pji/pji_time/main/main.go

@@ -0,0 +1,99 @@
+package main
+
+import (
+	"fmt"
+	"os"
+	"time"
+)
+
+type CollectWindowStruct struct {
+	Flag      int      `yaml:"flag"`
+	Days      []string `yaml:"days,omitempty"`
+	Start     string   `yaml:"start_time"`
+	End       string   `yaml:"end_time"`
+	StartTime time.Time
+	EndTime   time.Time
+}
+
+func IsTimeAllowed(currentTime time.Time, startStr, endStr string) bool {
+	// 模拟解析数据
+	cw := CollectWindowStruct{
+		Flag: 1,
+		//Days:  []string{},
+		Days:  []string{"Sunday"},
+		Start: startStr,
+		End:   endStr,
+	}
+
+	loc := currentTime.Location()
+	// 单独解析采集时间
+	startTime, err := time.ParseInLocation("15:04", cw.Start, loc)
+	if err != nil {
+		fmt.Println("云端配置文件解析采集时间【startTime】失败 ", err, "取默认值【00:00】")
+		cw.StartTime, _ = time.ParseInLocation("15:04", "00:00", loc)
+	}
+	cw.StartTime = startTime
+	endTime, err := time.ParseInLocation("15:04", cw.End, loc)
+	if err != nil {
+		fmt.Println("云端配置文件解析采集时间【endTime】失败 ", err, "取默认值【23:59】")
+		cw.EndTime, _ = time.ParseInLocation("15:04", "23:59", loc)
+	}
+	cw.EndTime = endTime
+	if len(cw.Days) == 0 {
+		// 默认设置为每天
+		cw.Days = []string{"Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"}
+	}
+
+	// todo 待测逻辑
+	if cw.Flag == 0 { // 关闭固定时间段采集, 则都返回true
+		fmt.Println("已关闭采集时间段限制")
+		return true
+	}
+	if len(cw.Days) > 0 { // 如果指定了周几
+		currentDay := currentTime.Weekday().String()
+		fmt.Println("currentDay", currentDay)
+		included := false
+		for _, day := range cw.Days {
+			if day == currentDay {
+				fmt.Println("当前时间符合规定的日期要求")
+				included = true
+				break
+			}
+		}
+		if !included {
+			fmt.Println("当前时间不符合规定的日期要求")
+			return false
+		}
+	}
+
+	start := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), cw.StartTime.Hour(), cw.StartTime.Minute(), cw.StartTime.Second(), 0, loc)
+	end := time.Date(currentTime.Year(), currentTime.Month(), currentTime.Day(), cw.EndTime.Hour(), cw.EndTime.Minute(), cw.EndTime.Second(), 0, loc)
+
+	fmt.Println("start", start)
+	fmt.Println("end", end)
+
+	if start.After(end) { // 如果时段跨天
+		end = end.AddDate(0, 0, 1)
+		fmt.Println("当前时间设置的时间段跨天,更正end时间", end)
+	}
+	return !currentTime.Before(start) && currentTime.Before(end)
+}
+
+func main() {
+	if len(os.Args) == 3 {
+		fmt.Println("接收到的参数为:", os.Args[1:])
+	} else {
+		fmt.Println("参数数量应该为【2】,请输入开始时间和结束时间")
+		os.Exit(1)
+	}
+	startStr := os.Args[1]
+	endStr := os.Args[2]
+	fmt.Println("输入的开始时间:", startStr)
+	fmt.Println("输入的结束时间:", endStr)
+	flag := IsTimeAllowed(time.Now(), startStr, endStr)
+	if flag {
+		fmt.Println("当前时间段符合规定,允许采集")
+	} else {
+		fmt.Println("当前时间段不符合规定,不允许采集")
+	}
+}

+ 0 - 0
tools/pji_websocket/main/main.go → tools/pji/pji_websocket/main/main.go