LingxinMeng vor 1 Jahr
Ursprung
Commit
0bdd9768ab

+ 70 - 5
aarch64/pjisuv/master/service/for_competition.go

@@ -4,6 +4,7 @@ import (
 	commonConfig "cicv-data-closedloop/aarch64/pjisuv/common/config"
 	"cicv-data-closedloop/common/config/c_log"
 	"cicv-data-closedloop/common/util"
+	"cicv-data-closedloop/pjisuv_msgs"
 	"github.com/bluenviron/goroslib/v2"
 	"github.com/bluenviron/goroslib/v2/pkg/msgs/std_msgs"
 	"os"
@@ -14,21 +15,27 @@ import (
 
 var (
 	dir                    = "/userdata/competition/"
+	dirBak                 = "/userdata/competition-bak/"
 	commandArgs            = []string{"record", "--split", "--duration=1", "/pji_gps", "/data_read"}
 	topic                  = "/cicv_competition"
+	urlExamTick            = "http://36.110.106.142:12341/web_server/exam/tick"
 	urlExamBegin           = "http://36.110.106.142:12341/web_server/exam/begin"
 	urlExamEnd             = "http://36.110.106.142:12341/web_server/exam/end"
 	cacheMutex             sync.Mutex
 	cacheTeamName          = make(map[string]time.Time)
 	heartBeatTimeThreshold = 5 * time.Second // 心跳时间
+	bakSecondNumber        = 3600
+	cachePositionX         = -999.00
+	cachePositionY         = -999.00
 )
 
 // todo 实车比赛临时使用
 // history record命令无法录制()
 func ForCompetition() {
 	go dataCollection()
-	go examBegin()
-	go examEnd()
+	go dataCollectionBak(bakSecondNumber) // 需要再车上备份的数据,防止网络差传不回来
+	go location()                         // 记录经纬度和速度
+	go tick()
 }
 
 // 全量数据采集
@@ -47,7 +54,7 @@ func dataCollection() {
 		time.Sleep(time.Duration(2) * time.Second)
 		files, _ := util.ListAbsolutePathAndSort(dir)
 		if len(files) >= 2 {
-			c_log.GlobalLogger.Info("扫描车比赛数据采集目录,", files)
+			c_log.GlobalLogger.Info("扫描车比赛数据采集目录,", files)
 			for i := range files {
 				if i == len(files)-1 { // 最后一个包在录制中,不上传
 					break
@@ -63,9 +70,67 @@ func dataCollection() {
 	}
 }
 
+// 全量数据采集
+func dataCollectionBak(bakSecondNumber int) {
+	c_log.GlobalLogger.Info("开始备份实车算法比赛全量数据。")
+	util.CreateDir(dirBak)
+	// 1 打包
+	command, err := util.ExecuteWithEnvAndDirAsync(commonConfig.RosbagEnvs, dirBak, commonConfig.RosbagPath, commandArgs...)
+	if err != nil {
+		c_log.GlobalLogger.Error("程序崩溃。备份目录执行record命令", command, "出错:", err)
+		os.Exit(-1)
+	}
+	// 2 扫描目录文件
+	for {
+		time.Sleep(time.Duration(2) * time.Second)
+		files, _ := util.ListAbsolutePathAndSort(dirBak)
+		if len(files) >= bakSecondNumber {
+			// 超出阈值就删除心跳+1个文件
+			_ = util.DeleteFile(files[0])
+			_ = util.DeleteFile(files[1])
+			_ = util.DeleteFile(files[2])
+		}
+	}
+}
+
+// data格式为队伍编号
+// 保存单次考试时间区间
+func location() {
+	_, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
+		Node:  commonConfig.RosNode,
+		Topic: "/cicv_location",
+		Callback: func(data *pjisuv_msgs.PerceptionLocalization) {
+			cachePositionX = data.PositionX
+			cachePositionY = data.PositionY
+		},
+	})
+}
+
+// data格式为队伍编号
+// 保存单次考试时间区间
+func tick() {
+	_, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
+		Node:  commonConfig.RosNode,
+		Topic: topic,
+		Callback: func(data *std_msgs.String) {
+			teamName := data.Data
+			_, _ = util.HttpPostJsonWithHeaders(
+				urlExamTick,
+				map[string]string{"Authorization": "U9yKpD6kZZDDe4LFKK6myAxBUT1XRrDM"},
+				map[string]string{
+					"teamName":  teamName,
+					"positionX": util.ToString(cachePositionX),
+					"positionY": util.ToString(cachePositionY),
+				},
+			)
+			c_log.GlobalLogger.Infof("队伍 %v 的心跳发送到云端成功。", teamName)
+		},
+	})
+}
+
 // data格式为队伍编号
 // 保存单次考试时间区间
-func examBegin() {
+func examBeginBak() {
 	_, _ = goroslib.NewSubscriber(goroslib.SubscriberConf{
 		Node:  commonConfig.RosNode,
 		Topic: topic,
@@ -91,7 +156,7 @@ func examBegin() {
 	})
 }
 
-func examEnd() {
+func examEndBak() {
 	for {
 		time.Sleep(time.Duration(1) * time.Second)
 		cacheMutex.Lock()

+ 6 - 2
amd64/dispatch_server/main.go

@@ -69,8 +69,12 @@ func main() {
 	// 启动 web 服务器
 	router := gin.Default()
 	router.Use(commonHandler.ValidateHeaders())
-	api := router.Group(infra.ApplicationYaml.Web.RoutePrefix)
-	api.POST("/start-project", handler.StartProject)
+	api1 := router.Group(infra.ApplicationYaml.Web.RoutePrefix)
+	api1.POST("/start-project", handler.StartProject)
+	api2 := router.Group("/simulation/resource/scheduler")
+	api2.POST("/confirm", handler.Confirm)
+	api2.POST("/state", handler.State)
+	api2.POST("/confirm", handler.Confirm)
 	err := router.Run(":" + infra.ApplicationYaml.Web.Port)
 	if err != nil {
 		infra.GlobalLogger.Error("程序崩溃,监听端口 " + util.ToString(infra.ApplicationYaml.Web.Port) + " 失败。")

+ 15 - 0
amd64/dispatch_server/package/handler/old_interface_adapter.go

@@ -0,0 +1,15 @@
+package handler
+
+import "github.com/gin-gonic/gin"
+
+func Confirm(c *gin.Context) {
+
+}
+
+func Tick(c *gin.Context) {
+
+}
+
+func State(c *gin.Context) {
+
+}

+ 0 - 70
amd64/dispatch_server/vtd-pod-template.yaml

@@ -1,70 +0,0 @@
-apiVersion: v1
-kind: Pod
-metadata:
-  name: pod-name
-  namespace: namespace-name
-  labels:
-    user: CICV
-spec:
-  nodeName: node-name
-  dnsPolicy: None
-  dnsConfig:
-    nameservers:
-      #- 223.6.6.6
-      #- 8.8.8.8
-      - 10.16.11.1
-      - 10.16.11.2
-  hostAliases:
-    - ip: 10.14.85.239
-      hostnames:
-        - simulation004
-    - ip: 10.14.85.237
-      hostnames:
-        - gpu001
-  initContainers:
-    - name: init
-      image: algorithm-image
-      imagePullPolicy: Always
-      command: [ 'sh', '-c', 'echo algorithm image downloaded && sleep 2' ]
-  containers:
-    - name: vtd-container-name
-      image: vtd-image
-      imagePullPolicy: Always
-      command: [ "/Controller/VTDController", "vtd-command", "kafka-topic" ]
-      #command: [ "/Controller/VTDController", "/Controller/config/docker_cloud_algContest.ini", "kafka-topic" ]
-      resources:
-        limits:
-          nvidia.com/gpu: 1
-          #nvidia.com/mig-1g.10gb: 1
-      env:
-        - name: PodName
-          valueFrom:
-            fieldRef:
-              fieldPath: metadata.name
-        - name: LM_LICENSE_FILE
-          value: 27500@10.14.85.247
-          #value: 27500@10.14.8.24
-          #value: 27500@172.20.0.2
-        - name: CICV_DATA_CLOSEDLOOP_IP
-          value: cicv-data-closedloop-ip
-        - name: KAFKA_IP
-          value: kafka-ip
-        - name: MINIO_IP
-          value: minio-ip
-        - name: MINIO_ACCESS_KEY
-          value: minio-access-key
-        - name: MINIO_SECRET_KEY
-          value: minio-secret-key
-        - name: KAFKA_PARTITION
-          value: kafka-partition
-        - name: KAFKA_OFFSET
-          value: kafka-offset
-        - name: CPU_ORDER
-          value: cpu-order
-        - name: MINIO_BUCKET_NAME
-          value: minio-bucket
-    - name: algorithm-container
-      image: algorithm-image
-      imagePullPolicy: Never
-      command: [ "/bin/sh", "-c", "/run.sh; touch /tmp/hello.txt;while true;do /bin/echo $(date +%T) >> /tmp/hello.txt; sleep 600; done;" ]
-  restartPolicy: Never

+ 3 - 1
amd64/web_server/entity/e_exam.go

@@ -5,7 +5,9 @@ import (
 )
 
 type ExamPao struct {
-	TeamName string `json:"teamName"` // 队伍名字
+	TeamName  string  `json:"teamName"`  // 队伍名字
+	PositionX float64 `json:"positionX"` // 队伍名字
+	PositionY float64 `json:"positionY"` // 队伍名字
 }
 
 type ExamPo struct {

+ 103 - 1
amd64/web_server/handler/h_exam.go

@@ -10,12 +10,114 @@ import (
 	"github.com/signintech/gopdf"
 	"io"
 	"log"
+	"math"
 	"net/http"
 	"os"
+	"sync"
 	"time"
 )
 
-var defaultTime = time.Date(2006, time.January, 2, 15, 4, 5, 0, time.Local)
+var (
+	defaultTime            = time.Date(2006, time.January, 2, 15, 4, 5, 0, time.Local)
+	cacheMutex             sync.Mutex
+	cacheTeamName          = make(map[string]time.Time)
+	heartBeatTimeThreshold = 5 * time.Second // 心跳时间
+	InitialPositionX       = 0.00            // 需要比赛确认起点
+	InitialPositionY       = 0.00            // 需要比赛确认起点
+)
+
+// 考试心跳
+func Tick(c *gin.Context) {
+	param := new(webServerEntity.ExamPao)
+	// 映射到结构体
+	if err := c.ShouldBindJSON(&param); err != nil {
+		c_log.GlobalLogger.Error("请求体解析失败,错误信息为:", err)
+		c.JSON(http.StatusBadRequest, commonEntity.Response{
+			Code: 500,
+			Msg:  "请求体解析失败。",
+		})
+		return
+	}
+	teamName := param.TeamName
+	positionX := param.PositionX
+	positionY := param.PositionY
+	if !util.ContainsKey(cacheTeamName, teamName) && math.Abs(positionX-InitialPositionX) < 5.00 && math.Abs(positionY-InitialPositionY) < 5.00 { // (在起点开始)
+		sqlTemplate, _ := util.ReadFile(c_db.SqlFilesMap["exam-insert-begin_time-by-team_name.sql"])
+		c_log.GlobalLogger.Info("保存比赛开始时间", sqlTemplate)
+		if err := c_db.DoTx(sqlTemplate, []any{
+			param.TeamName,
+			time.Now(),
+		}); err != nil {
+			c_log.GlobalLogger.Error("保存比赛开始时间报错:", err)
+			c.JSON(http.StatusBadRequest, commonEntity.Response{
+				Code: 500,
+				Msg:  "保存比赛开始时间报错。",
+			})
+			return
+		}
+	} else if !util.ContainsKey(cacheTeamName, teamName) && math.Abs(positionX-InitialPositionX) < 5.00 && math.Abs(positionY-InitialPositionY) < 5.00 { // 不在起点(开始)
+		// todo 查询队伍上一个记录,修改结束时间为初始时间,并添加心跳到缓存
+
+	} else if util.ContainsKey(cacheTeamName, teamName) { // 进行中
+		cacheTeamName[teamName] = time.Now()
+	}
+
+	c.JSON(http.StatusOK, commonEntity.Response{
+		Code: 200,
+		Msg:  "心跳接收成功。",
+	})
+}
+
+func ExamEndTicker() {
+	// 创建一个定时器,每隔一秒触发一次
+	ticker := time.NewTicker(1 * time.Second)
+	for {
+		select {
+		// 定时器触发时执行的代码
+		case <-ticker.C:
+			cacheMutex.Lock()
+			{
+				var keysToDelete []string
+				for teamName, heartBeatTime := range cacheTeamName {
+					if time.Since(heartBeatTime) > heartBeatTimeThreshold { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名
+						keysToDelete = append(keysToDelete, teamName)
+					}
+				}
+				for _, teamName := range keysToDelete { // 检查缓存中的队名,如果超过心跳时间,则代表考试结束,删除缓存中的队名
+					delete(cacheTeamName, teamName)
+					// 1 查询指定队伍的开始时间最新的考试是否有结束时间,如果有则不在处理,如果没有则更新
+					var result []webServerEntity.ExamPo
+					selectSql, err := util.ReadFile(c_db.SqlFilesMap["exam-select-latest-by-team_name.sql"])
+					if err != nil {
+						c_log.GlobalLogger.Error("读取sql文件报错:", err)
+						return
+					}
+					// 可以传参数
+					if err = c_db.MysqlDb.Select(&result, selectSql, teamName); err != nil {
+						c_log.GlobalLogger.Error("数据库查询报错:", err)
+						return
+					}
+					c_log.GlobalLogger.Info("数据库查询成功:", result)
+					if !result[0].EndTime.Equal(defaultTime) {
+						c_log.GlobalLogger.Error("赛队", teamName, "考试已结束!")
+						return
+					}
+					// 更新到数据库
+					sqlTemplate, _ := util.ReadFile(c_db.SqlFilesMap["exam-update-end_time-by-team_name.sql"])
+					if err := c_db.DoTx(sqlTemplate, []any{
+						time.Now(),
+						teamName,
+					}); err != nil {
+						c_log.GlobalLogger.Error("插入数据报错:", err)
+						return
+					}
+					c_log.GlobalLogger.Infof("队伍 %v 的考试结束。", teamName)
+				}
+			}
+			cacheMutex.Unlock()
+		}
+	}
+}
 
 // 考试开始时间
 func Begin(c *gin.Context) {

+ 4 - 2
amd64/web_server/main.go

@@ -70,8 +70,10 @@ func main() {
 	// 通过路由组设置全局前缀
 	projectPrefix := router.Group(ApplicationYaml.Web.RoutePrefix)
 	examPrefix := projectPrefix.Group("/exam")
-	examPrefix.POST("/begin", handler.Begin)   // 考试开始
-	examPrefix.POST("/end", handler.End)       // 考试结束
+	examPrefix.POST("/tick", handler.Tick)     // 考试开始
+	go handler.ExamEndTicker()                 // 考试结束
+	examPrefix.POST("/begin", handler.Begin)   // 考试开始2
+	examPrefix.POST("/end", handler.End)       // 考试结束2
 	examPrefix.POST("/page", handler.Page)     // 分页查询
 	examPrefix.POST("/report", handler.Report) // pdf下载
 	monitorPrefix := projectPrefix.Group("/monitor")