孟令鑫 1 năm trước cách đây
mục cha
commit
1906c9c0f7

+ 4 - 0
aarch64/kinglong/common/config/c_cloud.go

@@ -65,7 +65,9 @@ func InitCloudConfig() {
 	// 3 ------- 获取 yaml 字符串 -------
 	var content []byte
 	cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
+	OssMutex.Lock()
 	err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
+	OssMutex.Unlock()
 	if err != nil {
 		c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
 		os.Exit(-1)
@@ -108,7 +110,9 @@ func refreshCloudConfig() {
 	// 3 ------- 获取 yaml 字符串 -------
 	var content []byte
 	cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
+	OssMutex.Lock()
 	err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
+	OssMutex.Unlock()
 	if err != nil {
 		c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
 		return

+ 6 - 2
aarch64/kinglong/common/config/c_oss.go

@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"github.com/aliyun/aliyun-oss-go-sdk/oss"
 	"os"
+	"sync"
 )
 
 type OssConnectInfoStruct struct {
@@ -15,8 +16,11 @@ type OssConnectInfoStruct struct {
 	BucketName      string `json:"bucketName"`
 }
 
-var OssClient *oss.Client
-var OssBucket *oss.Bucket
+var (
+	OssClient *oss.Client
+	OssBucket *oss.Bucket
+	OssMutex  sync.Mutex
+)
 
 func InitOssConfig() {
 	c_log.GlobalLogger.Info("初始化OSS客户端对象 - 开始。")

+ 33 - 28
aarch64/kinglong/common/service/rosbag_upload.go

@@ -106,44 +106,49 @@ outLoop:
 		for i, bag := range bags {
 			bagSlice := strings.Split(bag, "/")
 			c_log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
+			commonConfig.OssMutex.Lock()
 			err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
+			commonConfig.OssMutex.Unlock()
 			if err != nil {
 				c_log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
 				continue
 			}
 		}
 		c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))
-		// 在上传完成的包目录同级下添加一个目录同名的json
-		triggerIds := make([]string, 0)
-		for _, label := range currentTimeWindow.Labels {
-			triggerIdToAppend := masterConfig.LabelMapTriggerId[label]
-			c_log.GlobalLogger.Info("添加一个【triggerId】=", triggerIdToAppend)
-			triggerIds = append(triggerIds, triggerIdToAppend)
-		}
-		c_log.GlobalLogger.Info("json 中添加【triggerIds】=", triggerIds)
-		callBackMap := map[string]interface{}{
-			"dataName":    currentTimeWindow.FaultTime,
-			"dataSize":    "", // 由合并程序补充
-			"equipmentNo": commonConfig.LocalConfig.EquipmentNo,
-			"secretKey":   commonConfig.LocalConfig.SecretKey,
-			"rosBagPath":  objectKey2,
-			"filePath":    objectKey3,
-			"taskId":      commonConfig.PlatformConfig.TaskConfigId,
-			"triggerId":   triggerIds,
-		}
-		callBackJson, err := util.MapToJsonString(callBackMap)
-		c_log.GlobalLogger.Info("【callBackJson】=", callBackJson)
-		if err != nil {
-			c_log.GlobalLogger.Error("callBackMap", callBackMap, "转 json 失败:", err)
-		}
-		err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
-		if err != nil {
-			c_log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
+		if commonConfig.LocalConfig.Node.Name == "master" {
+			// 在上传完成的包目录同级下添加一个目录同名的json
+			triggerIds := make([]string, 0)
+			for _, label := range currentTimeWindow.Labels {
+				triggerIdToAppend := masterConfig.LabelMapTriggerId[label]
+				c_log.GlobalLogger.Info("添加一个【triggerId】=", triggerIdToAppend)
+				triggerIds = append(triggerIds, triggerIdToAppend)
+			}
+			c_log.GlobalLogger.Info("json 中添加【triggerIds】=", triggerIds)
+			callBackMap := map[string]interface{}{
+				"dataName":    currentTimeWindow.FaultTime,
+				"dataSize":    "", // 由合并程序补充
+				"equipmentNo": commonConfig.LocalConfig.EquipmentNo,
+				"secretKey":   commonConfig.LocalConfig.SecretKey,
+				"rosBagPath":  objectKey2,
+				"filePath":    objectKey3,
+				"taskId":      commonConfig.PlatformConfig.TaskConfigId,
+				"triggerId":   triggerIds,
+			}
+			callBackJson, err := util.MapToJsonString(callBackMap)
+			c_log.GlobalLogger.Info("【callBackJson】=", callBackJson)
+			if err != nil {
+				c_log.GlobalLogger.Error("callBackMap", callBackMap, "转 json 失败:", err)
+			}
+			commonConfig.OssMutex.Lock()
+			err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
+			commonConfig.OssMutex.Unlock()
+			if err != nil {
+				c_log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
+			}
 		}
-
 		// 删除本地所有已上传的bag文件
 		c_log.GlobalLogger.Infof("结束处理窗口,【Label】=%v,【TriggerIds】=%v,【FaultTime】=%v,【Length】=%v", currentTimeWindow.Labels, currentTimeWindow.TriggerIds, currentTimeWindow.FaultTime, currentTimeWindow.Length)
-		if err = util.RemoveDir(dir); err != nil {
+		if err := util.RemoveDir(dir); err != nil {
 			continue outLoop
 		}
 

+ 2 - 0
aarch64/kinglong/master/package/config/master_trigger_cfg.go

@@ -35,7 +35,9 @@ func InitTriggerConfig() {
 		triggerLocalPath := config.CloudConfig.TriggersDir + fileName
 		util.CreateParentDir(triggerLocalPath)
 		c_log.GlobalLogger.Info("下载触发器插件从 ", trigger.TriggerScriptPath, " 到 ", triggerLocalPath)
+		config.OssMutex.Lock()
 		err := config.OssBucket.GetObjectToFile(trigger.TriggerScriptPath, triggerLocalPath)
+		config.OssMutex.Unlock()
 		if err != nil {
 			c_log.GlobalLogger.Error("下载 oss 上的触发器插件失败:", err)
 			continue

+ 4 - 0
aarch64/pji/common/config/c_cloud.go

@@ -64,7 +64,9 @@ func InitCloudConfig() {
 	// 3 ------- 获取 yaml 字符串 -------
 	var content []byte
 	cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
+	OssMutex.Lock()
 	err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
+	OssMutex.Unlock()
 	if err != nil {
 		c_log.GlobalLogger.Error("程序崩溃,下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
 		os.Exit(-1)
@@ -105,7 +107,9 @@ func refreshCloudConfig() {
 	// 3 ------- 获取 yaml 字符串 -------
 	var content []byte
 	cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
+	OssMutex.Lock()
 	err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
+	OssMutex.Unlock()
 	if err != nil {
 		c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
 		//os.Exit(-1)

+ 6 - 2
aarch64/pji/common/config/c_oss.go

@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"github.com/aliyun/aliyun-oss-go-sdk/oss"
 	"os"
+	"sync"
 )
 
 type OssConnectInfoStruct struct {
@@ -15,8 +16,11 @@ type OssConnectInfoStruct struct {
 	BucketName      string `json:"bucketName"`
 }
 
-var OssClient *oss.Client
-var OssBucket *oss.Bucket
+var (
+	OssClient *oss.Client
+	OssBucket *oss.Bucket
+	OssMutex  sync.Mutex
+)
 
 func InitOssConfig() {
 	c_log.GlobalLogger.Info("初始化OSS客户端对象 - 开始。")

+ 4 - 0
aarch64/pji/common/service/rosbag_upload.go

@@ -107,7 +107,9 @@ outLoop:
 		for i, bag := range bags {
 			bagSlice := strings.Split(bag, "/")
 			c_log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
+			commonConfig.OssMutex.Lock()
 			err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
+			commonConfig.OssMutex.Unlock()
 			if err != nil {
 				c_log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
 				continue
@@ -133,7 +135,9 @@ outLoop:
 		if err != nil {
 			c_log.GlobalLogger.Error("callBackMap", callBackMap, "转json失败:", err)
 		}
+		commonConfig.OssMutex.Lock()
 		err = commonConfig.OssBucket.PutObject(objectKey3+"callback.json", strings.NewReader(callBackJson))
+		commonConfig.OssMutex.Unlock()
 		if err != nil {
 			c_log.GlobalLogger.Error("上传 callback.json", callBackJson, "失败:", err)
 		}

+ 2 - 0
aarch64/pji/master/package/config/master_trigger_cfg.go

@@ -26,7 +26,9 @@ func InitTriggerConfig() {
 		triggerLocalPath := config.CloudConfig.TriggersDir + fileName
 		_ = util.CreateParentDir(triggerLocalPath)
 		c_log.GlobalLogger.Info("下载触发器插件从", trigger.TriggerScriptPath, "到", triggerLocalPath)
+		config.OssMutex.Lock()
 		err := config.OssBucket.GetObjectToFile(trigger.TriggerScriptPath, triggerLocalPath)
+		config.OssMutex.Unlock()
 		if err != nil {
 			c_log.GlobalLogger.Error("下载oss上的触发器插件失败:", err)
 			continue

+ 1 - 1
aarch64/pjisuv/common/config/c_cloud.go

@@ -68,11 +68,11 @@ func InitCloudConfig() {
 	cloudConfigObjectKey := LocalConfig.OssBasePrefix + LocalConfig.EquipmentNo + "/" + LocalConfig.CloudConfigFilename
 	OssMutex.Lock()
 	err := OssBucket.GetObjectToFile(cloudConfigObjectKey, LocalConfig.CloudConfigLocalPath)
+	OssMutex.Unlock()
 	if err != nil {
 		c_log.GlobalLogger.Error("下载oss上的配置文件"+cloudConfigObjectKey+"失败。", err)
 		os.Exit(-1)
 	}
-	OssMutex.Unlock()
 
 	content, err = os.ReadFile(LocalConfig.CloudConfigLocalPath)
 	if err != nil {