diff --git a/src/main/java/com/easysoftware/EasysoftwareApplication.java b/src/main/java/com/easysoftware/EasysoftwareApplication.java index 22cdce01a5e5196a353f5fe231d209b6ceb6bd4e..6866b3d3b29a68a89809e4e4ea854eed4714ac12 100644 --- a/src/main/java/com/easysoftware/EasysoftwareApplication.java +++ b/src/main/java/com/easysoftware/EasysoftwareApplication.java @@ -8,10 +8,12 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; +import org.springframework.scheduling.annotation.EnableScheduling; import com.baomidou.mybatisplus.autoconfigure.DdlApplicationRunner; @SpringBootApplication +@EnableScheduling @ComponentScan(basePackages = {"com.easysoftware.*"}) @MapperScan("com.easysoftware.infrastructure.mapper") public class EasysoftwareApplication { diff --git a/src/main/java/com/easysoftware/application/BaseIService.java b/src/main/java/com/easysoftware/application/BaseIService.java new file mode 100644 index 0000000000000000000000000000000000000000..7192a344e702665711aa288255b6ad9ea081037f --- /dev/null +++ b/src/main/java/com/easysoftware/application/BaseIService.java @@ -0,0 +1,11 @@ +package com.easysoftware.application; + +import java.util.ArrayList; + +import com.baomidou.mybatisplus.extension.service.IService; + +public interface BaseIService extends IService{ + boolean existApp(String name); + void saveDataObject(String dataObject); + void saveDataObjectBatch(ArrayList dataObject); +} diff --git a/src/main/java/com/easysoftware/application/ServiceMap.java b/src/main/java/com/easysoftware/application/ServiceMap.java new file mode 100644 index 0000000000000000000000000000000000000000..8b32e4fa2f0c4790afef25c2b5ee220b4378f5f3 --- /dev/null +++ b/src/main/java/com/easysoftware/application/ServiceMap.java @@ -0,0 +1,19 @@ +package com.easysoftware.application; + +import java.util.Map; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +@Repository +public class ServiceMap { + @Autowired + private Map serviceMap; + + @Autowired + BaseIService service; + + public BaseIService getIService(String type) { + return serviceMap.getOrDefault(type, service); + } +} diff --git a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionService.java b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionService.java index 7cff9e52cc928035fc67fb4f00665ed2fae302e3..4d8fa7f93b8db8194543a6ccba9e43ffb8553745 100644 --- a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionService.java +++ b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionService.java @@ -1,17 +1,21 @@ package com.easysoftware.application.applicationversion; +import java.util.ArrayList; import java.util.List; import org.springframework.http.ResponseEntity; -import com.baomidou.mybatisplus.extension.service.IService; +import com.easysoftware.application.BaseIService; import com.easysoftware.application.applicationversion.dto.ApplicationVersionSearchCondition; import com.easysoftware.application.applicationversion.dto.InputApplicationVersion; import com.easysoftware.infrastructure.applicationversion.gatewayimpl.dataobject.ApplicationVersionDO; -public interface ApplicationVersionService extends IService{ +public interface ApplicationVersionService extends BaseIService{ ResponseEntity insertAppVersion(InputApplicationVersion listApp); ResponseEntity updateAppVersion(InputApplicationVersion inputAppVersion); ResponseEntity deleteAppVersion(List names); ResponseEntity searchAppVersion(ApplicationVersionSearchCondition condition); + boolean existApp(String name); + void saveDataObject(String dataObject); + void saveDataObjectBatch(ArrayList dataObject); } diff --git a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java index ad4f0070a2e9ed601b2c7a910dd51f8463c253de..924356c93c5bc5f2a0318d4817cf0dbc4552ecbc 100644 --- a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java +++ b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java @@ -26,7 +26,7 @@ import com.easysoftware.kafka.Producer; import jakarta.annotation.Resource; -@Service +@Service("ApplicationVersionService") public class ApplicationVersionServiceImpl extends ServiceImpl implements ApplicationVersionService { @Autowired Producer kafkaProducer; @@ -55,7 +55,9 @@ public class ApplicationVersionServiceImpl extends ServiceImpl kafkaMsg = ObjectMapperUtil.jsonToMap(AppVersion); + kafkaMsg.put("table", "ApplicationVersion"); + kafkaProducer.sendMess(topicAppVersion + "_version", UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg)); return ResultUtil.success(HttpStatus.OK); } @@ -126,4 +128,20 @@ public class ApplicationVersionServiceImpl extends ServiceImpl dataObject) { + saveBatch(AppVersionGateway.convertBatch(dataObject)); + } } diff --git a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..c4ec1a4900b78f2e906aff9429ec752fa5ee688d --- /dev/null +++ b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java @@ -0,0 +1,44 @@ +package com.easysoftware.common.config; + +import java.util.Properties; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class KafkaConsumerConfig { + @Value("${consumer.topic.name}") + String topicNames; + + @Value("${consumer.topic.offset}") + String topicOffset; + + @Value("${bootstrap.servers}") + String bootstrapServers; + + @Value("${consumer.groupId}") + String groupId; + + @Value("${consumer.autoCommitIntervalMs}") + String autoCommitIntervalMs; + + @Value("${consumer.enableAutoCommit}") + String enableAutoCommit; + + @Bean + public KafkaConsumer kafkaConsumer() { + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServers); + props.put("group.id", groupId); + props.put("enable.auto.commit", enableAutoCommit); + props.put("auto.commit.interval.ms", autoCommitIntervalMs); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + KafkaConsumer consumer = new KafkaConsumer<>(props); + + return consumer; + } +} diff --git a/src/main/java/com/easysoftware/common/utils/ObjectMapperUtil.java b/src/main/java/com/easysoftware/common/utils/ObjectMapperUtil.java index 56e100aff6c365fc44d7f18f81623dedd5166d0e..c79685a730e34954bb347de3ab5406282c0938b5 100644 --- a/src/main/java/com/easysoftware/common/utils/ObjectMapperUtil.java +++ b/src/main/java/com/easysoftware/common/utils/ObjectMapperUtil.java @@ -187,7 +187,7 @@ public class ObjectMapperUtil { } } - public static Map jsonToMap(JsonNode obj) { + public static Map jsonToMap(Object obj) { Map res = objectMapper.convertValue(obj, new TypeReference>() {}); return res; } diff --git a/src/main/java/com/easysoftware/domain/applicationversion/gateway/ApplicationVersionGateway.java b/src/main/java/com/easysoftware/domain/applicationversion/gateway/ApplicationVersionGateway.java index e546c37e5eb4e3fc0cfe78f3cbd1440afedbeb53..2df742744923bc6a7fda8509561424c3d654a492 100644 --- a/src/main/java/com/easysoftware/domain/applicationversion/gateway/ApplicationVersionGateway.java +++ b/src/main/java/com/easysoftware/domain/applicationversion/gateway/ApplicationVersionGateway.java @@ -1,10 +1,12 @@ package com.easysoftware.domain.applicationversion.gateway; +import java.util.Collection; import java.util.List; import java.util.Map; import com.easysoftware.application.applicationversion.dto.ApplicationVersionSearchCondition; import com.easysoftware.domain.applicationversion.ApplicationVersion; +import com.easysoftware.infrastructure.applicationversion.gatewayimpl.dataobject.ApplicationVersionDO; public interface ApplicationVersionGateway { @@ -13,5 +15,6 @@ public interface ApplicationVersionGateway { boolean update(ApplicationVersion appVersion); boolean delete(List names); Map queryByName(ApplicationVersionSearchCondition condition); + Collection convertBatch(Collection dataObject); } diff --git a/src/main/java/com/easysoftware/infrastructure/applicationversion/gatewayimpl/ApplicationVersionGatewayImpl.java b/src/main/java/com/easysoftware/infrastructure/applicationversion/gatewayimpl/ApplicationVersionGatewayImpl.java index 8a657bbbed2bad0513a87dc1cff77905404cf169..414fb4d5fdc8bb5ffd6c8138eff255ecc135e114 100644 --- a/src/main/java/com/easysoftware/infrastructure/applicationversion/gatewayimpl/ApplicationVersionGatewayImpl.java +++ b/src/main/java/com/easysoftware/infrastructure/applicationversion/gatewayimpl/ApplicationVersionGatewayImpl.java @@ -1,5 +1,7 @@ package com.easysoftware.infrastructure.applicationversion.gatewayimpl; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -9,6 +11,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.easysoftware.application.applicationversion.dto.ApplicationVersionSearchCondition; +import com.easysoftware.common.utils.ObjectMapperUtil; import com.easysoftware.domain.applicationversion.ApplicationVersion; import com.easysoftware.domain.applicationversion.gateway.ApplicationVersionGateway; import com.easysoftware.infrastructure.applicationversion.gatewayimpl.converter.ApplicationVersionConvertor; @@ -78,6 +81,17 @@ public class ApplicationVersionGatewayImpl implements ApplicationVersionGateway return res; } + + @Override + public Collection convertBatch(Collection dataObject){ + Collection ObjList = new ArrayList<>(); + for (String obj : dataObject) { + ApplicationVersion appVer = ObjectMapperUtil.jsonToObject(obj, ApplicationVersion.class); + ApplicationVersionDO appVersionDO = ApplicationVersionConvertor.toDataObjectForUpdate(appVer); + ObjList.add(appVersionDO); + } + return ObjList; + } } diff --git a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..31cf148c4098f2b478aef0686469f24d0e3f80df --- /dev/null +++ b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java @@ -0,0 +1,20 @@ +package com.easysoftware.kafka; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PostConstruct; + +// @Service +public class AppPkgConsumer extends BaseConsumer { + @Value("${consumer.topic.name}") + String topicName; + + @Value("${consumer.topic.offset}") + String topicOffset; + + @PostConstruct + private void init() { + initConsumer(topicName + "_app", topicOffset); + } +} diff --git a/src/main/java/com/easysoftware/kafka/MyConsumer.java b/src/main/java/com/easysoftware/kafka/BaseConsumer.java similarity index 40% rename from src/main/java/com/easysoftware/kafka/MyConsumer.java rename to src/main/java/com/easysoftware/kafka/BaseConsumer.java index d655bfd43522745eb2d3852a3dad22c9fc26e794..1295a748d9f7bea424fbb97cdbb3b60b169165b5 100644 --- a/src/main/java/com/easysoftware/kafka/MyConsumer.java +++ b/src/main/java/com/easysoftware/kafka/BaseConsumer.java @@ -2,82 +2,51 @@ package com.easysoftware.kafka; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - +import java.util.HashMap; +import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; +import org.springframework.scheduling.annotation.Scheduled; -import com.easysoftware.application.applicationversion.ApplicationVersionService; +import com.easysoftware.application.BaseIService; +import com.easysoftware.application.ServiceMap; import com.easysoftware.common.utils.ObjectMapperUtil; -import com.easysoftware.domain.applicationversion.ApplicationVersion; -import com.easysoftware.domain.applicationversion.gateway.ApplicationVersionGateway; -import com.easysoftware.infrastructure.applicationversion.gatewayimpl.converter.ApplicationVersionConvertor; -import com.easysoftware.infrastructure.applicationversion.gatewayimpl.dataobject.ApplicationVersionDO; - -import jakarta.annotation.PostConstruct; -import jakarta.annotation.Resource; - -@Service -public class MyConsumer { - @Value("${consumer.topic.name}") - String topicNames; - - @Value("${consumer.topic.offset}") - String topicOffset; - - @Value("${bootstrap.servers}") - String bootstrapServers; - - @Value("${consumer.groupId}") - String groupId; - - @Value("${consumer.autoCommitIntervalMs}") - String autoCommitIntervalMs; - - @Value("${consumer.enableAutoCommit}") - String enableAutoCommit; +public class BaseConsumer { @Autowired - private ApplicationVersionService appVersionService; + KafkaConsumer consumer; - @Resource - ApplicationVersionGateway AppVersionGateway; + @Autowired + ServiceMap serviceMap; - private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); - private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class); - protected ArrayList> KafkaConsumerList= new ArrayList<>(); - public static KafkaConsumer consumer; + private static final Logger logger = LoggerFactory.getLogger(BaseConsumer.class); + protected ArrayList> KafkaConsumerList = new ArrayList<>(); - @PostConstruct - public void init() { - initConsumer(); + @Scheduled(fixedRate = 60000) + public void tasks() { + KafkaToMysql(); } - MyConsumer() { - service.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - KafkaToMysql(); - } - }, 5, 5, TimeUnit.SECONDS); + protected void initConsumer(String topicName, String topicOffset) { + String[] topciOffsets = topicOffset.split(","); + for (String topciOffset : topciOffsets) { + String[] tf = topciOffset.split(":"); + TopicPartition topicPartition = new TopicPartition(topicName, Integer.parseInt(tf[0])); + consumer.assign(Arrays.asList(topicPartition)); + consumer.seek(topicPartition, Integer.parseInt(tf[1])); + KafkaConsumerList.add(consumer); + } } public void KafkaToMysql() { for (KafkaConsumer customer : KafkaConsumerList) { ConsumerRecords poll = customer.poll(2); - dealDataByBatch(poll); + dealDataToTableByBatch(poll); customer.commitSync(); } } @@ -85,14 +54,17 @@ public class MyConsumer { public void dealData(ConsumerRecords records) { for (ConsumerRecord record : records) { String value = record.value(); + logger.info(value); try { - ApplicationVersion appVersion = ObjectMapperUtil.jsonToObject(value, ApplicationVersion.class); - boolean found = AppVersionGateway.existApp(appVersion.getName()); - if (found) { - logger.info(String.format("The software %s is existed", appVersion.getName())); + Map dtoMap = ObjectMapperUtil.toMap(value); + String table = dtoMap.get("table").toString(); + String name = dtoMap.get("name").toString(); + BaseIService baseIService = serviceMap.getIService(table + "Service"); + if (baseIService.existApp(name)) { + logger.info(String.format("The software %s is existed", name)); continue; } - AppVersionGateway.save(appVersion); + baseIService.saveDataObject(value); logger.info("partation: " + record.partition() + ", offset: " + record.offset()); } catch (Exception e) { @@ -101,52 +73,75 @@ public class MyConsumer { } } - public void dealDataByBatch(ConsumerRecords records) { - Collection appList = new ArrayList<>(); + // The data of a topic can only be written to the same table + public void dealDataToTableByBatch(ConsumerRecords records) { + ArrayList appList = new ArrayList<>(); + BaseIService baseIService = null; int partition = 0; long offset = 0; for (ConsumerRecord record : records) { String value = record.value(); try { - ApplicationVersion appVersion = ObjectMapperUtil.jsonToObject(value, ApplicationVersion.class); - boolean found = AppVersionGateway.existApp(appVersion.getName()); - if (found) { - logger.info(String.format("The software %s is existed", appVersion.getName())); + Map dtoMap = ObjectMapperUtil.toMap(value); + String table = dtoMap.get("table").toString(); + String name = dtoMap.get("name").toString(); + baseIService = serviceMap.getIService(table + "Service"); + if (baseIService.existApp(name)) { + logger.info(String.format("The software %s is existed", name)); continue; } - ApplicationVersionDO appVersionDO = ApplicationVersionConvertor.toDataObjectForCreate(appVersion); - appList.add(appVersionDO); + appList.add(value); partition = record.partition(); offset = record.offset(); - } catch (Exception e) { logger.error(e.getMessage() + ":" + value, e); } } if (!appList.isEmpty()) { logger.info("partation: " + partition + ", offset: " + offset); - appVersionService.saveBatch(appList); + baseIService.saveDataObjectBatch(appList); } } - public void initConsumer() { - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServers); - props.put("group.id", groupId); - props.put("enable.auto.commit", enableAutoCommit); - props.put("auto.commit.interval.ms", autoCommitIntervalMs); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - consumer = new KafkaConsumer<>(props); - - String[] topics = topicNames.split(","); - String[] topciOffsets = topicOffset.split(","); - for (String topciOffset : topciOffsets) { - String[] tf = topciOffset.split(":"); - TopicPartition topicPartition = new TopicPartition(topics[0], Integer.parseInt(tf[0])); - consumer.assign(Arrays.asList(topicPartition)); - consumer.seek(topicPartition, Integer.parseInt(tf[1])); - KafkaConsumerList.add(consumer); + // The data of a topic may be written to multiple tables + public void dealDataToMultipleTables(ConsumerRecords records) { + Map> resMap = new HashMap<>(); + int partition = 0; + long offset = 0; + + for (ConsumerRecord record : records) { + String value = record.value(); + try { + Map dtoMap = ObjectMapperUtil.toMap(value); + String table = dtoMap.get("table").toString(); + String name = dtoMap.get("name").toString(); + + BaseIService baseIService = serviceMap.getIService(table + "Service"); + if (baseIService.existApp(name)) { + logger.info(String.format("The software %s is already existed", name)); + continue; + } + + if (!resMap.containsKey(table)) { + resMap.put(table, new ArrayList<>()); + } + + ArrayList tmp = resMap.get(table); + tmp.add(value); + resMap.put(table, tmp); + + partition = record.partition(); + offset = record.offset(); + } catch (Exception e) { + logger.error(e.getMessage() + ": " + value, e); + } } + resMap.forEach((table, values) -> { + if (!values.isEmpty()) { + serviceMap.getIService(table + "Service").saveDataObjectBatch(values); + } + }); + logger.info("Partition: " + partition + ", Offset: " + offset); } + } diff --git a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java b/src/main/java/com/easysoftware/kafka/EpkgConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..87254642863b2961cf3ff79bee7e66e09dfc5af9 --- /dev/null +++ b/src/main/java/com/easysoftware/kafka/EpkgConsumer.java @@ -0,0 +1,5 @@ +package com.easysoftware.kafka; + +public class EpkgConsumer extends BaseConsumer { + +} diff --git a/src/main/java/com/easysoftware/kafka/RpmConsumer.java b/src/main/java/com/easysoftware/kafka/RpmConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..7eb43738fe7b1107304f52e2c85aae5df8477948 --- /dev/null +++ b/src/main/java/com/easysoftware/kafka/RpmConsumer.java @@ -0,0 +1,5 @@ +package com.easysoftware.kafka; + +public class RpmConsumer extends BaseConsumer { + +} diff --git a/src/main/java/com/easysoftware/kafka/VersionConsumer.java b/src/main/java/com/easysoftware/kafka/VersionConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..e2e02068528d54ed7876afa6603f92d9c2fffa50 --- /dev/null +++ b/src/main/java/com/easysoftware/kafka/VersionConsumer.java @@ -0,0 +1,20 @@ +package com.easysoftware.kafka; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PostConstruct; + +@Service +public class VersionConsumer extends BaseConsumer { + @Value("${consumer.topic.name}") + String topicName; + + @Value("${consumer.topic.offset}") + String topicOffset; + + @PostConstruct + private void init() { + initConsumer(topicName + "_version", topicOffset); + } +}