martin před 3 roky
rodič
revize
d4289fe764

+ 20 - 0
api-common/src/main/java/api/common/pojo/dto/ProjectMessageDTO.java

@@ -0,0 +1,20 @@
+package api.common.pojo.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ProjectMessageDTO {
+
+    private String projectId;// 项目 id
+    private String vehicleId;// 车辆 id
+    private String algorithmId;// 算法 id
+    private String defaultTime;// 任务执行时间,一个工作里的所有任务公用
+    private String parallelism;// 并行度,创建 pod 时使用
+
+}

+ 0 - 17
api-common/src/main/java/api/common/pojo/vo/KafkaMessageVO.java

@@ -1,17 +0,0 @@
-package api.common.pojo.vo;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import javax.validation.constraints.NotBlank;
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class KafkaMessageVO {
-    @NotBlank(message = "主题不能为空!")
-    private String topic;
-    @NotBlank(message = "消息不能为空!")
-    private String data;
-}

+ 3 - 3
simulation-resource-common/src/main/java/com/css/simulation/resource/common/controller/KafkaController.java

@@ -1,6 +1,6 @@
 package com.css.simulation.resource.common.controller;
 
-import api.common.pojo.vo.KafkaMessageVO;
+import api.common.pojo.param.KafkaParameter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.validation.annotation.Validated;
@@ -20,8 +20,8 @@ public class KafkaController {
 
     // -------------------------------- 带回调函数的发送 --------------------------------
     @RequestMapping("/send")
-    public void send(@RequestBody @Validated KafkaMessageVO kafkaMessageVO) {
-        kafkaTemplate.send(kafkaMessageVO.getTopic(), kafkaMessageVO.getData()).addCallback(success -> {
+    public void send(@RequestBody @Validated KafkaParameter kafkaParameter) {
+        kafkaTemplate.send(kafkaParameter.getTopic(), kafkaParameter.getData()).addCallback(success -> {
             // 消息发送到的topic
             String topic = success.getRecordMetadata().topic();
             // 消息发送到的分区

+ 1 - 1
simulation-resource-common/src/main/java/com/css/simulation/resource/common/controller/MinioController.java

@@ -35,7 +35,7 @@ public class MinioController {
     public ResponseBodyVO<String> upload(
             @RequestPart("file") MultipartFile file,
             @RequestParam("bucketName") String bucketName,
-            @RequestParam("objectName") String objectName
+            @RequestParam("objectName") String objectName//  "/xx/xxx/x/xx"
     ) throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
         MinioUtil.uploadFromMultipartFile(
                 minioClient,

+ 1 - 4
simulation-resource-common/src/main/resources/bootstrap.yml

@@ -11,7 +11,4 @@ spring:
       config:
         server-addr: 10.15.12.70:8848
         namespace: 3698bfc2-a612-487a-b2a2-aaad16cd9d9d
-        file-extension: yaml
-server:
-  tomcat:
-    uri-encoding: utf-8
+        file-extension: yaml

+ 102 - 95
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -1,95 +1,102 @@
-//package com.css.simulation.resource.scheduler.consumer;
-//
-//
-//import api.common.util.JsonUtil;
-//import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
-//import com.css.simulation.resource.scheduler.mapper.SensorMapper;
-//import com.css.simulation.resource.scheduler.mapper.TaskMapper;
-//import com.css.simulation.resource.scheduler.pojo.dto.task.*;
-//import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
-//import com.fasterxml.jackson.core.JsonProcessingException;
-//import org.apache.kafka.clients.consumer.ConsumerRecord;
-//import org.springframework.boot.configurationprocessor.json.JSONException;
-//import org.springframework.boot.configurationprocessor.json.JSONObject;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.kafka.core.KafkaTemplate;
-//import org.springframework.stereotype.Component;
-//
-//import javax.annotation.Resource;
-//import java.util.List;
-//
-//@Component
-//public class ProjectConsumer {
-//
-//    @Resource
-//    ProjectMapper projectMapper;
-//    @Resource
-//    TaskMapper taskMapper;
-//    @Resource
-//    SensorMapper sensorMapper;
-//    @Resource
-//    private KafkaTemplate<String, String> kafkaTemplate;
-//
-//    @KafkaListener(topics = "project")
-//    public void parseProject(ConsumerRecord<String, String> projectRecord) throws JsonProcessingException, JSONException {
-//        //1 读取 kafka 的 project 信息
-//        String projectJson = projectRecord.value();
-//
-//        JSONObject projectJsonObject = new JSONObject(projectJson);
-//        String projectId = projectJsonObject.optString("projectId");        // 项目 id
-//        String defaultTime = projectJsonObject.optString("defaultTime");    // 任务执行时间,一个工作里的所有任务公用
-//        String parallelism = projectJsonObject.optString("parallelism");    // 并行度,创建 pod 时使用
-//        //2 修改该 project 的状态为执行中
-//        projectMapper.updateProjectState(projectId, "2");
-//        //3 根据 project 信息获取 task 信息。task 信息包括算法、模型、场景数据包等
-//        List<TaskPO> taskPOList = taskMapper.selectByProjectId(projectId);
-//        //4 组装 task 消息
-//        for (TaskPO taskPO : taskPOList) {
-//            //4-1 组装 task 消息
-//            TaskDTO taskDTO = TaskDTO.builder()
-//                    .info(InfoDTO.builder()
-//                            .project_id(projectId)
-//                            .task_id(taskPO.getId())
-//                            .task_path(taskPO.getResultPath())
-//                            .build())
-//                    .scenario(ScenarioDTO.builder()
-//                            .scenario_osc(taskPO.getScenarioOsc())
-//                            .scenario_odr(taskPO.getScenarioOdr())
-//                            .scenario_osgb(taskPO.getScenarioOsgb())
-//                            .build())
-//                    .vehicle(VehicleDTO.builder()
-//                            .model(ModelDTO.builder().model_label(taskPO.getModelLabel()).build())
-//                            .dynamics(DynamicsDTO.builder()
-//                                    .dynamics_maxspeed(taskPO.getMaxSpeed())
-//                                    .dynamics_enginepower(taskPO.getEnginePower())
-//                                    .dynamics_maxdecel(taskPO.getMaxDeceleration())
-//                                    .dynamics_maxsteering(taskPO.getMaxSteeringAngle())
-//                                    .dynamics_mass(taskPO.getMass())
-//                                    .dynamics_frontsurfaceeffective(taskPO.getFrontSurfaceEffective())
-//                                    .dynamics_airdragcoefficient(taskPO.getAirDragCoefficient())
-//                                    .dynamics_rollingresistance(taskPO.getRollingResistanceCoefficient())
-//                                    .dynamics_wheeldiameter(taskPO.getWheelDiameter())
-//                                    .dynamics_wheeldrive(taskPO.getDrive())
-//                                    .dynamics_overallefficiency(taskPO.getOverallEfficiency())
-//                                    .dynamics_distfront(taskPO.getFrontDistance())
-//                                    .dynamics_distrear(taskPO.getRearDistance())
-//                                    .dynamics_distleft(taskPO.getLeftDistance())
-//                                    .dynamics_distright(taskPO.getRightDistance())
-//                                    .dynamics_distheight(taskPO.getHeight())
-//                                    .dynamics_wheelbase(taskPO.getDynamicsWheelbase())
-//                                    .build())
-//                            .sensors(SensorsDTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
-//                                    .camera(sensorMapper.selectCameraByVehicleId(taskPO.getVehicleId()))
-//                                    .OGT(sensorMapper.selectOgtByVehicleId(taskPO.getVehicleId()))
-//                                    .build())
-//                            .build())
-//                    .build();
-//            //4-4 将对象转成 json
-//            String taskJson = JsonUtil.beanToJson(taskDTO);
-//            //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
-//            kafkaTemplate.send(projectId, 0, taskPO.getId(), taskJson);
-//        }
-//        //4 创建 pod 开始执行。
-//
-//    }
-//}
+package com.css.simulation.resource.scheduler.consumer;
+
+
+import api.common.pojo.dto.ProjectMessageDTO;
+import api.common.util.JsonUtil;
+import com.css.simulation.resource.scheduler.mapper.ProjectMapper;
+import com.css.simulation.resource.scheduler.mapper.SensorMapper;
+import com.css.simulation.resource.scheduler.mapper.TaskMapper;
+import com.css.simulation.resource.scheduler.pojo.dto.task.*;
+import com.css.simulation.resource.scheduler.pojo.po.TaskPO;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Component
+public class ProjectConsumer {
+
+    @Autowired
+    ProjectMapper projectMapper;
+    @Autowired
+    TaskMapper taskMapper;
+    @Autowired
+    SensorMapper sensorMapper;
+    @Autowired
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "test")
+    public void testConsumer(ConsumerRecord<String, String> projectRecord) {
+        System.out.println("------- 消费成功:" + projectRecord.value());
+    }
+
+    @KafkaListener(groupId = "simulation-resource-scheduler", topics = "project")
+    public void parseProject(ConsumerRecord<String, String> projectRecord) throws JsonProcessingException, JSONException {
+        //1 读取 kafka 的 project 信息
+        String projectJson = projectRecord.value();
+
+        ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
+        String projectId = projectMessageDTO.getProjectId();
+        String defaultTime = projectMessageDTO.getDefaultTime();
+        String parallelism = projectMessageDTO.getParallelism();
+        //2 修改该 project 的状态为执行中
+        projectMapper.updateProjectState(projectId, "2");
+        //3 根据 project 信息获取 task 信息。task 信息包括算法、模型、场景数据包等
+        List<TaskPO> taskPOList = taskMapper.selectByProjectId(projectId);
+        //4 组装 task 消息
+        for (TaskPO taskPO : taskPOList) {
+            //4-1 组装 task 消息
+            TaskDTO taskDTO = TaskDTO.builder()
+                    .info(InfoDTO.builder()
+                            .project_id(projectId)
+                            .task_id(taskPO.getId())
+                            .task_path(taskPO.getResultPath())
+                            .build())
+                    .scenario(ScenarioDTO.builder()
+                            .scenario_osc(taskPO.getScenarioOsc())
+                            .scenario_odr(taskPO.getScenarioOdr())
+                            .scenario_osgb(taskPO.getScenarioOsgb())
+                            .build())
+                    .vehicle(VehicleDTO.builder()
+                            .model(ModelDTO.builder()
+                                    .model_label(taskPO.getModelLabel())
+                                    .build())
+                            .dynamics(DynamicsDTO.builder()
+                                    .dynamics_maxspeed(taskPO.getMaxSpeed())
+                                    .dynamics_enginepower(taskPO.getEnginePower())
+                                    .dynamics_maxdecel(taskPO.getMaxDeceleration())
+                                    .dynamics_maxsteering(taskPO.getMaxSteeringAngle())
+                                    .dynamics_mass(taskPO.getMass())
+                                    .dynamics_frontsurfaceeffective(taskPO.getFrontSurfaceEffective())
+                                    .dynamics_airdragcoefficient(taskPO.getAirDragCoefficient())
+                                    .dynamics_rollingresistance(taskPO.getRollingResistanceCoefficient())
+                                    .dynamics_wheeldiameter(taskPO.getWheelDiameter())
+                                    .dynamics_wheeldrive(taskPO.getDrive())
+                                    .dynamics_overallefficiency(taskPO.getOverallEfficiency())
+                                    .dynamics_distfront(taskPO.getFrontDistance())
+                                    .dynamics_distrear(taskPO.getRearDistance())
+                                    .dynamics_distleft(taskPO.getLeftDistance())
+                                    .dynamics_distright(taskPO.getRightDistance())
+                                    .dynamics_distheight(taskPO.getHeight())
+                                    .dynamics_wheelbase(taskPO.getDynamicsWheelbase())
+                                    .build())
+                            .sensors(SensorsDTO.builder()   // 根据 vehicleId 查询绑定的传感器列表
+                                    .camera(sensorMapper.selectCameraByVehicleId(taskPO.getVehicleId()))
+                                    .OGT(sensorMapper.selectOgtByVehicleId(taskPO.getVehicleId()))
+                                    .build())
+                            .build())
+                    .build();
+            //4-4 将对象转成 json
+            String taskJson = JsonUtil.beanToJson(taskDTO);
+            //4-5 将 projectId 作为 topic 名称,发送 task 信息到 kafka
+            kafkaTemplate.send(projectId, 0, taskPO.getId(), taskJson);
+        }
+        //4 创建 pod 开始执行。
+
+    }
+}

+ 10 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/feign/DemoController.java

@@ -1,8 +1,10 @@
 package com.css.simulation.resource.scheduler.feign;
 
 import api.common.pojo.common.ResponseBodyVO;
+import api.common.pojo.param.KafkaParameter;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
+import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.multipart.MultipartFile;
 
@@ -20,4 +22,12 @@ public class DemoController {
 
         return demoService.upload(file,bucketName,objectName);
     }
+
+    @PostMapping(value = "/send", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
+    // -------------------------------- 带回调函数的发送 --------------------------------
+    @RequestMapping("/send")
+    public ResponseBodyVO<String> send(@RequestBody @Validated KafkaParameter kafkaParameter) {
+        return demoService.send(kafkaParameter);
+    }
+
 }

+ 6 - 1
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/feign/DemoService.java

@@ -1,18 +1,20 @@
 package com.css.simulation.resource.scheduler.feign;
 
 import api.common.pojo.common.ResponseBodyVO;
+import api.common.pojo.param.KafkaParameter;
 import com.css.simulation.resource.scheduler.configuration.feign.OpenFeignConfiguration;
 import com.css.simulation.resource.scheduler.feign.fallback.DemoServiceFallback;
 import org.springframework.cloud.openfeign.FeignClient;
 import org.springframework.http.MediaType;
 import org.springframework.stereotype.Component;
+import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RequestPart;
 import org.springframework.web.multipart.MultipartFile;
 
 @Component
-//@FeignClient(value = "http://10.15.12.74:8001", fallback = DemoServiceFallback.class, configuration = OpenFeignConfiguration.class)
 @FeignClient(
         value = "simulation-resource-common",
         fallback = DemoServiceFallback.class,
@@ -25,5 +27,8 @@ public interface DemoService {
                                   @RequestParam("bucketName") String bucketName,
                                   @RequestParam("objectName") String objectName);
 
+    @PostMapping(value = "/simulation/resource/common/kafka/send", consumes = MediaType.APPLICATION_JSON_VALUE)
+    ResponseBodyVO<String> send(@RequestBody @Validated KafkaParameter kafkaParameter);
+
 
 }

+ 6 - 0
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/feign/fallback/DemoServiceFallback.java

@@ -1,6 +1,7 @@
 package com.css.simulation.resource.scheduler.feign.fallback;
 
 import api.common.pojo.common.ResponseBodyVO;
+import api.common.pojo.param.KafkaParameter;
 import com.css.simulation.resource.scheduler.feign.DemoService;
 import org.springframework.stereotype.Service;
 import org.springframework.web.bind.annotation.RequestParam;
@@ -16,4 +17,9 @@ public class DemoServiceFallback implements DemoService {
                                          @RequestParam("objectName") String objectName) {
         return new ResponseBodyVO<>(ResponseBodyVO.Response.SERVER_FAILURE);
     }
+
+    @Override
+    public ResponseBodyVO<String> send(KafkaParameter kafkaParameter) {
+        return new ResponseBodyVO<>(ResponseBodyVO.Response.SERVER_FAILURE);
+    }
 }