Browse Source

feat:根据任务调度&采集话题

HeWang 2 months ago
parent
commit
8807e73355

+ 55 - 42
aarch64/jili/common/config/c_cloud.go

@@ -81,49 +81,62 @@ func InitCloudConfig() {
 	c_log.GlobalLogger.Info("初始化OSS配置文件 - 开始。")
 	var content []byte // cloud.yaml 内容
 	var err error
-	if LocalConfig.Internet {
-		c_log.GlobalLogger.Info("车辆可以访问互联网。")
-		// 获取文件的目录
-		_ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
-		// 3 ------- 获取 yaml 字符串 -------
-		cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
-		// todo 等待时间同步
-		// 判断文件是否存在。如果不存在则使用默认的
-		isExist, err := OssBucket.IsObjectExist(cloudConfigObjectKey)
-		if err != nil {
-			c_log.GlobalLogger.Errorf("判断配置文件是否存在失败,错误信息为:%v", err)
-		}
-		if isExist {
-			c_log.GlobalLogger.Info("使用机器人自定义配置文件:", cloudConfigObjectKey)
-		} else {
-			cloudConfigObjectKey = LocalConfig.OssBasePrefix + LocalConfig.CloudConfigFilename // 默认配置文件路径
-			c_log.GlobalLogger.Info("使用默认配置文件:", cloudConfigObjectKey)
-		}
-		for {
-			OssMutex.Lock()
-			err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
-			OssMutex.Unlock()
-			if err != nil {
-				c_log.GlobalLogger.Error("下载oss上的配置文件 "+cloudConfigObjectKey+" 失败。网络授时未完成或者未配置阿里云网络映射到/etc/hosts:", err)
-				time.Sleep(time.Duration(2) * time.Second)
-				continue
-			}
+
+	// todo 测试时不从网上拉取配置文件,编译时需修改好下面内容
+	//if LocalConfig.Internet {
+	//	c_log.GlobalLogger.Info("车辆可以访问互联网。")
+	//	// 获取文件的目录
+	//	_ = util.CreateParentDir(LocalConfig.CloudConfigLocalPath)
+	//	// 3 ------- 获取 yaml 字符串 -------
+	//	cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
+	//	// todo 等待时间同步
+	//	// 判断文件是否存在。如果不存在则使用默认的
+	//	isExist, err := OssBucket.IsObjectExist(cloudConfigObjectKey)
+	//	if err != nil {
+	//		c_log.GlobalLogger.Errorf("判断配置文件是否存在失败,错误信息为:%v", err)
+	//	}
+	//	if isExist {
+	//		c_log.GlobalLogger.Info("使用机器人自定义配置文件:", cloudConfigObjectKey)
+	//	} else {
+	//		cloudConfigObjectKey = LocalConfig.OssBasePrefix + LocalConfig.CloudConfigFilename // 默认配置文件路径
+	//		c_log.GlobalLogger.Info("使用默认配置文件:", cloudConfigObjectKey)
+	//	}
+	//	for {
+	//		OssMutex.Lock()
+	//		err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
+	//		OssMutex.Unlock()
+	//		if err != nil {
+	//			c_log.GlobalLogger.Error("下载oss上的配置文件 "+cloudConfigObjectKey+" 失败。网络授时未完成或者未配置阿里云网络映射到/etc/hosts:", err)
+	//			time.Sleep(time.Duration(2) * time.Second)
+	//			continue
+	//		}
+	//		break
+	//	}
+	//	if CloudConfig.RefreshCloudConfig {
+	//		go RefreshCloudConfig()
+	//	}
+	//} else {
+	//	c_log.GlobalLogger.Infof("车辆不可以访问互联网,请提前将配置文件放到【%v】。", LocalConfig.CloudConfigLocalPath)
+	//	for {
+	//		time.Sleep(time.Duration(1) * time.Second)
+	//		if util.FileExists(LocalConfig.CloudConfigLocalPath) {
+	//			c_log.GlobalLogger.Infof("配置文件【%v】已准备好。", LocalConfig.CloudConfigLocalPath)
+	//			break
+	//		} else {
+	//			c_log.GlobalLogger.Infof("配置文件【%v】没有准备好。", LocalConfig.CloudConfigLocalPath)
+	//			continue
+	//		}
+	//	}
+	//}
+
+	for {
+		time.Sleep(time.Duration(1) * time.Second)
+		if util.FileExists(LocalConfig.CloudConfigLocalPath) {
+			c_log.GlobalLogger.Infof("配置文件【%v】已准备好。", LocalConfig.CloudConfigLocalPath)
 			break
-		}
-		if CloudConfig.RefreshCloudConfig {
-			go RefreshCloudConfig()
-		}
-	} else {
-		c_log.GlobalLogger.Infof("车辆不可以访问互联网,请提前将配置文件放到【%v】。", LocalConfig.CloudConfigLocalPath)
-		for {
-			time.Sleep(time.Duration(1) * time.Second)
-			if util.FileExists(LocalConfig.CloudConfigLocalPath) {
-				c_log.GlobalLogger.Infof("配置文件【%v】已准备好。", LocalConfig.CloudConfigLocalPath)
-				break
-			} else {
-				c_log.GlobalLogger.Infof("配置文件【%v】没有准备好。", LocalConfig.CloudConfigLocalPath)
-				continue
-			}
+		} else {
+			c_log.GlobalLogger.Infof("配置文件【%v】没有准备好。", LocalConfig.CloudConfigLocalPath)
+			continue
 		}
 	}
 

+ 1 - 1
aarch64/jili/common/config/c_oss.go

@@ -46,7 +46,7 @@ func InitOssConfig() {
 	//}
 
 	ossConnectInfo := OssConnectInfoStruct{
-		Endpoint: "http://dcl.oss.icvdc.com",
+		Endpoint: "http://36.110.106.156:20204/",
 		//Endpoint:        "http://oss-cn-beijing-gqzl-d01-a.ops.gqzl-cloud.com/",
 		AccessKeyId:     "n8glvFGS25MrLY7j",
 		AccessKeySecret: "xZ2Fozoarpfw0z28FUhtg8cu0yDc5d",

+ 18 - 112
aarch64/jili/common/config/c_platform.go

@@ -12,15 +12,15 @@ import (
 	"time"
 )
 
-type taskTrigger struct {
-	TriggerId              int    `json:"triggerId"`              // 触发器id
+type TaskTrigger struct {
+	TriggerId              string `json:"triggerId"`              // 触发器id
 	TriggerName            string `json:"triggerName"`            // 触发器名称
 	TriggerScriptPath      string `json:"triggerScriptPath"`      // 脚本在oss上的path
 	ListenTopic            string `json:"listenTopic"`            // 触发器监听的topic
 	CollectionTopic        string `json:"collectionTopic"`        // 触发器采集的topic
-	CollectionPreSeconds   string `json:"collectionPreSeconds"`   // 采集前几秒的数据
-	CollectionAfterSeconds string `json:"collectionAfterSeconds"` // 采集后几秒的数据
-	Priority               string `json:"priority"`               // 优先级, 数值越小越优先
+	CollectionPreSeconds   int    `json:"collectionPreSeconds"`   // 采集前几秒的数据
+	CollectionAfterSeconds int    `json:"collectionAfterSeconds"` // 采集后几秒的数据
+	Priority               int    `json:"priority"`               // 优先级, 数值越小越优先
 }
 
 type PlatformConfigStruct struct {
@@ -34,7 +34,7 @@ type PlatformConfigStruct struct {
 	CollectionStrategy int           `json:"collectionStrategy"` // 采集策略
 	UploadStrategy     int           `json:"uploadStrategy"`     // 上传策略
 	CompressionType    int           `json:"compressionType"`    // 压缩类型
-	TaskTriggers       []taskTrigger `json:"taskTriggers"`       // 触发器信息
+	TaskTriggers       []TaskTrigger `json:"taskTriggers"`       // 触发器信息
 }
 
 type response struct {
@@ -46,9 +46,10 @@ type response struct {
 }
 
 var (
-	PlatformConfig  PlatformConfigStruct
-	PlatformConfig1 []PlatformConfigStruct
-	RecordTopics    []string
+	PlatformConfig PlatformConfigStruct
+	PlatformTasks  []PlatformConfigStruct
+	RecordTopics   []string
+	CurrTask       PlatformConfigStruct // 正在运行中的任务
 )
 
 // InitPlatformConfig 初始化数据闭环平台的配置
@@ -61,110 +62,19 @@ func InitPlatformConfig() {
 	// 1 如果车辆没有配置任务,则阻塞在这里,不启动任务
 	for {
 		time.Sleep(time.Duration(2) * time.Second)
-		PlatformConfig1, err = GetConfig()
+		PlatformTasks, err = GetConfig()
 		if err != nil {
 			c_log.GlobalLogger.Error("获取配置status失败:", err)
 			continue
 		}
-		//if checkPlatformConfig() {
-		//	RecordTopics = strings.Split(PlatformConfig.EquipmentTopic, ",")
-		//	// 去掉首尾空格
-		//	for i, topic := range RecordTopics {
-		//		RecordTopics[i] = strings.TrimSpace(topic)
-		//	}
-		//	break
-		//}
+		if checkPlatformTasks() {
+			break
+		}
 	}
+	c_log.GlobalLogger.Infof("获取数据闭环平台配置 - 成功。")
 	//c_log.GlobalLogger.Infof("获取数据闭环平台配置 - 成功。【触发前】=【%v 秒】,触发后=【%v 秒】,【最大窗口】=【%v 秒】。", PlatformConfig.TaskBeforeTime, PlatformConfig.TaskAfterTime, PlatformConfig.TaskMaxTime)
 }
 
-/*
-	{
-	  "data": {
-	    "accessToken": "YWRmYWRzZmFzZGZhZHNmYWRmYWRm=",
-	    "expireTime": "28800",
-	    "equipmentNo": "robot-001"
-	  },
-	  "success": true,
-	  "message": "ok",
-	  "code": 1,
-	  "nowTime": "2023-12-09 22:41:00"
-	}
-*/
-// 认证接口,获取access_token
-func GetAccessToken() (string, error) {
-	url := &CloudConfig.Platform.UrlDeviceAuth
-	param := &map[string]string{
-		"equipmentNo": LocalConfig.EquipmentNo,
-		"secretKey":   LocalConfig.SecretKey,
-	}
-	respJson, err := util.HttpPostJsonResponseString(
-		*url,
-		*param,
-	)
-	if err != nil {
-		return "", nil
-	}
-	respMap, err := util.JsonStringToMap(respJson)
-	if err != nil {
-		c_log.GlobalLogger.Errorf("解析返回结果【%v】失败,请求地址为【%v】,请求参数为【%v】:%v", respJson, *url, *param, err)
-		return "", nil
-	}
-
-	dataMap, ok := respMap["data"].(map[string]interface{})
-	if !ok {
-		c_log.GlobalLogger.Error("解析返回结果.data", dataMap, "失败:", err)
-		return "", nil
-	}
-	return dataMap["accessToken"].(string), nil
-}
-
-/*
-	{
-	  "data": {
-	    "status": "UNCHANGE"
-	    "taskConfigld": "xxx"
-	  },
-	  "success": true,
-	  "message": "ok",
-	  "code": 1,
-	  "nowTime": "2023-12-09 21:08:49"
-	}
-*/
-//GetStatus 根据taskConfigId获取任务status,如果传入空代表车端没有配置,直接获取新的配置
-func GetStatus(taskConfigId string) (string, error) {
-	token, err := GetAccessToken()
-	if err != nil {
-		return "", err
-	}
-	resp, err := util.HttpGetStringAddHeadersResponseString(
-		CloudConfig.Platform.UrlTaskPoll,
-		map[string]string{
-			"authorization": token,
-		},
-		map[string]string{
-			"equipmentNo":  LocalConfig.EquipmentNo,
-			"taskConfigId": taskConfigId,
-		},
-	)
-
-	if err != nil {
-		c_log.GlobalLogger.Error("访问接口", CloudConfig.Platform.UrlTask, "失败:", err)
-		return "", err
-	}
-	respMap, err := util.JsonStringToMap(resp)
-	if err != nil {
-		c_log.GlobalLogger.Error("解析【返回结果1】", resp, "失败:", err)
-		return "", err
-	}
-	dataMap, ok := respMap["data"].(map[string]interface{})
-	if !ok {
-		c_log.GlobalLogger.Errorf("解析【返回结果.data】的类型不是(map[string]interface{}),【dataMap】=%v", dataMap)
-		return "", err
-	}
-	return dataMap["status"].(string), nil
-}
-
 func GetConfig() ([]PlatformConfigStruct, error) {
 	queryParams := map[string]string{
 		"equipmentNo": LocalConfig.SecretKey,
@@ -201,15 +111,11 @@ func GetConfig() ([]PlatformConfigStruct, error) {
 	return result.Data, nil
 }
 
-func checkPlatformConfig() bool {
-	if PlatformConfig.TaskConfigId == "" {
-		c_log.GlobalLogger.Error("数据闭环平台没有配置任务。")
+func checkPlatformTasks() bool {
+	if len(PlatformTasks) == 0 {
+		c_log.GlobalLogger.Error("数据闭环平台中该车辆今天没有配置任务。")
 		return false
 	}
-	//if PlatformConfig.EquipmentTopic == "" {
-	//	c_log.GlobalLogger.Error("数据闭环平台没有配置topic序列。")
-	//	return false
-	//}
 	return true
 }
 

+ 111 - 57
aarch64/jili/control/main.go

@@ -5,9 +5,11 @@ import (
 	commonService "cicv-data-closedloop/aarch64/jili/common/service"
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
+	"fmt"
 	"net/rpc"
 	"os"
 	"runtime"
+	"strings"
 	"time"
 )
 
@@ -32,80 +34,78 @@ func main() {
 }
 
 func RunWithInternet() {
-	lastStatus := "NONE"
 	wait := false
-	//  轮询任务接口判断是否有更新
+	//  轮询任务接口, 决定数采程序启停
 	for {
 		if wait { // 第一次就不等待一分钟了
 			time.Sleep(time.Duration(60) * time.Second)
 		}
 		wait = true
-		// 1 获取当前设备的任务的 status
-		status, err := commonConfig.GetStatus(commonConfig.PlatformConfig.TaskConfigId)
+
+		// 1. 获取当前设备的任务配置
+		platformTasks, err := commonConfig.GetConfig()
+		fmt.Println("platformTasks", platformTasks)
+		commonConfig.PlatformTasks = platformTasks
 		if err != nil {
-			c_log.GlobalLogger.Error("获取配置status失败:", err)
+			c_log.GlobalLogger.Error("获取平台任务失败:", err)
 			continue
 		}
-		c_log.GlobalLogger.Info("【lastStatus】=", lastStatus, ",【status】=", status)
-		// 2 判断 status
-		// UN_CHANGE 没有新的任务,无需更改
-		// CHANGE 有新的任务,需要杀死旧的任务并重启
-		// NONE 设备没有配置任务,需要杀死旧的任务
-		if status == "UN_CHANGE" {
-			lastStatus = "UN_CHANGE"
-			continue
-		} else if status == "CHANGE" || status == "NONE" {
-			if lastStatus == "CHANGE" && status == "CHANGE" { // 供更新使用
-				commonConfig.InitPlatformConfig()
-				continue
-			}
-			if lastStatus == "NONE" && status == "NONE" {
+
+		// 2. 判断当前时间是否有需要执行的任务
+		for _, task := range platformTasks {
+			flag := shouldExecuteTask(task.StartTime, task.EndTime)
+			fmt.Println("flag", flag)
+			// 2.1 当前时间不执行此任务,跳过
+			if !flag {
 				continue
 			}
-			// 3 发送rpc信号杀死两个服务,并重启程序
-			if lastStatus == "NONE" && status == "CHANGE" {
-				startMasterOrSlave()
-				lastStatus = status
-				c_log.GlobalLogger.Info("获取数据闭环平台最新配置。")
-				commonConfig.InitPlatformConfig()
-				continue
+			// 2.2 当前时间需要执行此任务
+			// 2.2.1 判断此任务是否正在执行
+			if commonConfig.CurrTask.TaskConfigId == task.TaskConfigId { // 此任务正在执行,退出循环
+				break
 			}
-			var killArgs commonService.KillSignal
-			//if lastStatus == "UN_CHANGE" && status == "CHANGE" {
-			//	killArgs = commonService.KillSignal{DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: true}
-			//	c_log.GlobalLogger.Info("更新任务,发送rpc重启信号到本地:", killArgs)
-			//}
-			//if lastStatus == "UN_CHANGE" && status == "NONE" {
-			//	killArgs = commonService.KillSignal{DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: false}
-			//	c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号到本地:", killArgs)
-			//}
-
-			KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
-			if err != nil {
-				// 此处如果连接失败说明采集程序已经停止了
-				lastStatus = "NONE"
-				c_log.GlobalLogger.Error("采集程序已经停止:", err)
-				continue
+			// 此任务没有在执行,需要启动此任务
+			// 2.2.2 判断是否有其他任务正在执行
+			if commonConfig.CurrTask.TaskConfigId != "" { // 存在其他任务正在执行,杀死旧的任务
+				c_log.GlobalLogger.Info("需要执行的任务id为", task.TaskConfigId, ", 当前任务id为:", commonConfig.CurrTask.TaskConfigId, ", 需要杀死该任务。")
+				// 杀死旧的任务
+				var killArgs commonService.KillSignal
+				// todo 吉利新平台没有DropUploadData字段(旧平台,已采数据:保留/丢弃)
+				killArgs = commonService.KillSignal{DropUploadData: true, Restart: false}
+				c_log.GlobalLogger.Info("杀死任务,发送rpc结束信号到本地:", killArgs)
+				KillRpcClient, err := rpc.Dial("tcp", commonConfig.LocalConfig.Node.Ip+":"+commonConfig.CloudConfig.RpcPort)
+				if err != nil {
+					// 此处如果连接失败说明采集程序已经停止了
+					commonConfig.RecordTopics = []string{}
+					commonConfig.CurrTask = commonConfig.PlatformConfigStruct{}
+					c_log.GlobalLogger.Error("采集程序已经停止:", err)
+					continue
+				}
+				reply := 0
+				if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
+					c_log.GlobalLogger.Error("发送rpc请求到master失败:", err)
+					// 这里可能会报错 unexpected EOF 但是不影响,先注释 close 和 continue
+					//KillRpcClient.Close()
+					//continue
+				}
+				c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
+				commonConfig.RecordTopics = []string{}
+				commonConfig.CurrTask = commonConfig.PlatformConfigStruct{}
+				if err = KillRpcClient.Close(); err != nil {
+					// 不做处理
+				}
 			}
 
-			reply := 0
-			if err = KillRpcClient.Call("KillService.Kill", killArgs, &reply); err != nil {
-				c_log.GlobalLogger.Error("发送rpc请求到master失败:", err)
-				// 这里可能会报错 unexpected EOF 但是不影响,先注释 close 和 continue
-				//KillRpcClient.Close()
-				//continue
-			}
-			lastStatus = status
-			c_log.GlobalLogger.Info("结束任务后,将数据闭环平台配置置空。")
-			commonConfig.PlatformConfig = commonConfig.PlatformConfigStruct{}
-			if err = KillRpcClient.Close(); err != nil {
-				// 不做处理
-			}
-		} else {
-			c_log.GlobalLogger.Error("未知的采集任务状态。【status】=", status)
+			// 开启新的任务
+			startMasterOrSlave()
+			commonConfig.RecordTopics = getUniqueCollectionTopics(task.TaskTriggers)
+			commonConfig.CurrTask = task
+			// 退出循环
+			break
 		}
 	}
 }
+
 func RunWithoutInternet() {
 	startMasterOrSlave()
 }
@@ -117,3 +117,57 @@ func startMasterOrSlave() {
 	}
 	c_log.GlobalLogger.Info("启动任务,本地执行启动命令:【path】=", commonConfig.LocalConfig.RestartCmd.Dir, "【cmd】=", commonConfig.LocalConfig.RestartCmd.Name, commonConfig.LocalConfig.RestartCmd.Args)
 }
+
+// shouldExecuteTask 给定任务起止时间,判断当前时刻此任务是否应该执行
+func shouldExecuteTask(startTimeStr, endTimeStr string) bool {
+	// 定义时间格式(根据输入的时间字符串格式)
+	timeFormat := "2006-01-02 15:04:05"
+
+	// 注意: time.Parse 时间没有考虑时区,当做默认的 +0 时区 UTC 时间来解析了,就造成了实际时间比预期的 +8 小时
+	// 解析 startTime 和 endTime
+	startTime, err := time.ParseInLocation(timeFormat, startTimeStr, time.Local)
+	//fmt.Println("startTime", startTime)
+	if err != nil {
+		fmt.Println("Error parsing startTime:", err)
+		return false
+	}
+	endTime, err := time.ParseInLocation(timeFormat, endTimeStr, time.Local)
+	//fmt.Println("endTime", endTime)
+	if err != nil {
+		fmt.Println("Error parsing endTime:", err)
+		return false
+	}
+
+	// 获取当前时间
+	currentTime := time.Now()
+	//fmt.Println("currentTime", currentTime)
+
+	// 判断当前时间是否在 startTime 和 endTime 之间
+	return !currentTime.Before(startTime) && !currentTime.After(endTime)
+}
+
+// getUniqueCollectionTopics 统计需要采集的 topic 并去重
+func getUniqueCollectionTopics(taskTriggers []commonConfig.TaskTrigger) []string {
+	// 使用 map 去重
+	topicMap := make(map[string]struct{})
+
+	// 遍历 taskTriggers
+	for _, trigger := range taskTriggers {
+		// 分割 collectionTopic 字段
+		topics := strings.Split(trigger.CollectionTopic, ",")
+		for _, topic := range topics {
+			trimmedTopic := strings.TrimSpace(topic) // 去除空格
+			if trimmedTopic != "" {                  // 忽略空字符串
+				topicMap[trimmedTopic] = struct{}{}
+			}
+		}
+	}
+
+	// 将 map 中的 key 转换为切片
+	var uniqueTopics []string
+	for topic := range topicMap {
+		uniqueTopics = append(uniqueTopics, topic)
+	}
+
+	return uniqueTopics
+}

+ 2 - 1
aarch64/pjisuv/control/main.go

@@ -63,7 +63,7 @@ func RunWithInternet() {
 			if lastStatus == "NONE" && status == "NONE" {
 				continue
 			}
-			// 3 发送rpc信号杀死两个服务,并重启程序
+
 			if lastStatus == "NONE" && status == "CHANGE" {
 				startMasterOrSlave()
 				lastStatus = status
@@ -71,6 +71,7 @@ func RunWithInternet() {
 				commonConfig.InitPlatformConfig()
 				continue
 			}
+			// 3 发送rpc信号杀死两个服务,并重启程序
 			var killArgs commonService.KillSignal
 			if lastStatus == "UN_CHANGE" && status == "CHANGE" {
 				killArgs = commonService.KillSignal{DropUploadData: commonConfig.PlatformConfig.DropUploadData, Restart: true}

+ 61 - 10
test/platform_api_test.go

@@ -5,23 +5,74 @@ import (
 	"cicv-data-closedloop/common/config/c_log"
 	"fmt"
 	"testing"
+	"time"
 )
 
-//
-//func init() {
-//	// 初始化日志配置
-//	c_log.InitLog(commonConfig.LogDir, commonConfig.ControlLogPrefix)
-//	// 初始化本地配置文件(第1处配置,在本地文件)
-//	commonConfig.InitLocalConfig()
-//	commonConfig.InitOssConfig()
-//	//commonConfig.InitCloudConfig()
-//}
+func init() {
+	// 初始化日志配置
+	c_log.InitLog(commonConfig.LogDir, commonConfig.ControlLogPrefix)
+	// 初始化本地配置文件(第1处配置,在本地文件)
+	commonConfig.InitLocalConfig()
+	commonConfig.InitOssConfig()
+	commonConfig.InitCloudConfig()
+}
 
 func TestGetConfig(t *testing.T) {
 	fmt.Println(commonConfig.LocalConfig.EquipmentNo)
 	PlatformConfig, err := commonConfig.GetConfig()
 	fmt.Println("PlatformConfig", PlatformConfig)
 	if err != nil {
-		c_log.GlobalLogger.Error("获取配置status失败:", err)
+		fmt.Println("获取配置status失败:", err)
+	}
+}
+
+func TestControl(t *testing.T) {
+	wait := false
+	//  轮询任务接口判断是否有更新
+	for {
+		if wait { // 第一次就不等待一分钟了
+			time.Sleep(time.Duration(60) * time.Second)
+		}
+		wait = true
+		// 1. 获取当前设备的任务配置
+		platformTasks, err := commonConfig.GetConfig()
+		fmt.Println("platformTasks", platformTasks)
+		commonConfig.PlatformTasks = platformTasks
+		if err != nil {
+			c_log.GlobalLogger.Error("获取平台任务失败:", err)
+			continue
+		}
+
+		// 2. 判断当前时间是否有需要执行的任务
+		for _, task := range platformTasks {
+			flag := shouldExecuteTask(task.StartTime, task.EndTime)
+			fmt.Println("flag", flag)
+		}
+	}
+}
+
+func shouldExecuteTask(startTimeStr, endTimeStr string) bool {
+	// 定义时间格式(根据输入的时间字符串格式)
+	timeFormat := "2006-01-02 15:04:05"
+
+	// 解析 startTime 和 endTime
+	startTime, err := time.ParseInLocation(timeFormat, startTimeStr, time.Local)
+	fmt.Println("startTime", startTime)
+	if err != nil {
+		fmt.Println("Error parsing startTime:", err)
+		return false
 	}
+	endTime, err := time.ParseInLocation(timeFormat, endTimeStr, time.Local)
+	fmt.Println("endTime", endTime)
+	if err != nil {
+		fmt.Println("Error parsing endTime:", err)
+		return false
+	}
+
+	// 获取当前时间
+	currentTime := time.Now()
+	fmt.Println("currentTime", currentTime)
+
+	// 判断当前时间是否在 startTime 和 endTime 之间
+	return !currentTime.Before(startTime) && !currentTime.After(endTime)
 }