|
@@ -6,9 +6,7 @@ import api.common.pojo.dto.ProjectMessageDTO;
|
|
import api.common.pojo.param.KafkaParameter;
|
|
import api.common.pojo.param.KafkaParameter;
|
|
import api.common.pojo.param.MinioParameter;
|
|
import api.common.pojo.param.MinioParameter;
|
|
import api.common.pojo.param.RedisParameter;
|
|
import api.common.pojo.param.RedisParameter;
|
|
-import api.common.util.JsonUtil;
|
|
|
|
-import api.common.util.StringUtil;
|
|
|
|
-import api.common.util.TimeUtil;
|
|
|
|
|
|
+import api.common.util.*;
|
|
import com.css.simulation.resource.scheduler.feign.CommonService;
|
|
import com.css.simulation.resource.scheduler.feign.CommonService;
|
|
import com.css.simulation.resource.scheduler.mapper.*;
|
|
import com.css.simulation.resource.scheduler.mapper.*;
|
|
import com.css.simulation.resource.scheduler.pojo.to.*;
|
|
import com.css.simulation.resource.scheduler.pojo.to.*;
|
|
@@ -37,7 +35,7 @@ import java.util.List;
|
|
@Slf4j
|
|
@Slf4j
|
|
public class ManualProjectConsumer {
|
|
public class ManualProjectConsumer {
|
|
|
|
|
|
- private final String USER_ID = "simulation-resource-scheduler";
|
|
|
|
|
|
+ private static final String USER_ID = "simulation-resource-scheduler";
|
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
ProjectMapper projectMapper;
|
|
ProjectMapper projectMapper;
|
|
@@ -54,25 +52,23 @@ public class ManualProjectConsumer {
|
|
@Autowired
|
|
@Autowired
|
|
SensorOgtMapper sensorOgtMapper;
|
|
SensorOgtMapper sensorOgtMapper;
|
|
@Autowired
|
|
@Autowired
|
|
- CommonService commonService;
|
|
|
|
|
|
+ CommonService commonService;
|
|
@Autowired
|
|
@Autowired
|
|
- AlgorithmMapper algorithmMapper;
|
|
|
|
|
|
+ AlgorithmMapper algorithmMapper;
|
|
@Autowired
|
|
@Autowired
|
|
- ApiClient apiClient;
|
|
|
|
- @Value("${spring.kafka.consumer.topic.manual-project}")
|
|
|
|
- String manualProjectTopic;
|
|
|
|
- @Value("${scheduler.manual-project.result-path-minio}")
|
|
|
|
- String manualProjectResultPath;
|
|
|
|
- @Value("${scheduler.manual-project.algorithm-tar-path-linux}")
|
|
|
|
- String algorithmTarPathLinux;
|
|
|
|
|
|
+ ApiClient apiClient;
|
|
|
|
+ @Value("${scheduler.manual-project.topic}")
|
|
|
|
+ String manualProjectTopic;
|
|
|
|
+ @Value("${scheduler.linux-temp-path}")
|
|
|
|
+ String linuxTempPath;
|
|
|
|
|
|
|
|
+//
|
|
|
|
+// @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
|
|
|
|
+// public void testConsumer(ConsumerRecord<String, String> projectRecord) {
|
|
|
|
+// System.out.println("------- 消费成功:" + projectRecord.value());
|
|
|
|
+// }
|
|
|
|
|
|
- @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${spring.kafka.consumer.topic.manual-project}")
|
|
|
|
- public void testConsumer(ConsumerRecord<String, String> projectRecord) {
|
|
|
|
- System.out.println("------- 消费成功:" + projectRecord.value());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @KafkaListener(groupId = "simulation-resource-scheduler", topics = "manualProject")
|
|
|
|
|
|
+ @KafkaListener(groupId = "simulation-resource-scheduler", topics = "${scheduler.manual-project.topic}")
|
|
public void parseProject(ConsumerRecord<String, String> projectRecord) throws IOException, ApiException {
|
|
public void parseProject(ConsumerRecord<String, String> projectRecord) throws IOException, ApiException {
|
|
//1 读取 kafka 的 project 信息
|
|
//1 读取 kafka 的 project 信息
|
|
/*
|
|
/*
|
|
@@ -133,7 +129,7 @@ public class ManualProjectConsumer {
|
|
|
|
|
|
for (ScenePO scenePO : sceneList) {
|
|
for (ScenePO scenePO : sceneList) {
|
|
String taskId = StringUtil.getRandomUUID();
|
|
String taskId = StringUtil.getRandomUUID();
|
|
- String resultPath = manualProjectResultPath + projectId + "/" + taskId;
|
|
|
|
|
|
+ String resultPath = linuxTempPath + projectId + "/" + taskId;
|
|
// 保存任务信息
|
|
// 保存任务信息
|
|
TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
|
|
TaskPO taskPO = TaskPO.builder() // run_start_time 和 run_end_time 不填
|
|
.id(taskId)
|
|
.id(taskId)
|
|
@@ -204,17 +200,26 @@ public class ManualProjectConsumer {
|
|
// -------------------------------- 4 算法(一期按单机版做) --------------------------------
|
|
// -------------------------------- 4 算法(一期按单机版做) --------------------------------
|
|
// 私有仓库导入算法镜像(搭建私有仓库)
|
|
// 私有仓库导入算法镜像(搭建私有仓库)
|
|
String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
String algorithmId = projectMessageDTO.getAlgorithmId(); // 算法 id
|
|
- //4-1 根据算法 id 获取算法文件地址
|
|
|
|
- String minioPath = algorithmMapper.selectMinioPathById(algorithmId);
|
|
|
|
- String linuxPath = algorithmTarPathLinux + minioPath.substring(1);
|
|
|
|
- // 下载算法文件到本地( 2 到仓库服务器)
|
|
|
|
- Response response = commonService.download(new MinioParameter(minioPath));
|
|
|
|
- InputStream inputStream = response.body().asInputStream();
|
|
|
|
- //4-2 本地执行 docker load 算法文件成镜像(二期用 docker-java 操作 或 ssh 连接)
|
|
|
|
-
|
|
|
|
-
|
|
|
|
|
|
+ //4-1 根据算法 id 获取算法文件地址、是否已导入成镜像。
|
|
|
|
+ AlgorithmPO algorithmPO = algorithmMapper.selectById(algorithmId);
|
|
|
|
+ String minioPath = algorithmPO.getMinioPath();
|
|
|
|
+ String dockerImage;
|
|
|
|
+ if ("0".equals(algorithmPO.getDockerImport())) {
|
|
|
|
+ dockerImage = "algorithm_" + algorithmId + ":latest";
|
|
|
|
+ String algorithmTarLinuxTempPath = linuxTempPath + minioPath;
|
|
|
|
+ // 下载算法文件到本地( 2 到仓库服务器)
|
|
|
|
+ Response response = commonService.download(new MinioParameter(minioPath));
|
|
|
|
+ InputStream inputStream = response.body().asInputStream();
|
|
|
|
+ FileUtil.writeInputStreamToLocalFile(inputStream, algorithmTarLinuxTempPath);
|
|
|
|
+ //4-2 本地执行 docker load 算法文件成镜像(集群版可改成用 docker-java 操作仓库)
|
|
|
|
+ LinuxUtil.execute("docker import " + algorithmTarLinuxTempPath + " " + dockerImage);
|
|
|
|
+ } else if ("1".equals(algorithmPO.getDockerImport()) && StringUtil.isNotEmpty(algorithmPO.getDockerImage())) {
|
|
|
|
+ dockerImage = algorithmPO.getDockerImage();
|
|
|
|
+ } else {
|
|
|
|
+ throw new RuntimeException("算法 " + algorithmId + "的 mysql 数据有误!");
|
|
|
|
+ }
|
|
// -------------------------------- 5 创建 pod 开始执行 --------------------------------
|
|
// -------------------------------- 5 创建 pod 开始执行 --------------------------------
|
|
- int completions = sceneList.size();
|
|
|
|
|
|
+ int completions = sceneList.size(); // 结束标
|
|
int parallelism = projectMessageDTO.getParallelism(); // 并行度
|
|
int parallelism = projectMessageDTO.getParallelism(); // 并行度
|
|
BatchV1Api batchV1Api = new BatchV1Api(apiClient);
|
|
BatchV1Api batchV1Api = new BatchV1Api(apiClient);
|
|
V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
|
|
V1Job yaml = (V1Job) Yaml.load(ResourceUtils.getFile("classpath:kubernetes/template/job-template.yaml"));
|
|
@@ -222,34 +227,29 @@ public class ManualProjectConsumer {
|
|
//2 kind
|
|
//2 kind
|
|
//3 metadata
|
|
//3 metadata
|
|
V1ObjectMeta metadata = yaml.getMetadata();
|
|
V1ObjectMeta metadata = yaml.getMetadata();
|
|
- metadata.setName("job2");
|
|
|
|
|
|
+ metadata.setName("project_" + projectId);
|
|
yaml.setMetadata(metadata);
|
|
yaml.setMetadata(metadata);
|
|
-
|
|
|
|
- //4 spec
|
|
|
|
- V1JobSpec v1JobSpec = yaml.getSpec();
|
|
|
|
- //4-1 job
|
|
|
|
- v1JobSpec.setCompletions(completions);
|
|
|
|
- v1JobSpec.setParallelism(parallelism);
|
|
|
|
- //4-2 pod
|
|
|
|
- V1PodTemplateSpec v1PodTemplateSpec = v1JobSpec.getTemplate();
|
|
|
|
-
|
|
|
|
- v1PodTemplateSpec.setMetadata(metadata);
|
|
|
|
- V1PodSpec v1PodSpec = v1PodTemplateSpec.getSpec();
|
|
|
|
- v1PodSpec.setRestartPolicy("Never");
|
|
|
|
- //4-3 container
|
|
|
|
- List<V1Container> containers = new ArrayList<>();
|
|
|
|
- V1Container v1Container = v1PodSpec.getContainers().get(0);
|
|
|
|
- v1Container.setImage("perl");
|
|
|
|
- v1Container.setImagePullPolicy("IfNotPresent");
|
|
|
|
- v1Container.setName("pi");
|
|
|
|
- v1Container.setCommand(Arrays.asList("perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"));
|
|
|
|
- containers.add(v1Container);
|
|
|
|
|
|
+ //4 job
|
|
|
|
+ V1JobSpec job = yaml.getSpec();
|
|
|
|
+ job.setCompletions(completions); // 这个标准是什么?
|
|
|
|
+ job.setParallelism(parallelism);
|
|
|
|
+ //5 pod
|
|
|
|
+ V1PodSpec v1PodSpec = job.getTemplate().getSpec();
|
|
|
|
+ //6 container
|
|
|
|
+ List<V1Container> containers = v1PodSpec.getContainers();
|
|
|
|
+ for (V1Container container : containers) {
|
|
|
|
+ String name = container.getName();
|
|
|
|
+ if ("vtd".equals(name)) {
|
|
|
|
+ container.setName("vtd_" + projectId);
|
|
|
|
+ }
|
|
|
|
+ if ("algorithm".equals(name)) {
|
|
|
|
+ container.setName("algorithm_" + projectId);
|
|
|
|
+ container.setImage(dockerImage);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
//4-4 创建
|
|
//4-4 创建
|
|
- v1PodSpec.setContainers(containers);
|
|
|
|
- v1PodTemplateSpec.setSpec(v1PodSpec);
|
|
|
|
- v1JobSpec.setTemplate(v1PodTemplateSpec);
|
|
|
|
- yaml.setSpec(v1JobSpec);
|
|
|
|
- batchV1Api.createNamespacedJob("test-namespace1", yaml, null, null, null);
|
|
|
|
|
|
+ yaml.setSpec(job);
|
|
|
|
+ batchV1Api.createNamespacedJob("simulation-task", yaml, null, null, null);
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|