夜得朦胧 1 éve
szülő
commit
e2261c0324

+ 66 - 2
simulation-resource-server/src/main/java/com/css/simulation/resource/server/app/impl/MultiSimulationProjectServiceImpl.java

@@ -2,15 +2,20 @@ package com.css.simulation.resource.server.app.impl;
 
 import api.common.pojo.common.PageVO;
 import api.common.pojo.common.ResponseBodyVO;
+import api.common.pojo.constants.DictConstants;
+import api.common.pojo.constants.ProjectConstants;
 import api.common.pojo.enums.MultiSimulationResultTypeEnum;
 import api.common.pojo.enums.MultiSimulationStatusEnum;
+import api.common.pojo.param.KafkaParameter;
 import api.common.pojo.param.project.*;
 import api.common.pojo.po.algorithm.AlgorithmPO;
 import api.common.pojo.po.model.ConfigPO;
 import api.common.pojo.po.project.MultiSimulationProjectPO;
 import api.common.pojo.po.project.MultiSimulationScenePO;
+import api.common.pojo.po.project.SimulationManualProjectPO;
 import api.common.pojo.vo.map.SimulationMapVO;
 import api.common.pojo.vo.project.*;
+import api.common.util.JsonUtil;
 import api.common.util.StringUtil;
 import api.common.util.TimeUtil;
 import com.alibaba.cloud.commons.lang.StringUtils;
@@ -21,6 +26,7 @@ import com.css.simulation.resource.server.infra.util.PageUtil;
 import com.github.pagehelper.PageInfo;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.CollectionUtils;
@@ -51,6 +57,9 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
     @Resource
     private MultiSimulationProjectResultMapper resultMapper;
 
+    @Resource
+    private KafkaTemplate<String, String> kafkaTemplate;
+
     @Override
     @SneakyThrows
     public ResponseBodyVO<PageInfo<MultiSimulationProjectVO>> selectMultiSimulationProject(MultiSimulationProjectParam param) {
@@ -150,7 +159,8 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
                     return sceneCars;
                 }
             }
-            // TODO 发送kafka
+            MultiSimulationProjectKafkaParam multiSimulationProjectKafkaParam = buildSendKafkaParam(projectId);
+            projectRunToKafka(multiSimulationProjectKafkaParam);
             res = multiSimulationProjectMapper.updateMultiSimulationProjectStatus(param);
         } else if (projectStatus == MultiSimulationStatusEnum.COMPLETED_STATUS.getProjectStatus()) {
             return new ResponseBodyVO<>(ResponseBodyVO.Response.CLIENT_FAILURE, "当前状态已完成,不允许操作");   
@@ -158,7 +168,7 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
             if (status != MultiSimulationStatusEnum.TERMINATED_STATUS.getProjectStatus()){
                 return new ResponseBodyVO<>(ResponseBodyVO.Response.CLIENT_FAILURE, "当前任务状态不允许执行此操作");
             }
-            // TODO 发送kafka
+            projectStopMultiKafka(projectId);
             res = multiSimulationProjectMapper.updateMultiSimulationProjectStatus(param);
         }
         if (res > 0){
@@ -167,6 +177,60 @@ public class MultiSimulationProjectServiceImpl implements MultiSimulationProject
         return new ResponseBodyVO<>(ResponseBodyVO.Response.CLIENT_FAILURE);
     }
 
+    public MultiSimulationProjectKafkaParam buildSendKafkaParam(String projectId){
+        MultiSimulationProjectKafkaParam multiSimulationProjectKafkaParam = new MultiSimulationProjectKafkaParam();
+        multiSimulationProjectKafkaParam.setProjectId(projectId);
+        MultiSimulationSceneParam sceneParam = new MultiSimulationSceneParam();
+        sceneParam.setProjectId(projectId);
+        List<MultiSimulationSceneVO> multiSimulationSceneVOS = simulationSceneMapper.selectSceneList(sceneParam);
+        List<MultiSimulationSceneKafkaParam> sceneKafkaParamList = new ArrayList<>();
+        for (MultiSimulationSceneVO scene : multiSimulationSceneVOS) {
+            String id = scene.getId();
+            MultiSimulationSceneKafkaParam multiSimulationSceneKafkaParam = new MultiSimulationSceneKafkaParam();
+            multiSimulationSceneKafkaParam.setMapId(scene.getMapId());
+            multiSimulationSceneKafkaParam.setProjectId(scene.getProjectId());
+            MultiSimulationSceneCarParam sceneCarParam = new MultiSimulationSceneCarParam();
+            sceneCarParam.setSceneId(id);
+            List<MultiSimulationSceneCarVO> multiSimulationSceneCarVOS = sceneCarMapper.selectSceneCarList(sceneCarParam);
+            multiSimulationSceneKafkaParam.setSimulationSceneCarVOList(multiSimulationSceneCarVOS);
+            sceneKafkaParamList.add(multiSimulationSceneKafkaParam);
+        }
+        return multiSimulationProjectKafkaParam;
+    }
+
+    @SneakyThrows
+    private void projectRunToKafka(MultiSimulationProjectKafkaParam param) {
+        KafkaParameter kafkaParameter = new KafkaParameter();
+        kafkaParameter.setTopic(ProjectConstants.RUN_MULTI_TOPIC);
+        String data = JsonUtil.beanToJson(param);
+        kafkaParameter.setData(data);
+        log.info("推送项目运行消息到 Kafka:" + data);
+        sendKafka(kafkaParameter);
+    }
+    @SneakyThrows
+    private void projectStopMultiKafka(String projectId) {
+        KafkaParameter kafkaParameter = new KafkaParameter();
+        kafkaParameter.setTopic(ProjectConstants.STOP_MULTI_TOPIC);
+        JSONObject object = new JSONObject();
+        object.put("projectId", projectId);
+        kafkaParameter.setData(object.toJSONString());
+        log.info("停止任务推送项目运行消息到 Kafka:" + object.toJSONString());
+        sendKafka(kafkaParameter);
+    }
+
+    // TODO 测试时候打开
+    public void sendKafka(KafkaParameter kafkaParameter){
+//        kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
+//            // 消息发送到的topic
+//            String topic = success.getRecordMetadata().topic();
+//            // 消息发送到的分区
+//            int partition = success.getRecordMetadata().partition();
+//            // 消息在分区内的offset
+//            long offset = success.getRecordMetadata().offset();
+//            log.info("停止任务发送消息成功,主题 topic 为:" + topic + ",分区 partition 为:" + partition + ",偏移量为:" + offset + ",消息体为:" + kafkaParameter.getData());
+//        }, failure -> log.error("发送消息失败:" + failure.getMessage()));
+    }
+
     public ResponseBodyVO<MultiSimulationProjectVO> submitMultiSimulationProjectDetail(MultiSimulationProjectParam param){
         // 先更新
         multiSimulationProjectMapper.updateMultiSimulationProject(param);