Pārlūkot izejas kodu

项目终止删除消息体

root 2 gadi atpakaļ
vecāks
revīzija
530bac9a7a

+ 4 - 4
simulation-resource-scheduler/src/main/java/com/css/simulation/resource/scheduler/consumer/ProjectConsumer.java

@@ -364,11 +364,11 @@ public class ProjectConsumer {
     public void parseProject(ProjectMessageDTO projectMessageDTO, String projectWaitingKey, String projectRunningKey,
                              String userId) {
         String projectId = projectMessageDTO.getProjectId();    // 项目 id
-        String projectType = projectMessageDTO.getType();   // 项目类型
+        // 项目类型
         int currentParallelism = projectMessageDTO.getCurrentParallelism();   // 当前并行度
-        String packageId = projectMessageDTO.getScenePackageId();   // 场景测试包 id
-        long videoTime = projectMessageDTO.getMaxSimulationTime(); // 结果视频的时长
-        String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
+        // 场景测试包 id
+        // 结果视频的时长
+        // 模型配置 id
         String algorithmId = projectMessageDTO.getAlgorithmId();    // 算法 id
         String projectPath = linuxTempPath + "project/" + projectId + "/";
         // -------------------------------- 0 准备 --------------------------------

+ 24 - 0
simulation-resource-server/src/main/java/com/css/simulation/resource/configuration/kafka/KafkaAdminConfiguration.java

@@ -0,0 +1,24 @@
+package com.css.simulation.resource.configuration.kafka;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Properties;
+
+@Configuration
+public class KafkaAdminConfiguration {
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String bootstrapServers;    // 服务器
+
+    @Bean("myKafkaAdmin")
+    public Admin admin() {
+        Properties properties = new Properties();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        return KafkaAdminClient.create(properties);
+    }
+}

+ 16 - 11
simulation-resource-server/src/main/java/com/css/simulation/resource/project/impl/SimulationProjectServiceImpl.java

@@ -32,6 +32,7 @@ import com.css.simulation.resource.project.enums.SceneTypeEnum;
 import com.css.simulation.resource.project.mapper.*;
 import com.css.simulation.resource.project.service.SimulationProjectService;
 import com.css.simulation.resource.system.service.DictService;
+import com.css.simulation.resource.util.ApacheKafkaUtil;
 import com.css.simulation.resource.util.ProjectUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.pagehelper.PageInfo;
@@ -44,6 +45,7 @@ import com.itextpdf.text.pdf.PdfWriter;
 import feign.Response;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.Admin;
 import org.springframework.beans.BeanUtils;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.scheduling.support.CronExpression;
@@ -70,6 +72,8 @@ import java.util.zip.ZipOutputStream;
 @Slf4j
 public class SimulationProjectServiceImpl implements SimulationProjectService {
 
+    @Resource(name = "myKafkaAdmin")
+    Admin kafkaAdminClient;
     @Resource
     SimulationProjectMapper simulationProjectMapper;
     @Resource
@@ -279,21 +283,21 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
         }
 
         // 已经完成的项目再次运行
-        if ("30".equals(po.getNowRunState()) && "20".equals(param.getNowRunState())) {
+        if (DictConstants.PROJECT_COMPLETED.equals(po.getNowRunState())) {
 
             po.createPo(AuthUtil.getCurrentUserId());
-            //生成id
+            // 生成id
             createProjectId(po);
-            //初始化数据
+            // 初始化数据
             po.setNowRunState(param.getNowRunState());
             po.setEvaluationLevel("");
             po.setStartTime(new Date());
             po.setFinishTime(null);
             int add = simulationProjectMapper.add(po);
             if (add <= 0) {
-                return new ResponseBodyVO<>(ResponseBodyVO.Response.SERVER_FAILURE, "生成新工作失败");
+                return new ResponseBodyVO<>(ResponseBodyVO.Response.SERVER_FAILURE, "生成新工作失败");
             }
-            //00000 查询项目详情信息并保存
+            // 00000 查询项目详情信息并保存
             String projectId = po.getId();
             ProjectDetailsVo info = selectProjectDetailsByIdBackUp(SimulationManualProjectParam.builder()
                     .id(projectId)
@@ -301,10 +305,14 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
             String infoJson = JsonUtil.beanToJson(info);
             log.info("updateProjectNowRunState() 项目 " + projectId + " 的详情信息为:" + infoJson);
             simulationProjectMapper.updateDetailsById(projectId, infoJson);
-            //Kafka推送消息
+            // Kafka推送消息
             projectRunToKafka(po);
 
-        } else {    //创建新的项目
+        } else if (DictConstants.PROJECT_TERMINATED.equals(param.getNowRunState())) {   //项目终止,推送到kafka
+            String projectId = param.getId();
+            ApacheKafkaUtil.deleteTopic(kafkaAdminClient, projectId);
+            projectStopToKafka(po);
+        } else {    // 创建新的项目
             // 查询项目详情信息并保存
             String projectId = param.getId();
             ProjectDetailsVo info = selectProjectDetailsByIdBackUp(SimulationManualProjectParam.builder()
@@ -327,10 +335,7 @@ public class SimulationProjectServiceImpl implements SimulationProjectService {
                 return new ResponseBodyVO<>(ResponseBodyVO.Response.SERVER_FAILURE);
             }
         }
-        //项目终止,推送到kafka
-        if (DictConstants.PROJECT_TERMINATED.equals(param.getNowRunState())) {
-            projectStopToKafka(po);
-        }
+
         return new ResponseBodyVO<>(ResponseBodyVO.Response.SUCCESS);
     }
 

+ 208 - 0
simulation-resource-server/src/main/java/com/css/simulation/resource/util/ApacheKafkaUtil.java

@@ -0,0 +1,208 @@
+package com.css.simulation.resource.util;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.Future;
+
+@Slf4j
+public class ApacheKafkaUtil {
+
+    //* -------------------------------- Admin --------------------------------
+
+    /**
+     * 创建主题
+     *
+     * @param admin             管理员对象
+     * @param name              主题名
+     * @param numPartitions     分区数量
+     * @param replicationFactor 副本数量
+     */
+    public static void createTopic(Admin admin, String name, int numPartitions, short replicationFactor) {
+        NewTopic newTopic = new NewTopic(name, numPartitions, replicationFactor);
+        admin.createTopics(Collections.singleton(newTopic));
+        log.info("ApacheKafkaUtil--createTopic 创建主题 " + name + ",分区数为:" + numPartitions + ",副本数为:" + replicationFactor);
+    }
+
+    /**
+     * 删除主题
+     *
+     * @param admin  管理员对象
+     * @param topics 需要删除的所有主题序列
+     */
+    public static void deleteTopic(Admin admin, String... topics) {
+        admin.deleteTopics(Arrays.asList(topics));
+        log.info("ApacheKafkaUtil.deleteTopic() 删除主题:" + Arrays.toString(topics));
+    }
+
+
+    //* -------------------------------- Producer --------------------------------
+
+    /**
+     * 默认用异步发送
+     *
+     * @param kafkaProducer 生产者对象
+     * @param topic         主题
+     * @param value         值
+     */
+    public static void send(KafkaProducer<String, String> kafkaProducer, String topic, String value) {
+        sendAsync(kafkaProducer, topic, value);
+    }
+
+    /**
+     * 默认用异步发送
+     *
+     * @param kafkaProducer 生产者对象
+     * @param topic         主题
+     * @param value         值
+     */
+    public static void send(KafkaProducer<String, String> kafkaProducer, String topic, String key, String value) {
+        sendAsync(kafkaProducer, topic, key, value);
+    }
+
+    /**
+     * 默认用异步发送
+     *
+     * @param kafkaProducer 生产者对象
+     * @param topic         主题
+     * @param value         值
+     */
+    public static void send(KafkaProducer<String, String> kafkaProducer, String topic, int partition, String key, String value) {
+        sendAsync(kafkaProducer, topic, partition, key, value);
+    }
+
+    /**
+     * 异步发送
+     *
+     * @param kafkaProducer 生产者对象
+     * @param topic         主题
+     * @param value         值
+     */
+    public static void sendAsync(KafkaProducer<String, String> kafkaProducer, String topic, String value) {
+        Future<RecordMetadata> send = kafkaProducer.send(new ProducerRecord<>(topic, value), (recordMetadata, e) -> {
+                    if (e == null) {
+                        log.info("ApacheKafkaUtil--send 主题:" + recordMetadata.topic() + ",分区:" + recordMetadata.partition());
+                    } else {
+                        log.error("ApacheKafkaUtil--send 发送失败,主题:" + topic + ",值:" + value);
+                    }
+                }
+        );
+    }
+
+    /**
+     * 异步发送
+     *
+     * @param kafkaProducer 生产者对象
+     * @param topic         主题
+     * @param value         值
+     */
+    public static void sendAsync(KafkaProducer<String, String> kafkaProducer, String topic, String key, String value) {
+        Future<RecordMetadata> send = kafkaProducer.send(new ProducerRecord<>(topic, key, value), (recordMetadata, e) -> {
+                    if (e == null) {
+                        log.info("ApacheKafkaUtil--send 主题:" + recordMetadata.topic() + ",分区:" + recordMetadata.partition());
+                    } else {
+                        log.error("ApacheKafkaUtil--send 发送失败,主题:" + topic + ",值:" + value);
+                    }
+                }
+        );
+    }
+
+    /**
+     * 异步发送
+     *
+     * @param kafkaProducer 生产者对象
+     * @param topic         主题
+     * @param value         值
+     */
+    public static void sendAsync(KafkaProducer<String, String> kafkaProducer, String topic, int partition, String key, String value) {
+        Future<RecordMetadata> send = kafkaProducer.send(new ProducerRecord<>(topic, partition, key, value), (recordMetadata, e) -> {
+                    if (e == null) {
+                        log.info("ApacheKafkaUtil--send 主题:" + recordMetadata.topic() + ",分区:" + recordMetadata.partition());
+                    } else {
+                        log.error("ApacheKafkaUtil--send 发送失败,主题:" + topic + ",值:" + value);
+                    }
+                }
+        );
+    }
+
+
+    /**
+     * 同步发送
+     *
+     * @param kafkaProducer 生产者对象
+     * @param topic         主题
+     * @param value         值
+     * @return 元信息
+     */
+    @SneakyThrows
+    public static RecordMetadata sendSync(KafkaProducer<String, String> kafkaProducer, String topic, String value) {
+        return kafkaProducer.send(new ProducerRecord<>(topic, value), (recordMetadata, e) -> {
+                    if (e == null) {
+                        log.info("ApacheKafkaUtil--send 主题:" + recordMetadata.topic() + ",分区:" + recordMetadata.partition());
+                    } else {
+                        log.error("ApacheKafkaUtil--send 发送失败,主题:" + topic + ",值:" + value);
+                    }
+                }
+        ).get();
+    }
+
+    /**
+     * 同步发送（通过 get() 阻塞等待发送结果）
+     *
+     * @param kafkaProducer 生产者对象
+     * @param topic         主题
+     * @param value         值
+     */
+    @SneakyThrows
+    public static RecordMetadata sendSync(KafkaProducer<String, String> kafkaProducer, String topic, String key, String value) {
+        return kafkaProducer.send(new ProducerRecord<>(topic, key, value), (recordMetadata, e) -> {
+                    if (e == null) {
+                        log.info("ApacheKafkaUtil--send 主题:" + recordMetadata.topic() + ",分区:" + recordMetadata.partition());
+                    } else {
+                        log.error("ApacheKafkaUtil--send 发送失败,主题:" + topic + ",值:" + value);
+                    }
+                }
+        ).get();
+    }
+
+
+    /**
+     * 同步发送（通过 get() 阻塞等待发送结果）
+     *
+     * @param kafkaProducer 生产者对象
+     * @param topic         主题
+     * @param value         值
+     */
+    @SneakyThrows
+    public static RecordMetadata sendSync(KafkaProducer<String, String> kafkaProducer, String topic, int partition, String key, String value) {
+//        kafkaProducer.send(new ProducerRecord<>(topic, value));
+        return kafkaProducer.send(new ProducerRecord<>(topic, partition, key, value), (recordMetadata, e) -> {
+                    if (e == null) {
+                        log.info("ApacheKafkaUtil--send 主题:" + recordMetadata.topic() + ",分区:" + recordMetadata.partition());
+                    } else {
+                        log.error("ApacheKafkaUtil--send 发送失败,主题:" + topic + ",值:" + value);
+                    }
+                }
+        ).get();
+    }
+
+    //* -------------------------------- Consumer --------------------------------
+
+    public static void commitAsync(KafkaConsumer<String, String> kafkaConsumer) {
+        kafkaConsumer.commitAsync();
+    }
+
+    public static void commitSync(KafkaConsumer<String, String> kafkaConsumer) {
+        kafkaConsumer.commitSync();
+    }
+
+
+}