From 9ba14d1ce72079359ed43d1404e9ba49168a22b0 Mon Sep 17 00:00:00 2001 From: kaede10 Date: Tue, 19 Mar 2024 11:55:54 +0800 Subject: [PATCH] update KafkaConsumer --- pom.xml | 5 ++ .../rpmpackage/RPMPackageServiceImpl.java | 2 + .../common/config/KafkaConsumerConfig.java | 55 +++++++++---------- .../gatewayimpl/RPMPackageGatewayImpl.java | 7 +++ .../easysoftware/kafka/AppPkgConsumer.java | 20 +++---- .../com/easysoftware/kafka/BaseConsumer.java | 42 +++++++++----- .../com/easysoftware/kafka/RpmConsumer.java | 10 ++-- .../easysoftware/kafka/VersionConsumer.java | 18 ++---- 8 files changed, 86 insertions(+), 73 deletions(-) diff --git a/pom.xml b/pom.xml index 169f8ac..c48bf62 100644 --- a/pom.xml +++ b/pom.xml @@ -123,6 +123,11 @@ 4.5.13 + + org.springframework.kafka + spring-kafka + + diff --git a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java index 287f86b..9ff221f 100644 --- a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java +++ b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java @@ -12,6 +12,7 @@ import org.springframework.context.annotation.Primary; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.easysoftware.application.applicationpackage.vo.ApplicationPackageMenuVo; @@ -161,6 +162,7 @@ public class RPMPackageServiceImpl extends ServiceImpl dataObject) { saveBatch(rPMPkgGateway.convertBatch(dataObject)); } diff --git a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java index b6373bd..a2b0ae0 100644 --- a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java +++ b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java @@ -1,45 +1,44 @@ package com.easysoftware.common.config; -import java.util.Properties; - -import org.apache.kafka.clients.consumer.KafkaConsumer; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @Configuration +@EnableKafka public class KafkaConsumerConfig { - @Value("${consumer.topic.name}") - String topicNames; - - @Value("${consumer.topic.offset}") - String topicOffset; @Value("${bootstrap.servers}") - String bootstrapServers; + private String bootstrapServers; @Value("${consumer.groupId}") - String groupId; - - @Value("${consumer.autoCommitIntervalMs}") - String autoCommitIntervalMs; - - @Value("${consumer.enableAutoCommit}") - String enableAutoCommit; + private String groupId; @Bean - public KafkaConsumer kafkaConsumer() { - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServers); - props.put("group.id", groupId); - props.put("enable.auto.commit", enableAutoCommit); - props.put("auto.commit.interval.ms", autoCommitIntervalMs); - props.put("request.timeout.ms", "300000"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - KafkaConsumer consumer = new KafkaConsumer<>(props); + public ConsumerFactory consumerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + return new DefaultKafkaConsumerFactory<>(configProps); + } - return consumer; + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + + factory.setBatchListener(true); + return factory; } -} +} \ No newline at end of file diff --git a/src/main/java/com/easysoftware/infrastructure/rpmpackage/gatewayimpl/RPMPackageGatewayImpl.java b/src/main/java/com/easysoftware/infrastructure/rpmpackage/gatewayimpl/RPMPackageGatewayImpl.java index 95f8402..15c1467 100644 --- a/src/main/java/com/easysoftware/infrastructure/rpmpackage/gatewayimpl/RPMPackageGatewayImpl.java +++ b/src/main/java/com/easysoftware/infrastructure/rpmpackage/gatewayimpl/RPMPackageGatewayImpl.java @@ -10,6 +10,8 @@ import java.util.Map; import javax.management.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -49,6 +51,7 @@ public class RPMPackageGatewayImpl implements RPMPackageGateway { @Autowired private ObjectMapper objectMapper; + private static final Logger logger = LoggerFactory.getLogger(RPMPackageGatewayImpl.class); @Override public boolean delete(String id) { @@ -161,12 +164,16 @@ public class RPMPackageGatewayImpl implements RPMPackageGateway { @Override public Collection convertBatch(Collection dataObject){ + long startTime = System.nanoTime(); Collection ObjList = new ArrayList<>(); for (String obj : dataObject) { RPMPackage rpmPackage = ObjectMapperUtil.jsonToObject(obj, RPMPackage.class); RPMPackageDO rpmDO = RPMPackageConverter.toDataObjectForCreate(rpmPackage); ObjList.add(rpmDO); } + long endTime1 = System.nanoTime(); + long duration = (endTime1 - startTime) / 1000000; + logger.info("转换时间: " + duration + " 毫秒," + "数据量:" + dataObject.size()); return ObjList; } diff --git a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java index 31cf148..51e9dca 100644 --- a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java +++ b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java @@ -1,20 +1,14 @@ package com.easysoftware.kafka; -import org.springframework.beans.factory.annotation.Value; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; -import jakarta.annotation.PostConstruct; - -// @Service +@Service public class AppPkgConsumer extends BaseConsumer { - @Value("${consumer.topic.name}") - String topicName; - - @Value("${consumer.topic.offset}") - String topicOffset; - @PostConstruct - private void init() { - initConsumer(topicName + "_app", topicOffset); + @KafkaListener(topics = "software_test_app") + public void listen(ConsumerRecords records) { + dealDataToTableByBatch(records); } -} +} \ No newline at end of file diff --git a/src/main/java/com/easysoftware/kafka/BaseConsumer.java b/src/main/java/com/easysoftware/kafka/BaseConsumer.java index 71deaa8..dbc8b42 100644 --- a/src/main/java/com/easysoftware/kafka/BaseConsumer.java +++ b/src/main/java/com/easysoftware/kafka/BaseConsumer.java @@ -1,5 +1,6 @@ package com.easysoftware.kafka; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -11,15 +12,18 @@ import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; import com.easysoftware.application.BaseIService; import com.easysoftware.application.ServiceMap; import com.easysoftware.common.utils.ObjectMapperUtil; +// @Service public class BaseConsumer { - @Autowired - KafkaConsumer consumer; + // @Autowired + // KafkaConsumer consumer; @Autowired ServiceMap serviceMap; @@ -27,27 +31,28 @@ public class BaseConsumer { private static final Logger logger = LoggerFactory.getLogger(BaseConsumer.class); protected ArrayList> KafkaConsumerList = new ArrayList<>(); - @Scheduled(fixedRate = 30000) + // @Scheduled(fixedRate = 5000) public void tasks() { KafkaToMysql(); } - protected void initConsumer(String topicName, String topicOffset) { - String[] topciOffsets = topicOffset.split(","); - for (String topciOffset : topciOffsets) { - String[] tf = topciOffset.split(":"); - TopicPartition topicPartition = new TopicPartition(topicName, Integer.parseInt(tf[0])); - consumer.assign(Arrays.asList(topicPartition)); - consumer.seek(topicPartition, Integer.parseInt(tf[1])); - KafkaConsumerList.add(consumer); - } - } + + // protected void initConsumer(String topicName, String topicOffset) { + // String[] topciOffsets = topicOffset.split(","); + // for (String topciOffset : topciOffsets) { + // String[] tf = topciOffset.split(":"); + // TopicPartition topicPartition = new TopicPartition(topicName, Integer.parseInt(tf[0])); + // consumer.assign(Arrays.asList(topicPartition)); + // consumer.seek(topicPartition, Integer.parseInt(tf[1])); + // KafkaConsumerList.add(consumer); + // } + // } public void KafkaToMysql() { for (KafkaConsumer customer : KafkaConsumerList) { - ConsumerRecords poll = customer.poll(2); + ConsumerRecords poll = customer.poll(Duration.ofSeconds(5)); dealDataToTableByBatch(poll); - customer.commitSync(); + customer.commitAsync(); } } @@ -78,6 +83,7 @@ public class BaseConsumer { BaseIService baseIService = null; int partition = 0; long offset = 0; + long startTime = System.nanoTime(); for (ConsumerRecord record : records) { String value = record.value(); try { @@ -96,10 +102,16 @@ public class BaseConsumer { logger.error(e.getMessage() + ":" + value, e); } } + long endTime1 = System.nanoTime(); + long duration = (endTime1 - startTime) / 1000000; + logger.info("处理records用时: " + duration + " 毫秒," + "数据量:" + appList.size()); if (!appList.isEmpty()) { logger.info("partation: " + partition + ", offset: " + offset); baseIService.saveDataObjectBatch(appList); } + long endTime2 = System.nanoTime(); + duration = (endTime2 - endTime1) / 1000000; + logger.info("写入数据库用时: " + duration + " 毫秒," + "数据量:" + appList.size()); } // The data of a topic may be written to multiple tables diff --git a/src/main/java/com/easysoftware/kafka/RpmConsumer.java b/src/main/java/com/easysoftware/kafka/RpmConsumer.java index 9e6f958..2341a5e 100644 --- a/src/main/java/com/easysoftware/kafka/RpmConsumer.java +++ b/src/main/java/com/easysoftware/kafka/RpmConsumer.java @@ -1,10 +1,10 @@ package com.easysoftware.kafka; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; -import jakarta.annotation.PostConstruct; - @Service public class RpmConsumer extends BaseConsumer { @Value("${consumer.topic.name}") @@ -13,8 +13,8 @@ public class RpmConsumer extends BaseConsumer { @Value("${consumer.topic.offset}") String topicOffset; - @PostConstruct - private void init() { - initConsumer(topicName + "_rpm", topicOffset); + @KafkaListener(topics = "software_test_rpm", concurrency = "3") + public void listen(ConsumerRecords records) { + dealDataToTableByBatch(records); } } diff --git a/src/main/java/com/easysoftware/kafka/VersionConsumer.java b/src/main/java/com/easysoftware/kafka/VersionConsumer.java index 008511f..f36fbbd 100644 --- a/src/main/java/com/easysoftware/kafka/VersionConsumer.java +++ b/src/main/java/com/easysoftware/kafka/VersionConsumer.java @@ -1,20 +1,14 @@ package com.easysoftware.kafka; -import org.springframework.beans.factory.annotation.Value; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; -import jakarta.annotation.PostConstruct; - -// @Service +@Service public class VersionConsumer extends BaseConsumer { - @Value("${consumer.topic.name}") - String topicName; - - @Value("${consumer.topic.offset}") - String topicOffset; - @PostConstruct - private void init() { - initConsumer(topicName + "_version", topicOffset); + @KafkaListener(topics = "software_test_version") + public void listen(ConsumerRecords records) { + dealDataToTableByBatch(records); } } -- Gitee