|
@@ -47,7 +47,11 @@ public class ManualProjectConsumer {
|
|
@Autowired
|
|
@Autowired
|
|
StringRedisTemplate redisTemplate;
|
|
StringRedisTemplate redisTemplate;
|
|
@Autowired
|
|
@Autowired
|
|
- ProjectMapper projectMapper;
|
|
|
|
|
|
+ ManualProjectMapper manualProjectMapper;
|
|
|
|
+ @Autowired
|
|
|
|
+ AutoProjectMapper autoProjectMapper;
|
|
|
|
+ @Autowired
|
|
|
|
+ AutoSubProjectMapper autoSubProjectMapper;
|
|
@Autowired
|
|
@Autowired
|
|
TaskMapper taskMapper;
|
|
TaskMapper taskMapper;
|
|
@Autowired
|
|
@Autowired
|
|
@@ -64,7 +68,7 @@ public class ManualProjectConsumer {
|
|
SensorOgtMapper sensorOgtMapper;
|
|
SensorOgtMapper sensorOgtMapper;
|
|
@Autowired
|
|
@Autowired
|
|
AlgorithmMapper algorithmMapper;
|
|
AlgorithmMapper algorithmMapper;
|
|
-// @Autowired
|
|
|
|
|
|
+ // @Autowired
|
|
// ApiClient apiClient;
|
|
// ApiClient apiClient;
|
|
@Value("${scheduler.manual-project.topic}")
|
|
@Value("${scheduler.manual-project.topic}")
|
|
String manualProjectTopic;
|
|
String manualProjectTopic;
|
|
@@ -77,12 +81,6 @@ public class ManualProjectConsumer {
|
|
@Value("${scheduler.linux-temp-path}")
|
|
@Value("${scheduler.linux-temp-path}")
|
|
String linuxTempPath;
|
|
String linuxTempPath;
|
|
|
|
|
|
-
|
|
|
|
- @KafkaListener(groupId = "simulation-resource-scheduler", topics = "hello")
|
|
|
|
- public void testConsumer(ConsumerRecord<String, String> projectRecord) {
|
|
|
|
- log.info("------- testConsumer 消费成功:" + projectRecord.value());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
|
|
@KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
public void parseManualProject(ConsumerRecord<String, String> projectRecord) {
|
|
public void parseManualProject(ConsumerRecord<String, String> projectRecord) {
|
|
@@ -101,7 +99,9 @@ public class ManualProjectConsumer {
|
|
String projectJson = projectRecord.value();
|
|
String projectJson = projectRecord.value();
|
|
ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
ProjectMessageDTO projectMessageDTO = JsonUtil.jsonToBean(projectJson, ProjectMessageDTO.class);
|
|
String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
String projectId = projectMessageDTO.getProjectId(); // 项目 id
|
|
- projectMapper.resetProjectState(projectId, DictConstants.PROJECT_RUNNING); // 修改该 project 的状态为执行中,同时将已完成任务重置为 0 方便测试。
|
|
|
|
|
|
+
|
|
|
|
+ redisTemplate.opsForValue().set(manualProjectTopic + ":" + projectId + ":taskCompleted", "0"); // 设置项目已完成任务为 0
|
|
|
|
+ manualProjectMapper.resetProjectState(projectId, DictConstants.PROJECT_RUNNING); // 修改该 project 的状态为执行中,同时将已完成任务重置为 0 方便测试。
|
|
taskMapper.updateStateByProjectId(projectId, DictConstants.TASK_PENDING); // 将该 project 下所有任务重置为待执行方便测试。
|
|
taskMapper.updateStateByProjectId(projectId, DictConstants.TASK_PENDING); // 将该 project 下所有任务重置为待执行方便测试。
|
|
|
|
|
|
|
|
|
|
@@ -144,7 +144,7 @@ public class ManualProjectConsumer {
|
|
int taskNumber = sceneList.size();
|
|
int taskNumber = sceneList.size();
|
|
Set<ScenePO> sceneSet = new HashSet<>(sceneList);
|
|
Set<ScenePO> sceneSet = new HashSet<>(sceneList);
|
|
log.info("------- ManualProjectConsumer 共有 " + taskNumber + " 个任务,对应 " + sceneSet.size() + " 个场景!");
|
|
log.info("------- ManualProjectConsumer 共有 " + taskNumber + " 个任务,对应 " + sceneSet.size() + " 个场景!");
|
|
- projectMapper.updateTaskNumber(projectId, taskNumber);
|
|
|
|
|
|
+ manualProjectMapper.updateTaskNumber(projectId, taskNumber);
|
|
// -------------------------------- 2 模型 --------------------------------
|
|
// -------------------------------- 2 模型 --------------------------------
|
|
// 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
// 根据车辆配置id vehicleConfigId, 获取 模型信息和传感器信息
|
|
String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
|
|
String vehicleConfigId = projectMessageDTO.getVehicleConfigId();// 模型配置 id
|
|
@@ -440,7 +440,7 @@ public class ManualProjectConsumer {
|
|
|
|
|
|
@KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.stop-topic}")
|
|
@KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.stop-topic}")
|
|
@SneakyThrows
|
|
@SneakyThrows
|
|
- public void stopProject(ConsumerRecord<String, String> stopRecord) {
|
|
|
|
|
|
+ public void stopManualProject(ConsumerRecord<String, String> stopRecord) {
|
|
log.info("------- ManualProjectConsumer 接收到的项目终止消息为:" + stopRecord);
|
|
log.info("------- ManualProjectConsumer 接收到的项目终止消息为:" + stopRecord);
|
|
//1 读取 kafka 的项目停止信息
|
|
//1 读取 kafka 的项目停止信息
|
|
/*
|
|
/*
|
|
@@ -452,8 +452,11 @@ public class ManualProjectConsumer {
|
|
ObjectMapper objectMapper = new ObjectMapper();
|
|
ObjectMapper objectMapper = new ObjectMapper();
|
|
JsonNode jsonNode = objectMapper.readTree(json);
|
|
JsonNode jsonNode = objectMapper.readTree(json);
|
|
String projectId = jsonNode.path("projectId").asText();
|
|
String projectId = jsonNode.path("projectId").asText();
|
|
- projectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql());
|
|
|
|
|
|
+ manualProjectMapper.updateProjectState(projectId, DictConstants.PROJECT_TERMINATED, TimeUtil.getNowForMysql());
|
|
LinuxUtil.execute("kubectl delete job project-" + projectId);
|
|
LinuxUtil.execute("kubectl delete job project-" + projectId);
|
|
redisTemplate.delete(manualProjectTopic + ":" + projectId + ":check");
|
|
redisTemplate.delete(manualProjectTopic + ":" + projectId + ":check");
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
}
|
|
}
|