|
@@ -10,11 +10,9 @@ import com.css.simulation.resource.scheduler.pojo.to.*;
|
|
|
import com.css.simulation.resource.scheduler.service.ProjectService;
|
|
|
import com.css.simulation.resource.scheduler.util.MinioUtil;
|
|
|
import com.css.simulation.resource.scheduler.util.ProjectUtil;
|
|
|
-import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import io.minio.MinioClient;
|
|
|
-import io.minio.errors.*;
|
|
|
import lombok.SneakyThrows;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
@@ -24,9 +22,6 @@ import org.springframework.kafka.annotation.KafkaListener;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
-import java.io.IOException;
|
|
|
-import java.security.InvalidKeyException;
|
|
|
-import java.security.NoSuchAlgorithmException;
|
|
|
import java.util.*;
|
|
|
|
|
|
@Component
|
|
@@ -149,29 +144,39 @@ public class ProjectConsumer {
|
|
|
taskPO.setModifyTime(TimeUtil.getNowForMysql());
|
|
|
taskPO.setIsDeleted("0");
|
|
|
taskMapper.insert(taskPO);
|
|
|
- // 下载 xodr 和 osgb 供仿真后生成 xosc
|
|
|
+ // 将 xosc、xodr、osgb 全部上传到仿真结果路径
|
|
|
+ String scenarioOsc = scenePO.getScenarioOsc();
|
|
|
+ String[] splitXosc = scenarioOsc.split("/");
|
|
|
+ String xoscName = splitXosc[splitXosc.length - 1];
|
|
|
+ String[] xoscNameSplit = xoscName.split("\\.");
|
|
|
+ String xoscSuffix = xoscNameSplit[xoscNameSplit.length - 1];
|
|
|
+ String xoscPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xoscName;
|
|
|
+ String xoscPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + xoscSuffix;
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsc, xoscPathOfLinux);
|
|
|
+ MinioUtil.uploadFromFile(minioClient, xoscPathOfLinux, bucketName, xoscPathOfMinio);
|
|
|
+ log.info("ProjectService--sendTaskMessage 已经将 xodr 上传到 minio 的结果文件目录:" + xoscPathOfMinio);
|
|
|
+
|
|
|
String scenarioOdr = scenePO.getScenarioOdr();
|
|
|
- String scenarioOsgb = scenePO.getScenarioOsgb();
|
|
|
String[] splitXodr = scenarioOdr.split("/");
|
|
|
String xodrName = splitXodr[splitXodr.length - 1];
|
|
|
+ String[] xodrNameSplit = xodrName.split("\\.");
|
|
|
+ String xodrSuffix = xodrNameSplit[xodrNameSplit.length - 1];
|
|
|
+ String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
|
|
|
+ String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + xodrSuffix;
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
|
|
|
+ MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
|
|
|
+ log.info("ProjectService--sendTaskMessage 已经将 xodr 上传到 minio 的结果文件目录:" + xodrPathOfMinio);
|
|
|
+
|
|
|
+ String scenarioOsgb = scenePO.getScenarioOsgb();
|
|
|
String[] splitOsgb = scenarioOsgb.split("/");
|
|
|
String osgbName = splitOsgb[splitOsgb.length - 1];
|
|
|
- try {
|
|
|
- String xodrPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + xodrName;
|
|
|
- String xodrPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + ".xodr";
|
|
|
- String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
|
|
|
- String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + ".osgb";
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, scenarioOdr, xodrPathOfLinux);
|
|
|
- MinioUtil.uploadFromFile(minioClient, xodrPathOfLinux, bucketName, xodrPathOfMinio);
|
|
|
- log.info("ProjectService--sendTaskMessage 已经将 xodr 上传到 minio 的结果文件目录:" + xodrPathOfMinio);
|
|
|
- MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
|
|
|
- MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
|
|
|
- log.info("ProjectService--sendTaskMessage 已经将 osgb 上传到 minio 的结果文件目录:" + osgbPathOfMinio);
|
|
|
- } catch (IOException | ServerException | InsufficientDataException | ErrorResponseException |
|
|
|
- NoSuchAlgorithmException | InvalidKeyException | InvalidResponseException |
|
|
|
- XmlParserException | InternalException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
+ String[] osgbNameSplit = osgbName.split("\\.");
|
|
|
+ String osgbSuffix = osgbNameSplit[osgbNameSplit.length - 1];
|
|
|
+ String osgbPathOfLinux = linuxTempPath + "video/" + projectId + "/" + taskId + "/" + osgbName;
|
|
|
+ String osgbPathOfMinio = projectResultPathOfMinio + projectId + "/" + taskId + "/" + taskId + osgbSuffix;
|
|
|
+ MinioUtil.downloadToFile(minioClient, bucketName, scenarioOsgb, osgbPathOfLinux);
|
|
|
+ MinioUtil.uploadFromFile(minioClient, osgbPathOfLinux, bucketName, osgbPathOfMinio);
|
|
|
+ log.info("ProjectService--sendTaskMessage 已经将 osgb 上传到 minio 的结果文件目录:" + osgbPathOfMinio);
|
|
|
|
|
|
// 组装 task 消息
|
|
|
TaskTO taskTO = TaskTO.builder()
|
|
@@ -182,9 +187,9 @@ public class ProjectConsumer {
|
|
|
.default_time(videoTime)
|
|
|
.build())
|
|
|
.scenario(ScenarioTO.builder()
|
|
|
- .scenario_osc(scenePO.getScenarioOsc())
|
|
|
- .scenario_odr(scenarioOdr)
|
|
|
- .scenario_osgb(scenarioOsgb)
|
|
|
+ .scenario_osc(xoscPathOfMinio)
|
|
|
+ .scenario_odr(xodrPathOfMinio)
|
|
|
+ .scenario_osgb(osgbPathOfMinio)
|
|
|
.build())
|
|
|
.vehicle(VehicleTO.builder()
|
|
|
.model(ModelTO.builder()
|
|
@@ -216,15 +221,8 @@ public class ProjectConsumer {
|
|
|
.build())
|
|
|
.build();
|
|
|
|
|
|
- //4-4 将对象转成 json
|
|
|
- String taskJson = "";
|
|
|
- try {
|
|
|
- taskJson = JsonUtil.beanToJson(taskTO);
|
|
|
- } catch (JsonProcessingException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- //4-5 将 projectId 作为 topic 名称,根据 parallelism 分散发送 task 信息到 kafka
|
|
|
- String finalTaskJson = taskJson;
|
|
|
+ //4-4 将对象转成 json 保存到临时目录等待资源分配后执行
|
|
|
+ FileUtil.writeStringToLocalFile(JsonUtil.beanToJson(taskTO), projectPath + taskId + ".json");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -236,7 +234,6 @@ public class ProjectConsumer {
|
|
|
*
|
|
|
* @param projectRecord 项目启动消息
|
|
|
*/
|
|
|
- @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.start-topic}")
|
|
|
@SneakyThrows
|
|
|
public void cacheProject(ConsumerRecord<String, String> projectRecord) {
|
|
|
String initialProjectJson = projectRecord.value();
|