Selaa lähdekoodia

Merge remote-tracking branch 'origin/20240309-saq-fix' into lcytest

# Conflicts:
#	simulation-resource-server/src/main/java/com/css/simulation/resource/server/infra/util/ChartUtil.java
李春阳 1 vuosi sitten
vanhempi
commit
4c1e9cf003

+ 4 - 0
api-common/src/main/java/api/common/pojo/constants/ProjectConstants.java

@@ -11,6 +11,10 @@ public class ProjectConstants {
     //任务终止,kafka推送主题
     public static final String STOP_TASK_TOPIC = "stopProject";//   manualProject-stop
 
+    public static final String RUN_MULTI_TOPIC = "multiProject";
+
+    public static final String STOP_MULTI_TOPIC = "stopMultiProject";
+
     //自动任务运行开始
 //    public static final String AUTO_PROJECT = "autoProject";
 

+ 18 - 0
api-common/src/main/java/api/common/pojo/param/project/MultiSimulationProjectKafkaParam.java

@@ -0,0 +1,18 @@
+package api.common.pojo.param.project;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+import java.util.List;
+
+/**
+ * kafka让推送需要的参数
+ */
+@Getter
+@Setter
+@Accessors(chain = true)
+public class MultiSimulationProjectKafkaParam {
+    private String projectId;
+    private List<MultiSimulationSceneKafkaParam> kafkaParamList;
+}

+ 15 - 0
api-common/src/main/java/api/common/pojo/param/project/MultiSimulationSceneCarKafkaParam.java

@@ -0,0 +1,15 @@
+package api.common.pojo.param.project;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+/**
+ * kafka让推送需要的参数
+ */
+@Getter
+@Setter
+@Accessors(chain = true)
+public class MultiSimulationSceneCarKafkaParam {
+
+}

+ 19 - 0
api-common/src/main/java/api/common/pojo/param/project/MultiSimulationSceneKafkaParam.java

@@ -0,0 +1,19 @@
+package api.common.pojo.param.project;
+
+import api.common.pojo.po.project.MultiSimulationScenePO;
+import api.common.pojo.vo.project.MultiSimulationSceneCarVO;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+import java.util.List;
+
+/**
+ * kafka让推送需要的参数
+ */
+@Getter
+@Setter
+@Accessors(chain = true)
+public class MultiSimulationSceneKafkaParam extends MultiSimulationScenePO {
+    private List<MultiSimulationSceneCarVO> simulationSceneCarVOList;
+}

+ 66 - 2
simulation-resource-server/src/main/java/com/css/simulation/resource/server/app/impl/MultiSimulationProjectServiceImpl.java

@@ -2,15 +2,20 @@ package com.css.simulation.resource.server.app.impl;
 
 import api.common.pojo.common.PageVO;
 import api.common.pojo.common.ResponseBodyVO;
+import api.common.pojo.constants.DictConstants;
+import api.common.pojo.constants.ProjectConstants;
 import api.common.pojo.enums.MultiSimulationResultTypeEnum;
 import api.common.pojo.enums.MultiSimulationStatusEnum;
+import api.common.pojo.param.KafkaParameter;
 import api.common.pojo.param.project.*;
 import api.common.pojo.po.algorithm.AlgorithmPO;
 import api.common.pojo.po.model.ConfigPO;
 import api.common.pojo.po.project.MultiSimulationProjectPO;
 import api.common.pojo.po.project.MultiSimulationScenePO;
+import api.common.pojo.po.project.SimulationManualProjectPO;
 import api.common.pojo.vo.map.SimulationMapVO;
 import api.common.pojo.vo.project.*;
+import api.common.util.JsonUtil;
 import api.common.util.StringUtil;
 import api.common.util.TimeUtil;
 import com.alibaba.cloud.commons.lang.StringUtils;
@@ -21,6 +26,7 @@ import com.css.simulation.resource.server.infra.util.PageUtil;
 import com.github.pagehelper.PageInfo;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.CollectionUtils;
@@ -51,6 +57,9 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
     @Resource
     private MultiSimulationProjectResultMapper resultMapper;
 
+    @Resource
+    private KafkaTemplate<String, String> kafkaTemplate;
+
     @Override
     @SneakyThrows
     public ResponseBodyVO<PageInfo<MultiSimulationProjectVO>> selectMultiSimulationProject(MultiSimulationProjectParam param) {
@@ -150,7 +159,8 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
                     return sceneCars;
                 }
             }
-            // TODO 发送kafka
+            MultiSimulationProjectKafkaParam multiSimulationProjectKafkaParam = buildSendKafkaParam(projectId);
+            projectRunToKafka(multiSimulationProjectKafkaParam);
             res = multiSimulationProjectMapper.updateMultiSimulationProjectStatus(param);
         } else if (projectStatus == MultiSimulationStatusEnum.COMPLETED_STATUS.getProjectStatus()) {
             return new ResponseBodyVO<>(ResponseBodyVO.Response.CLIENT_FAILURE, "当前状态已完成,不允许操作");   
@@ -158,7 +168,7 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
             if (status != MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus()){
                 return new ResponseBodyVO<>(ResponseBodyVO.Response.CLIENT_FAILURE, "当前任务状态不允许执行此操作");
             }
-            // TODO 发送kafka
+            projectStopMultiKafka(projectId);
             res = multiSimulationProjectMapper.updateMultiSimulationProjectStatus(param);
         }
         if (res > 0){
@@ -167,6 +177,60 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
         return new ResponseBodyVO<>(ResponseBodyVO.Response.CLIENT_FAILURE);
     }
 
+    public MultiSimulationProjectKafkaParam buildSendKafkaParam(String projectId){
+        MultiSimulationProjectKafkaParam multiSimulationProjectKafkaParam = new MultiSimulationProjectKafkaParam();
+        multiSimulationProjectKafkaParam.setProjectId(projectId);
+        MultiSimulationSceneParam sceneParam = new MultiSimulationSceneParam();
+        sceneParam.setProjectId(projectId);
+        List<MultiSimulationSceneVO> multiSimulationSceneVOS = simulationSceneMapper.selectSceneList(sceneParam);
+        List<MultiSimulationSceneKafkaParam> sceneKafkaParamList = new ArrayList<>();
+        for (MultiSimulationSceneVO scene : multiSimulationSceneVOS) {
+            String id = scene.getId();
+            MultiSimulationSceneKafkaParam multiSimulationSceneKafkaParam = new MultiSimulationSceneKafkaParam();
+            multiSimulationSceneKafkaParam.setMapId(scene.getMapId());
+            multiSimulationSceneKafkaParam.setProjectId(scene.getProjectId());
+            MultiSimulationSceneCarParam sceneCarParam = new MultiSimulationSceneCarParam();
+            sceneCarParam.setSceneId(id);
+            List<MultiSimulationSceneCarVO> multiSimulationSceneCarVOS = sceneCarMapper.selectSceneCarList(sceneCarParam);
+            multiSimulationSceneKafkaParam.setSimulationSceneCarVOList(multiSimulationSceneCarVOS);
+            sceneKafkaParamList.add(multiSimulationSceneKafkaParam);
+        }
+        return multiSimulationProjectKafkaParam;
+    }
+
+    @SneakyThrows
+    private void projectRunToKafka(MultiSimulationProjectKafkaParam param) {
+        KafkaParameter kafkaParameter = new KafkaParameter();
+        kafkaParameter.setTopic(ProjectConstants.RUN_MULTI_TOPIC);
+        String data = JsonUtil.beanToJson(param);
+        kafkaParameter.setData(data);
+        log.info("推送项目运行消息到 Kafka:" + data);
+        sendKafka(kafkaParameter);
+    }
+    @SneakyThrows
+    private void projectStopMultiKafka(String projectId) {
+        KafkaParameter kafkaParameter = new KafkaParameter();
+        kafkaParameter.setTopic(ProjectConstants.STOP_MULTI_TOPIC);
+        JSONObject object = new JSONObject();
+        object.put("projectId", projectId);
+        kafkaParameter.setData(object.toJSONString());
+        log.info("停止任务推送项目运行消息到 Kafka:" + object.toJSONString());
+        sendKafka(kafkaParameter);
+    }
+
+    // TODO 测试时候打开
+    public void sendKafka(KafkaParameter kafkaParameter){
+//        kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
+//            // 消息发送到的topic
+//            String topic = success.getRecordMetadata().topic();
+//            // 消息发送到的分区
+//            int partition = success.getRecordMetadata().partition();
+//            // 消息在分区内的offset
+//            long offset = success.getRecordMetadata().offset();
+//            log.info("停止任务发送消息成功,主题 topic 为:" + topic + ",分区 partition 为:" + partition + ",偏移量为:" + offset + ",消息体为:" + kafkaParameter.getData());
+//        }, failure -> log.error("发送消息失败:" + failure.getMessage()));
+    }
+
     public ResponseBodyVO<MultiSimulationProjectVO> submitMultiSimulationProjectDetail(MultiSimulationProjectParam param){
         // 先更新
         multiSimulationProjectMapper.updateMultiSimulationProject(param);