LingxinMeng 1 year ago
parent
commit
751dcc61c1

+ 11 - 7
aarch64/kinglong/common/service/rosbag_upload.go

@@ -104,14 +104,18 @@ outLoop:
 		objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
 		objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
 		for i, bag := range bags {
+			startOne := time.Now()
 			bagSlice := strings.Split(bag, "/")
-			c_log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
-			commonConfig.OssMutex.Lock()
-			err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
-			commonConfig.OssMutex.Unlock()
-			if err != nil {
-				c_log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
-				continue
+			for {
+				commonConfig.OssMutex.Lock()
+				err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
+				commonConfig.OssMutex.Unlock()
+				if err != nil {
+					c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
+					continue
+				}
+				c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
+				break
 			}
 		}
 		c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))

+ 11 - 8
aarch64/pji/common/service/rosbag_upload.go

@@ -103,16 +103,19 @@ outLoop:
 		objectKey1 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
 		objectKey2 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_merge/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + ".bag"
 		objectKey3 := commonConfig.LocalConfig.OssBasePrefix + commonConfig.LocalConfig.EquipmentNo + "/data_parse/" + currentTimeWindow.FaultTime + "_" + strings.Join(currentTimeWindow.Labels, "_") + "_" + fmt.Sprintf("%d", bagNumber) + "/"
-
 		for i, bag := range bags {
+			startOne := time.Now()
 			bagSlice := strings.Split(bag, "/")
-			c_log.GlobalLogger.Info("正在上传中,【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
-			commonConfig.OssMutex.Lock()
-			err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
-			commonConfig.OssMutex.Unlock()
-			if err != nil {
-				c_log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
-				continue
+			for {
+				commonConfig.OssMutex.Lock()
+				err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
+				commonConfig.OssMutex.Unlock()
+				if err != nil {
+					c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
+					continue
+				}
+				c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
+				break
 			}
 		}
 		c_log.GlobalLogger.Info("上传完成,花费时间:", time.Since(start))

+ 1 - 118
aarch64/pji/master/package/config/master_trigger_cfg.go

@@ -38,6 +38,7 @@ var (
 )
 
 func InitTriggerConfig() {
+	c_log.GlobalLogger.Infof("一共有%v个触发器:", len(config.PlatformConfig.TaskTriggers))
 	// 下载所有触发器的文件
 	for _, trigger := range config.PlatformConfig.TaskTriggers {
 		// 获取文件名
@@ -140,121 +141,3 @@ func InitTriggerConfig() {
 	}
 	c_log.GlobalLogger.Info("主节点加载触发器插件 - 成功。")
 }
-
-//
-//var (
-//	TopicOfOdom              = "/odom"
-//	RuleOfOdom               []func(data *nav_msgs.Odometry) string
-//	TopicOfObstacleDetection = "/obstacle_detection"
-//	RuleOfObstacleDetection  []func(data *std_msgs.UInt8) string
-//	TopicOfSysInfo           = "/sys_info"
-//	RuleOfSysInfo            []func(data *pji_msgs.SysInfo) string
-//	TopicOfLocateInfo        = "/locate_info"
-//	RuleOfLocateInfo         []func(data *pji_msgs.LocateInfo) string
-//	TopicOfImu               = "/imu"
-//	RuleOfImu                []func(data *sensor_msgs.Imu) string
-//	TopicOfDiagnostics       = "/diagnostics"
-//	RuleOfDiagnostics        []func(data *diagnostic_msgs.DiagnosticArray) string
-//	LabelMapTriggerId        = make(map[string]string)
-//
-//)
-//
-//func InitTriggerConfig() {
-//	// 下载所有触发器的文件
-//	for _, trigger := range config.PlatformConfig.TaskTriggers {
-//		// 获取文件名
-//		slice := strings.Split(trigger.TriggerScriptPath, "/")
-//		fileName := slice[len(slice)-1]
-//		// 下载
-//		triggerLocalPath := config.CloudConfig.TriggersDir + fileName
-//		cutil.CreateParentDir(triggerLocalPath)
-//		c_log.GlobalLogger.Info("下载触发器插件从", trigger.TriggerScriptPath, "到", triggerLocalPath)
-//		err := config.OssBucket.GetObjectToFile(trigger.TriggerScriptPath, triggerLocalPath)
-//		if err != nil {
-//			c_log.GlobalLogger.Error("下载oss上的触发器插件失败:", err)
-//			continue
-//		}
-//		// 载入插件到数组
-//		open, err := plugin.Open(triggerLocalPath)
-//		if err != nil {
-//			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "失败。", err)
-//		}
-//		topic0, err := open.Lookup("Topic")
-//		if err != nil {
-//			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Topic方法失败。", err)
-//			continue
-//		}
-//		topic1, ok := topic0.(func() string)
-//		if ok != true {
-//			c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func() string):", err)
-//			continue
-//		}
-//		topic2 := topic1()
-//		rule, err := open.Lookup("Rule")
-//		if err != nil {
-//			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的Rule方法失败。", err)
-//			continue
-//		}
-//		if TopicOfOdom == topic2 {
-//			f, ok := rule.(func(data *nav_msgs.Odometry) string)
-//			if ok != true {
-//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *nav_msgs.Odometry) string):", err)
-//				continue
-//			}
-//			RuleOfOdom = append(RuleOfOdom, f)
-//		} else if TopicOfObstacleDetection == topic2 {
-//			f, ok := rule.(func(data *std_msgs.UInt8) string)
-//			if ok != true {
-//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *std_msgs.UInt8) string):", err)
-//				continue
-//			}
-//			RuleOfObstacleDetection = append(RuleOfObstacleDetection, f)
-//		} else if TopicOfSysInfo == topic2 {
-//			f, ok := rule.(func(data *pji_msgs.SysInfo) string)
-//			if ok != true {
-//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *pji_msgs.SysInfo) string):", err)
-//				continue
-//			}
-//			RuleOfSysInfo = append(RuleOfSysInfo, f)
-//		} else if TopicOfLocateInfo == topic2 {
-//			f, ok := rule.(func(data *pji_msgs.LocateInfo) string)
-//			if ok != true {
-//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *pji_msgs.LocateInfo) string):", err)
-//				continue
-//			}
-//			RuleOfLocateInfo = append(RuleOfLocateInfo, f)
-//		} else if TopicOfImu == topic2 {
-//			f, ok := rule.(func(data *sensor_msgs.Imu) string)
-//			if ok != true {
-//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *sensor_msgs.Imu) string):", err)
-//				continue
-//			}
-//			RuleOfImu = append(RuleOfImu, f)
-//		} else if TopicOfDiagnostics == topic2 {
-//			f, ok := rule.(func(data *diagnostic_msgs.DiagnosticArray) string)
-//			if ok != true {
-//				c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Topic方法必须是(func(data *diagnostic_msgs.DiagnosticArray) string):", err)
-//				continue
-//			}
-//			RuleOfDiagnostics = append(RuleOfDiagnostics, f)
-//		} else {
-//			c_log.GlobalLogger.Error("未知的topic:", topic2)
-//			continue
-//		}
-//
-//		label, err := open.Lookup("Label")
-//		if err != nil {
-//			c_log.GlobalLogger.Error("加载本地插件", triggerLocalPath, "中的TriggerName方法失败。", err)
-//			continue
-//		}
-//		labelFunc, ok := label.(func() string)
-//		if ok != true {
-//			c_log.GlobalLogger.Error("插件", triggerLocalPath, "中的Label方法必须是(func() string):", err)
-//			continue
-//		}
-//		labelString := labelFunc()
-//		LabelMapTriggerId[labelString] = strconv.Itoa(trigger.TriggerId)
-//		c_log.GlobalLogger.Info("主节点加载触发器插件:【ros topic】=", topic2, ",【触发器label】=", labelString, "【触发器ID】=", trigger.TriggerId, "【label和id映射关系】=", LabelMapTriggerId)
-//	}
-//	c_log.GlobalLogger.Info("主节点加载触发器插件 - 成功。")
-//}

+ 10 - 7
aarch64/pjisuv/common/service/rosbag_upload.go

@@ -107,14 +107,17 @@ outLoop:
 		for i, bag := range bags {
 			startOne := time.Now()
 			bagSlice := strings.Split(bag, "/")
-			commonConfig.OssMutex.Lock()
-			err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
-			commonConfig.OssMutex.Unlock()
-			if err != nil {
-				c_log.GlobalLogger.Info("上传包 ", bag, " 时报错:", err)
-				continue
+			for {
+				commonConfig.OssMutex.Lock()
+				err := commonConfig.OssBucket.PutObjectFromFile(objectKey1+bagSlice[len(bagSlice)-1], bag)
+				commonConfig.OssMutex.Unlock()
+				if err != nil {
+					c_log.GlobalLogger.Info("因网络原因上传包 ", bag, " 时报错,需要等待网络恢复后重新上传:", err)
+					continue
+				}
+				c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
+				break
 			}
-			c_log.GlobalLogger.Info("上传耗时 ", time.Since(startOne), ",【FaultTime】=", currentTimeWindow.FaultTime, "【Label】=", currentTimeWindow.Labels, ",进度", i+1, "/", bagNumber, "。【", bag, "】-------【", objectKey1+bagSlice[len(bagSlice)-1], "】")
 		}
 		c_log.GlobalLogger.Info("上传完成,总耗时:", time.Since(start))
 		if commonConfig.LocalConfig.Node.Name == "node1" {

+ 1 - 1
aarch64/topic-echo/main/main.go

@@ -28,7 +28,7 @@ func main() {
 	eType := util.ToString(os.Args[1])
 	topic := util.ToString(os.Args[2])
 
-	if eType == "pjisuv" {
+	if eType == "pjibot" {
 		//1 一致
 		if topic == pjiConfig.TopicOfDiagnostics {
 			_, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{