diff --git a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/AbstractInitDataBase.java b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/AbstractInitDataBase.java index 1f479e8a11ec80238fa7a7231e747184b9590237..f8fc19bff53da3f4da46d853496e5b563ad7814e 100644 --- a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/AbstractInitDataBase.java +++ b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/AbstractInitDataBase.java @@ -29,7 +29,7 @@ import java.util.Map; @Slf4j public abstract class AbstractInitDataBase implements InitDataBase { - private BaseVersionSQL baseVersionSQLUtil; + private final BaseVersionSQL baseVersionSQLUtil; public Connection commConn = null; @@ -37,9 +37,9 @@ public abstract class AbstractInitDataBase implements InitDataBase { public JSONArray dbConfigFile = new JSONArray(); - public DataBaseProperties dataBaseProperties; + public final DataBaseProperties dataBaseProperties; - public ConscriptProperties conscriptProperties; + public final ConscriptProperties conscriptProperties; public static Boolean flag = false; @@ -331,22 +331,7 @@ public abstract class AbstractInitDataBase implements InitDataBase { } - /** - * 预制建表语句 - * @return String - */ -// private String createVersionTableSQL(){ -// return "CREATE TABLE "+conscriptProperties.getTableName()+" (\n" + -// " version VARCHAR(10),\n" + -// " desc TEXT,\n" + -// " sql_name TEXT,\n" + -// " sql_size INT,\n" + -// " execute_time VARCHAR(50)\n" + ");"; -// } - -// private String initDataVersionTableSQL(){ -// return "INSERT INTO "+conscriptProperties.getTableName()+" VALUES ('0.0','init conscript','inner.sql',2,'1900-01-01 00:00:00');"; -// } + diff --git a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/dm/DaMengDataBase.java b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/dm/DaMengDataBase.java 
index 2f729ede38d59156aa24b28f08da2f39e972ac05..f381b9806214da494846bfc4e258b04a46e5bae2 100644 --- a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/dm/DaMengDataBase.java +++ b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/dm/DaMengDataBase.java @@ -5,10 +5,7 @@ import com.gcc.container.components.conscript.model.ConscriptProperties; import com.gcc.container.components.conscript.model.DataBaseProperties; import lombok.extern.slf4j.Slf4j; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; +import java.sql.*; @Slf4j public class DaMengDataBase extends AbstractInitDataBase { @@ -20,11 +17,10 @@ public class DaMengDataBase extends AbstractInitDataBase { @Override public void initCommConn() { try{ - Class.forName(dataBaseProperties.getDriverClassName()); String jdbcUrl = "jdbc:dm://" + dataBaseProperties.getHost() + ":"+dataBaseProperties.getDbport(); commConn = DriverManager.getConnection(jdbcUrl,dataBaseProperties.getRootUser(),dataBaseProperties.getRootPass()); commConn.setAutoCommit(true); - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :data base is not connection....."); } } @@ -32,43 +28,40 @@ public class DaMengDataBase extends AbstractInitDataBase { @Override public void initDbConn() { try{ - Class.forName(dataBaseProperties.getDriverClassName()); String jdbcUrl = "jdbc:dm://" + dataBaseProperties.getHost() + ":"+dataBaseProperties.getDbport()+"?schema="+dataBaseProperties.getSchema(); dbConn = DriverManager.getConnection(jdbcUrl,dataBaseProperties.getUsername(),dataBaseProperties.getPassword()); dbConn.setAutoCommit(true); - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :data base is not connection....."); } } @Override public boolean databaseIsExist(Connection connection) { - try { - Statement stmt = 
connection.createStatement(); - ResultSet res = stmt.executeQuery("SELECT COUNT(DISTINCT object_name) AS NUM FROM ALL_OBJECTS WHERE object_type = 'SCH' AND object_name = '"+dataBaseProperties.getSchema()+"'"); + boolean flag = true; + try( Statement stmt = connection.createStatement(); + ResultSet res = stmt.executeQuery("SELECT COUNT(DISTINCT object_name) AS NUM FROM ALL_OBJECTS WHERE object_type = 'SCH' AND object_name = '"+dataBaseProperties.getSchema()+"'")){ if(res.next() && res.getInt(1) == 0){ stmt.close(); - return false; + flag = false; } - return true; - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :database base query is error"); } - return false; + return flag; } @Override public boolean createDataBase() { - try { - Statement stmt = commConn.createStatement(); + boolean flag = true; + try(Statement stmt = commConn.createStatement()){ stmt.execute("CREATE SCHEMA \""+dataBaseProperties.getSchema()+"\" AUTHORIZATION \""+dataBaseProperties.getUsername()+"\" "); - stmt.close(); - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Create {} Database Failed !!",dataBaseProperties.getDbName()); - return false; + flag = false; } - return true; + return flag; } } diff --git a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/kingbase8/KingBase8DataBase.java b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/kingbase8/KingBase8DataBase.java index 17afb45fdd3e40d09394f4d773695b27075f4ee3..590187316135466ef7748da27076b81b1bba2385 100644 --- a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/kingbase8/KingBase8DataBase.java +++ b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/kingbase8/KingBase8DataBase.java @@ -5,10 +5,7 @@ import com.gcc.container.components.conscript.model.ConscriptProperties; import com.gcc.container.components.conscript.model.DataBaseProperties; 
import lombok.extern.slf4j.Slf4j; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; +import java.sql.*; @Slf4j public class KingBase8DataBase extends AbstractInitDataBase { @@ -21,11 +18,10 @@ public class KingBase8DataBase extends AbstractInitDataBase { @Override public void initCommConn() { try{ - Class.forName(dataBaseProperties.getDriverClassName()); String jdbcUrl = "jdbc:kingbase8://" + dataBaseProperties.getHost() + ":"+dataBaseProperties.getDbport()+"/TEST"; commConn = DriverManager.getConnection(jdbcUrl,dataBaseProperties.getRootUser(),dataBaseProperties.getRootPass()); commConn.setAutoCommit(true); - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :data base is not connection....."); } } @@ -33,42 +29,38 @@ public class KingBase8DataBase extends AbstractInitDataBase { @Override public void initDbConn() { try{ - Class.forName(dataBaseProperties.getDriverClassName()); String jdbcUrl = "jdbc:kingbase8://" + dataBaseProperties.getHost() + ":"+dataBaseProperties.getDbport()+"/"+dataBaseProperties.getDbName()+"??currentSchema="+dataBaseProperties.getSchema()+"&characterEncoding=utf-8"; dbConn = DriverManager.getConnection(jdbcUrl,dataBaseProperties.getUsername(),dataBaseProperties.getPassword()); dbConn.setAutoCommit(true); - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :data base is not connection....."); } } @Override public boolean createDataBase() { - try { - Statement stmt = commConn.createStatement(); + boolean flag = true; + try( Statement stmt = commConn.createStatement()){ stmt.execute("CREATE DATABASE "+dataBaseProperties.getDbName()+" OWNER "+dataBaseProperties.getUsername()+" encoding 'utf8' CONNECTION LIMIT 10"); stmt.execute("CREATE SCHEMA \""+dataBaseProperties.getSchema()+"\" AUTHORIZATION \""+dataBaseProperties.getUsername()+"\" "); - stmt.close(); - }catch (Exception e){ + }catch 
(SQLException e){ log.error("【Conscript】Create {} Database Fail!",dataBaseProperties.getDbName()); - return false; + flag = false; } - return true; + return flag; } @Override public boolean databaseIsExist(Connection connection) { - try { - Statement stmt = connection.createStatement(); - ResultSet res = stmt.executeQuery("SELECT DATNAME FROM sys_database WHERE DATNAME = '"+dataBaseProperties.getDbName()+"'"); + boolean flag = true; + try( Statement stmt = connection.createStatement(); + ResultSet res = stmt.executeQuery("SELECT DATNAME FROM sys_database WHERE DATNAME = '"+dataBaseProperties.getDbName()+"'")){ if(!res.next()){ - stmt.close(); - return false; + flag = false; } - return true; - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :database base query is error"); } - return false; + return flag; } } diff --git a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/mysql/MySqlDataBase.java b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/mysql/MySqlDataBase.java index 997fb40be4525b0036509e454505177b62c60693..f3c5a443323c16883c8e8a6ffd19406ab2d332d1 100644 --- a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/mysql/MySqlDataBase.java +++ b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/mysql/MySqlDataBase.java @@ -5,10 +5,7 @@ import com.gcc.container.components.conscript.model.ConscriptProperties; import com.gcc.container.components.conscript.model.DataBaseProperties; import lombok.extern.slf4j.Slf4j; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; +import java.sql.*; /** * Mysql数据库初始化类 @@ -24,10 +21,9 @@ public class MySqlDataBase extends AbstractInitDataBase { @Override public void initCommConn() { try { - Class.forName(dataBaseProperties.getDriverClassName()); String jdbcUrl = "jdbc:mysql://" + 
dataBaseProperties.getHost() + ":" + dataBaseProperties.getDbport() + "/mysql?characterEncoding=utf8&serverTimezone=GMT%2B8"; commConn = DriverManager.getConnection(jdbcUrl, dataBaseProperties.getRootUser(), dataBaseProperties.getRootPass()); - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :data base is not connection....."); } } @@ -44,31 +40,28 @@ public class MySqlDataBase extends AbstractInitDataBase { @Override public boolean databaseIsExist(Connection connection){ - try { - Statement stmt = connection.createStatement(); - ResultSet res = stmt.executeQuery("SELECT COUNT(*) FROM information_schema.schemata WHERE schema_name= \""+ dataBaseProperties.getDbName()+"\""); + boolean flag = true; + try(Statement stmt = connection.createStatement(); + ResultSet res = stmt.executeQuery("SELECT COUNT(*) FROM information_schema.schemata WHERE schema_name= \""+ dataBaseProperties.getDbName()+"\"")){ if(res.next() && res.getInt(1) == 0){ - stmt.close(); - return false; + flag = false; } - return true; - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :database base query is error"); } - return false; + return flag; } @Override public boolean createDataBase() { - try { - Statement stmt = commConn.createStatement(); + boolean flag = true; + try( Statement stmt = commConn.createStatement()){ stmt.execute("CREATE DATABASE IF NOT EXISTS "+ dataBaseProperties.getDbName()+" DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci"); - stmt.close(); - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Create {} Database Failed !!",dataBaseProperties.getDbName()); - return false; + flag = false; } - return true; + return flag; } diff --git a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/pgsql/PostgreSQLDataBase.java b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/pgsql/PostgreSQLDataBase.java index 
44578484448d392da6ed2da93317641fc4a238cc..4c5b0caaf9a5ddccf3c7dd6bad818e86a3bc745e 100644 --- a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/pgsql/PostgreSQLDataBase.java +++ b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/pgsql/PostgreSQLDataBase.java @@ -5,10 +5,7 @@ import com.gcc.container.components.conscript.model.ConscriptProperties; import com.gcc.container.components.conscript.model.DataBaseProperties; import lombok.extern.slf4j.Slf4j; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; +import java.sql.*; @Slf4j public class PostgreSQLDataBase extends AbstractInitDataBase { @@ -20,10 +17,9 @@ public class PostgreSQLDataBase extends AbstractInitDataBase { @Override public void initCommConn() { try { - Class.forName(dataBaseProperties.getDriverClassName()); String url = "jdbc:postgresql://"+dataBaseProperties.getHost()+":"+dataBaseProperties.getDbport()+"/postgres"; commConn = DriverManager.getConnection(url, dataBaseProperties.getRootUser(), dataBaseProperties.getRootPass()); - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :data base is not connection....."); } } @@ -31,40 +27,36 @@ public class PostgreSQLDataBase extends AbstractInitDataBase { @Override public void initDbConn() { try { - Class.forName(dataBaseProperties.getDriverClassName()); dbConn = DriverManager.getConnection(dataBaseProperties.getUrl(), dataBaseProperties.getUsername(), dataBaseProperties.getPassword()); - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :data base {} is not connection.....",dataBaseProperties.getDbName()); } } @Override public boolean databaseIsExist(Connection connection) { - try { - Statement stmt = connection.createStatement(); - ResultSet res = stmt.executeQuery("SELECT datname FROM pg_database WHERE datname =\'"+ 
dataBaseProperties.getDbName()+"\'"); - if(!res.next()){ - stmt.close(); - return false; + boolean flag = true; + try(Statement stmt = connection.createStatement(); + ResultSet res = stmt.executeQuery("SELECT datname FROM pg_database WHERE datname =\'"+ dataBaseProperties.getDbName()+"\'")){ + if(res.next() && res.getInt(1) == 0){ + flag = false; } - return true; - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Database initialization :database base query is error"); } - return false; + return flag; } @Override public boolean createDataBase() { - try { - Statement stmt = commConn.createStatement(); + boolean flag = true; + try( Statement stmt = commConn.createStatement()){ stmt.execute("CREATE DATABASE "+ dataBaseProperties.getDbName()+" WITH OWNER = "+dataBaseProperties.getUsername()+" ENCODING = 'UTF-8' "); - stmt.close(); - }catch (Exception e){ + }catch (SQLException e){ log.error("【Conscript】Create {} Database Failed !!",dataBaseProperties.getDbName()); - return false; + flag = false; } - return true; + return flag; } diff --git a/kafka-component/pom.xml b/kafka-component/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..bc6abae955f5c7e956f84f0bd83688afbfcabcef --- /dev/null +++ b/kafka-component/pom.xml @@ -0,0 +1,29 @@ + + 4.0.0 + + com.gcc.container + components + 1.0.0 + + com.gcc.container.components + kafka-component + ${component.version} + jar + + + + org.springframework.kafka + spring-kafka + + + + org.springframework.boot + spring-boot-starter-quartz + + + + + + + diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/ApplicationConfiguration.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/ApplicationConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..05ba1dffc3e55a768752e2938477211504fb5142 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/ApplicationConfiguration.java @@ -0,0 +1,12 
@@ +package com.gcc.container.components.kafka; + +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; +@Configuration +@EnableScheduling +@EnableAsync +@ComponentScan(basePackages = "com.gcc.container.components.kafka.*") +public class ApplicationConfiguration { +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstracTopicConsumer.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstracTopicConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..aa580dcb27d79f81df36e7e80a5d569ebcca9bc1 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstracTopicConsumer.java @@ -0,0 +1,118 @@ +package com.gcc.container.components.kafka.component.consumer; + + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * 抽象消费者 + */ +@Slf4j +public abstract class AbstracTopicConsumer implements TopicConsumer,Runnable, DisposableBean { + /** + * 缓冲管道,防止Kafka消息堆积 + */ + private final BlockingQueue BUFFER_PIPE = new LinkedBlockingQueue<>(10000); + private final List topic = new ArrayList<>(0); + + /** + * 添加线程引用 + */ + private Thread consumerThread; + + private volatile boolean consumerInitialized = false; + public AbstracTopicConsumer(String ...topics) { + if(null != topics){ + Collections.addAll(topic, topics); + } + } + + public List getTopics() { + return topic; + } + + @Override + public void onMessage(String messageObj) { + try { + if(!BUFFER_PIPE.offer(messageObj)){ + log.warn("BUFFER_PIPE is 
full !this message will be discarded,message={},from{}",messageObj,this.getClass().getName()); + } + initConsumerThread(); + }catch (Exception e){ + log.error("consumer[{}] message of {} consume fail",this.getClass().getName(),messageObj); + } + } + + /** + * 实现Runnable接口的run方法 + */ + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + String msg = BUFFER_PIPE.take(); + dealMessage(msg); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Consumer thread interrupted, stopping processing for {}", + this.getClass().getSimpleName()); + break; + } catch (Exception e) { + log.error("Error processing message in consumer: {}", e.getMessage(), e); + } + } + } + + + @Override + public void destroy() throws Exception { + log.info("Shutting down consumer: {}", this.getClass().getSimpleName()); + shutdown(); + } + + /** + * 提供关闭消费者线程的方法 + */ + public void shutdown() { + if (consumerThread != null && consumerThread.isAlive()) { + consumerThread.interrupt(); + try { + consumerThread.join(5000); // 等待最多5秒 + log.info("Consumer thread stopped for {}", this.getClass().getSimpleName()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while waiting for consumer thread to stop"); + } + } + } + + private void initConsumerThread() { + if (!consumerInitialized) { + synchronized (this) { + if (!consumerInitialized) { + // 创建线程并传入this作为Runnable + consumerThread = new Thread(this); + consumerThread.setName("Consumer-" + this.getClass().getSimpleName()); + consumerThread.setDaemon(true); + consumerThread.start(); + consumerInitialized = true; + log.info("Consumer thread started for {}", this.getClass().getSimpleName()); + } + } + } + } + + /** + * 真正处理消息对象 + * @param messageObj 消息体 + */ + abstract void dealMessage(String messageObj); + + + +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstractTopicObserver.java 
b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstractTopicObserver.java new file mode 100644 index 0000000000000000000000000000000000000000..fdf4ec6e9657f96217297ecb95289f2bc51e72b8 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstractTopicObserver.java @@ -0,0 +1,44 @@ +package com.gcc.container.components.kafka.component.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import java.util.ArrayList; +import java.util.List; + +public abstract class AbstractTopicObserver implements TopicObserver, InitializingBean { + private static final Logger log = LoggerFactory.getLogger("TopicObserver"); + private final String topic; + private final List listeners = new ArrayList<>(); + + public AbstractTopicObserver(String topic) { + this.topic = topic; + } + + public String getTopic(){ + return this.topic; + } + + @Override + public void addConsumer(TopicConsumer consumer) { + listeners.add(consumer); + } + + @Override + public void noticeConsumer(String message) { + log.debug("message :{}", message); + for (TopicConsumer consumer: listeners){ + String copyMessage = String.valueOf(message); + try { + consumer.onMessage(copyMessage); + }catch (Exception e){ + log.error("message of {} consume fail.message={}",topic,copyMessage); + } + } + } + + @Override + public void afterPropertiesSet() throws Exception { + RegistrationHelper.registerObserver(this); + } +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/InitConsumer.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/InitConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..fc3c20a1884609fb14348e7520d1f27b0a2b3f85 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/InitConsumer.java 
@@ -0,0 +1,30 @@ +package com.gcc.container.components.kafka.component.consumer; + +import com.gcc.container.components.kafka.configure.ApplicationContextHandler; +import lombok.AllArgsConstructor; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.stereotype.Component; +import java.util.Map; + +@Component +@AllArgsConstructor +public class InitConsumer { + + private final KafkaListenerEndpointRegistry registry; + + @EventListener + public void onApplicationEvent(ApplicationReadyEvent event){ + Map beans = ApplicationContextHandler.getApplicationContext().getBeansOfType(AbstracTopicConsumer.class); + for (Map.Entry entry : beans.entrySet()) { + RegistrationHelper.registerConsumer(entry.getValue()); + } + //开启所有的消费者 + for (MessageListenerContainer container : registry.getListenerContainers()) { + container.start(); + } + } + +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/RegistrationHelper.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/RegistrationHelper.java new file mode 100644 index 0000000000000000000000000000000000000000..12d97f43e34132e079f52980299c907ba2db5551 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/RegistrationHelper.java @@ -0,0 +1,35 @@ +package com.gcc.container.components.kafka.component.consumer; + +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * 消费与观察者注册 + * @author GCC + */ +@Slf4j +public class RegistrationHelper { + + private final static ConcurrentMap OBSERVER_CONTEXT = new ConcurrentHashMap<>(); + + + public static void 
registerObserver(AbstractTopicObserver observer){ + OBSERVER_CONTEXT.put(observer.getTopic(),observer); + } + + public static void registerConsumer(AbstracTopicConsumer consumer){ + List topics = consumer.getTopics(); + for(String topic:topics) { + if (OBSERVER_CONTEXT.containsKey(topic)) { + OBSERVER_CONTEXT.get(topic).addConsumer(consumer); + } else { + log.warn(" consumer of topic ‘{}’ register fail,no this observer",topic); + } + } + } + + +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicConsumer.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..6695a20bc6826bc657b79203834549a32b54ee95 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicConsumer.java @@ -0,0 +1,18 @@ +package com.gcc.container.components.kafka.component.consumer; + + +/** + * 具体的消费者 + * @author GCC + */ +interface TopicConsumer { + /** + * 响应消息对象 + * @param messageObj 消息对象 + */ + void onMessage(String messageObj); + + + + +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicObserver.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicObserver.java new file mode 100644 index 0000000000000000000000000000000000000000..6f83722fcd1c8a0a92c144f6864cbec5beaa2a21 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicObserver.java @@ -0,0 +1,17 @@ +package com.gcc.container.components.kafka.component.consumer; + +interface TopicObserver { + + /** + * 新增消费者 + * @param consumer 消费者 + */ + void addConsumer(TopicConsumer consumer); + + /** + * 通知消费之 + * @param message 消息 + */ + void noticeConsumer(String message); + +} diff --git 
a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/AbstractDataStreamProcessor.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/AbstractDataStreamProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..66a0d30a97404421a983eb6beaecd73620893446 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/AbstractDataStreamProcessor.java @@ -0,0 +1,29 @@ +package com.gcc.container.components.kafka.component.datastream; + + +import java.util.concurrent.atomic.AtomicBoolean; + +public abstract class AbstractDataStreamProcessor implements DataStreamProcessor { + private DataStreamProcessor nextProcessor; + + @Override + public void setNext(DataStreamProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public void handle(Object data) { + AtomicBoolean flag = disposeData(data); + if(flag.get() && null != nextProcessor){ + nextProcessor.handle(data); + } + } + + /** + * 处理数据 + * @param data 数据 + * @return AtomicBoolean 如果返回为true,则代表继续向下处理,否则,则终止 + */ + abstract AtomicBoolean disposeData(Object data); + +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/DataStreamProcessor.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/DataStreamProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..22995f61de21de31d9e71137b6d8b8bc59e3fe8a --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/DataStreamProcessor.java @@ -0,0 +1,9 @@ +package com.gcc.container.components.kafka.component.datastream; + +interface DataStreamProcessor { + + void setNext(DataStreamProcessor nextProcessor); + + void handle(Object data); + +} diff --git 
a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/annotation/DataStream.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/annotation/DataStream.java new file mode 100644 index 0000000000000000000000000000000000000000..7d839170ad7855e023e2573a652d7d99cb079721 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/annotation/DataStream.java @@ -0,0 +1,15 @@ +package com.gcc.container.components.kafka.component.datastream.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface DataStream { + + String dataStreamName(); + + int order() default 0; +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaCallback.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaCallback.java new file mode 100644 index 0000000000000000000000000000000000000000..319d31e5f415c4cbc77fdc9c932750d634916c44 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaCallback.java @@ -0,0 +1,16 @@ +package com.gcc.container.components.kafka.component.product; + +import org.springframework.kafka.support.SendResult; +import org.springframework.util.concurrent.ListenableFutureCallback; + +public interface KafkaCallback extends ListenableFutureCallback> { + @Override + default void onSuccess(SendResult result) { + //回调处理 + } + + @Override + default void onFailure(Throwable ex) { + //错误处理 + } +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaProduct.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaProduct.java new file mode 100644 
index 0000000000000000000000000000000000000000..b34c5f4c2c856f611d6a560d7b5cb054705888ee --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaProduct.java @@ -0,0 +1,42 @@ +package com.gcc.container.components.kafka.component.product; + +/** + * 生产者工具 + * @author GCC + */ +public interface KafkaProduct { + + /** + * 向Topic发送消息 + * @param topic topic + * @param message 消息信息 + */ + void sendMessage(final String topic, Object message); + + /** + * 向Topic发送消息 + * @param topic topic + * @param key key + * @param message 消息 + */ + void sendMessage(final String topic, String key, Object message); + + /** + * 异步发送消息并回调 + * @param topic topic + * @param message 消息 + * @param callback 回调函数 + */ + void sendAsyncMessage(final String topic, Object message, KafkaCallback callback); + + + /** + *异步发送消息并回调 + * @param topic topic + * @param key key + * @param message 消息 + * @param callback 回调函数 + */ + void sendAsyncMessage(String topic, String key, String message, KafkaCallback callback); + +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/impl/KafkaProductImpl.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/impl/KafkaProductImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..7d698b25c89439b862e2e8c1b5d3688b0d358301 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/impl/KafkaProductImpl.java @@ -0,0 +1,38 @@ +package com.gcc.container.components.kafka.component.product.impl; + +import com.gcc.container.components.kafka.component.product.KafkaCallback; +import com.gcc.container.components.kafka.component.product.KafkaProduct; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; +import 
org.springframework.util.concurrent.ListenableFuture; + +@Component +public class KafkaProductImpl implements KafkaProduct { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Override + public void sendMessage(final String topic, Object message) { + kafkaTemplate.send(topic, message); + } + + @Override + public void sendMessage(final String topic, String key, Object message) { + kafkaTemplate.send(topic, key, message); + } + + @Override + public void sendAsyncMessage(final String topic, Object message, KafkaCallback callback) { + ListenableFuture> future = kafkaTemplate.send(topic, message); + future.addCallback(callback); + } + + @Override + public void sendAsyncMessage(String topic, String key, String message, KafkaCallback callback) { + ListenableFuture> future = kafkaTemplate.send(topic, key, message); + future.addCallback(callback); + } +} \ No newline at end of file diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/ApplicationContextHandler.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/ApplicationContextHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..f701be157cb96f66f89a4639f0f2a03d7ce3be90 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/ApplicationContextHandler.java @@ -0,0 +1,31 @@ +package com.gcc.container.components.kafka.configure; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +@Component +public class ApplicationContextHandler implements ApplicationContextAware { + + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + ApplicationContextHandler.applicationContext = applicationContext; + } + + public 
static ApplicationContext getApplicationContext() { + return applicationContext; + } + + + public static T getBean(Class clazz) { + return applicationContext.getBean(clazz); + } + + public static T getBean(String name, Class clazz) { + return applicationContext.getBean(name, clazz); + } + +} diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/DataStreamConfigure.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/DataStreamConfigure.java new file mode 100644 index 0000000000000000000000000000000000000000..f7db332847bbd36bdf5a56c09f04827982b1018a --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/DataStreamConfigure.java @@ -0,0 +1,92 @@ +package com.gcc.container.components.kafka.configure; + + +import com.gcc.container.components.kafka.component.datastream.AbstractDataStreamProcessor; +import com.gcc.container.components.kafka.component.datastream.annotation.DataStream; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.ReflectionUtils; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Configuration +public class DataStreamConfigure { + private final Map> dataStreamChains = new ConcurrentHashMap<>(); + + @Autowired + public void setDataStreamProcessors(Map processors) { + processors.forEach((beanName, processor) -> { + DataStream annotation = processor.getClass().getAnnotation(DataStream.class); + if (annotation != null) { + String dataStreamName = annotation.dataStreamName(); + int order = 
annotation.order(); + dataStreamChains.computeIfAbsent(dataStreamName, k -> new ArrayList<>()).add(processor); + } + }); + + dataStreamChains.forEach((dataStreamName, processorsList) -> { + Collections.sort(processorsList, (p1, p2) -> { + DataStream a1 = p1.getClass().getAnnotation(DataStream.class); + DataStream a2 = p2.getClass().getAnnotation(DataStream.class); + return Integer.compare(a1.order(), a2.order()); + }); + + // 构建责任链 + AbstractDataStreamProcessor current = null; + for (AbstractDataStreamProcessor processor : processorsList) { + if (current == null) { + current = processor; + } else { + current.setNext(processor); + current = processor; + } + } + }); + } + + @Bean + public BeanPostProcessor beanPostProcessor() { + return new BeanPostProcessor() { + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof AbstractDataStreamProcessor) { + Field field = ReflectionUtils.findField(bean.getClass(), "nextProcessor"); + if (field != null) { + ReflectionUtils.makeAccessible(field); + ReflectionUtils.setField(field, bean, getNextHandler((AbstractDataStreamProcessor) bean)); + } + } + return bean; + } + + private AbstractDataStreamProcessor getNextHandler(AbstractDataStreamProcessor processor) { + DataStream annotation = processor.getClass().getAnnotation(DataStream.class); + if (annotation != null) { + String dataStreamName = annotation.dataStreamName(); + List processorsList = dataStreamChains.get(dataStreamName); + if (processorsList != null) { + int currentIndex = processorsList.indexOf(processor); + if (currentIndex < processorsList.size() - 1) { + return processorsList.get(currentIndex + 1); + } + } + } + return null; + } + }; + } + + @Bean + public Map dataStreamProcessorMap() { + return dataStreamChains.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0))); + } +} diff --git 
a/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/KafkaConfigure.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/KafkaConfigure.java new file mode 100644 index 0000000000000000000000000000000000000000..e17e6958f910015831b586511d6000b8c1d321d5 --- /dev/null +++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/KafkaConfigure.java @@ -0,0 +1,76 @@ +package com.gcc.container.components.kafka.configure; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.*; +import java.util.HashMap; +import java.util.Map; + +@Configuration +@EnableKafka +public class KafkaConfigure { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.consumer.group-id}") + private String groupId; + + @Value("${spring.kafka.consumer.auto-offset-reset}") + private String autoOffsetReset; + + @Value("${spring.kafka.consumer.enable-auto-commit}") + private boolean enableAutoCommit; + + @Value("${spring.kafka.producer.key-serializer}") + private String keySerializer; + + @Value("${spring.kafka.producer.value-serializer}") + private String valueSerializer; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); + 
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() { + return new KafkaListenerEndpointRegistry(); + } + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } + + +} diff --git a/kafka-component/src/main/resources/META-INF/spring.factories b/kafka-component/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000000000000000000000000000000000..53cd526ed6753baef3931c08783936a3f10c9937 --- /dev/null +++ b/kafka-component/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.gcc.container.components.kafka.ApplicationConfiguration \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5b300adc2c06f7d92b9c7fbe974a56f5e3b498c8..65ce0e6dc3526b63ed0c15be7730dde2699c5f90 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,8 @@ initdatabse-component es-component task-component + kafka-component + quickfilter-component diff 
--git a/quickfilter-component/pom.xml b/quickfilter-component/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..f033f94da34500ccac1b4a3df083a484268f0ba8 --- /dev/null +++ b/quickfilter-component/pom.xml @@ -0,0 +1,24 @@ + + 4.0.0 + + com.gcc.container + components + 1.0.0 + + com.gcc.container.components + quickfilter-component + ${component.version} + jar + + + + + com.google.guava + guava + 33.2.1-jre + + + + + diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/ApplicationConfiguration.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/ApplicationConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..f77f529e2755cd873d6e379a54b39806b9ef7982 --- /dev/null +++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/ApplicationConfiguration.java @@ -0,0 +1,11 @@ +package com.gcc.container.components.quickfilter; + +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; + +@Configuration +@EnableScheduling +@ComponentScan(basePackages = "com.gcc.container.components.quickfilter.*") +public class ApplicationConfiguration { +} diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/QuickFilterFactory.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/QuickFilterFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..475931898a143e4ea262e3adb93490dd81facf8b --- /dev/null +++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/QuickFilterFactory.java @@ -0,0 +1,12 @@ +package com.gcc.container.components.quickfilter.factory; + +import com.gcc.container.components.quickfilter.filter.QuickFilter; + +/** + * 快速过滤器工厂 + */ +public interface 
QuickFilterFactory { + QuickFilter getQuickFilter(String filterName, Integer capacity, Double missProbability); + + void destoryQuickFilter(String filterName); +} diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/AbstractQuickFilterFactory.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/AbstractQuickFilterFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..ae89385ca9d6df7c4ddab948865de93c631194fe --- /dev/null +++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/AbstractQuickFilterFactory.java @@ -0,0 +1,31 @@ +package com.gcc.container.components.quickfilter.factory.impl; + + +import com.gcc.container.components.quickfilter.factory.QuickFilterFactory; +import com.gcc.container.components.quickfilter.filter.QuickFilter; + +import java.util.concurrent.ConcurrentHashMap; + +public abstract class AbstractQuickFilterFactory implements QuickFilterFactory { + + protected final ConcurrentHashMap bloomFilters = new ConcurrentHashMap<>(); + + abstract QuickFilter createBloomFilter(Integer expectedInsertions, Double fpp); + + @Override + public QuickFilter getQuickFilter(String filterName, Integer capacity, Double missProbability) { + if(!bloomFilters.containsKey(filterName)){ + QuickFilter obj = createBloomFilter(capacity,missProbability); + bloomFilters.put(filterName,obj); + } + return bloomFilters.get(filterName); + } + + @Override + public void destoryQuickFilter(String filterName) { + if(bloomFilters.containsKey(filterName)){ + bloomFilters.get(filterName).clear(); + bloomFilters.remove(filterName); + } + } +} diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/BloomFilterFactory.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/BloomFilterFactory.java new file mode 100644 index 
0000000000000000000000000000000000000000..0bb4d160e6f8b5cd8b8a04ebb8ce07e177efbc80 --- /dev/null +++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/BloomFilterFactory.java @@ -0,0 +1,27 @@ +package com.gcc.container.components.quickfilter.factory.impl; + + +import com.gcc.container.components.quickfilter.filter.QuickFilter; +import com.gcc.container.components.quickfilter.filter.impl.BloomFilterPlus; +import com.google.common.base.Charsets; +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnel; +import com.google.common.hash.PrimitiveSink; +import org.springframework.stereotype.Component; + + +@Component("BloomFilter") +public class BloomFilterFactory extends AbstractQuickFilterFactory { + + @Override + public QuickFilter createBloomFilter(Integer expectedInsertions, Double fpp) { + Funnel funnel = new Funnel() { + @Override + public void funnel(String from, PrimitiveSink into) { + into.putString(from, Charsets.UTF_8); + } + }; + return new BloomFilterPlus(BloomFilter.create(funnel, expectedInsertions, fpp)); + } + +} diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/DailyBloomFilterFactory.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/DailyBloomFilterFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..e28bd9aca51580ec7a5bd75eb2ffb320355e1967 --- /dev/null +++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/DailyBloomFilterFactory.java @@ -0,0 +1,21 @@ +package com.gcc.container.components.quickfilter.factory.impl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Component("DailyBloomFilter") +@Slf4j +public class DailyBloomFilterFactory extends BloomFilterFactory{ + @Scheduled(cron = "59 59 23 * * ?" 
) + public void resetBloomFilter() { + if(!bloomFilters.isEmpty()) { + for (String name : bloomFilters.keySet()) { + bloomFilters.remove(name); + } + if (bloomFilters.size() == 0) { + log.info("所有Bloom已经结算"); + } + } + } +} diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/QuickFilter.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/QuickFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..b755fcd0be40ecd6b605769d7ba2c9e5b9508e13 --- /dev/null +++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/QuickFilter.java @@ -0,0 +1,40 @@ +package com.gcc.container.components.quickfilter.filter; + +/** + * 快速过滤器 + */ +public interface QuickFilter { + + /** + * 是否在其中 + * @param keyword 关键字 + * @return Boolean + */ + Boolean containsKeyword(String keyword); + + /** + * 追加关键字 + * @param keyword 关键字 + */ + void append(String keyword); + + /** + * 追加内容 + * @param keyword 关键字 + * @param content 内容 + */ + void append(String keyword,Object content); + + /** + * 获取内容 + * @param keyword 关键字 + * @return String + */ + String getContent(String keyword); + + /** + * 清空释放 + */ + void clear(); + +} diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/impl/BloomFilterPlus.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/impl/BloomFilterPlus.java new file mode 100644 index 0000000000000000000000000000000000000000..dc0f5467aa85787df5f5773a0d170d1c6a3c155a --- /dev/null +++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/impl/BloomFilterPlus.java @@ -0,0 +1,63 @@ +package com.gcc.container.components.quickfilter.filter.impl; + +import com.gcc.container.components.quickfilter.util.StringZipUtil; +import com.google.common.hash.BloomFilter; +import com.gcc.container.components.quickfilter.filter.QuickFilter; + +import 
java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 布隆快速过滤器 + */ +public class BloomFilterPlus implements QuickFilter { + + private final ConcurrentHashMap innerCache = new ConcurrentHashMap<>(); + + private BloomFilter bloomFilter; + + public BloomFilterPlus(BloomFilter bloomFilter) { + this.bloomFilter = bloomFilter; + } + + @Override + public Boolean containsKeyword(String keyword) { + return bloomFilter.mightContain(keyword); + } + + @Override + public void append(String keyword) { + bloomFilter.put(keyword); + } + + @Override + public void append(String keyword, Object content) { + bloomFilter.put(keyword); + try { + String context = content.toString(); + innerCache.put(keyword,StringZipUtil.compressString(context)); + }catch (IOException e){ + e.printStackTrace(); + } + } + + + @Override + public String getContent(String keyword) { + byte[] value = innerCache.get(keyword); + if(null != value) { + try { + return StringZipUtil.decompressString(value); + } catch (Exception e) { + e.printStackTrace(); + } + } + return null; + } + + @Override + public void clear() { + innerCache.clear(); + this.bloomFilter = null; + } +} diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/util/StringZipUtil.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/util/StringZipUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..d48590ea723d056463d24075c646cc985ced4f82 --- /dev/null +++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/util/StringZipUtil.java @@ -0,0 +1,114 @@ +package com.gcc.container.components.quickfilter.util; + +import cn.hutool.json.JSONObject; +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Base64; +import java.util.zip.DataFormatException; +import java.util.zip.Deflater; +import java.util.zip.Inflater; +@Slf4j +public class 
StringZipUtil { + + public static byte[] compressString(String input) throws IOException { + // 将输入字符串转化为字节数组 + byte[] inputBytes = input.getBytes("UTF-8"); + + // 创建Deflater对象并设置压缩级别 + Deflater deflater = new Deflater(); + deflater.setInput(inputBytes); + deflater.finish(); + + // 使用ByteArrayOutputStream捕获输出 + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(inputBytes.length); + byte[] buffer = new byte[1024]; + while (!deflater.finished()) { + int count = deflater.deflate(buffer); // 压缩 + outputStream.write(buffer, 0, count); + } + outputStream.close(); + byte[] output = outputStream.toByteArray(); + deflater.end(); + return output; + } + + public static String decompressString(byte[] compressedData) throws IOException, DataFormatException { + Inflater inflater = new Inflater(); + inflater.setInput(compressedData); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(compressedData.length); + byte[] buffer = new byte[1024]; + while (!inflater.finished()) { + int count = inflater.inflate(buffer); + outputStream.write(buffer, 0, count); + } + outputStream.close(); + byte[] result = outputStream.toByteArray(); + inflater.end(); + return new String(result, "UTF-8"); + } + + // 解压缩方法 + public static String decompressString(String compressedData) throws IOException, DataFormatException { + // 将Base64编码的字符串转换回字节数组 + byte[] compressedBytes = Base64.getDecoder().decode(compressedData); + + Inflater inflater = new Inflater(); + inflater.setInput(compressedBytes); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(compressedBytes.length); + byte[] buffer = new byte[1024]; + while (!inflater.finished()) { + int count = inflater.inflate(buffer); + outputStream.write(buffer, 0, count); + } + outputStream.close(); + byte[] result = outputStream.toByteArray(); + inflater.end(); + + return new String(result, "UTF-8"); + } + + // 辅助函数:将字节数组转换为十六进制字符串表示 + public static String bytesToHex(byte[] bytes) { + StringBuilder hexString = new 
StringBuilder(2 * bytes.length); + for (byte b : bytes) { + String hex = Integer.toHexString(0xff & b); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex); + } + return hexString.toString(); + } + + public static void main(String[] args){ + + JSONObject one = new JSONObject() + .set("name","张三") + .set("age",12) + .set("opNumber",1) + .set("classNem","中华人民共和国-山东省-济南市-历下区-黄铜四路与凤鸣路1123号-幸福小学7年纪12班") + .set("userId","DSFKJFKLDJSDKLFklwe-23msndf-3242341223"); + log.info("压缩前长度为:{},",one.toJSONString(0).length()); + byte[] zipStr = null; + try { + zipStr = compressString(one.toString()); + log.info("{}",zipStr); + log.info("压缩后长度为:{}",zipStr.length); + }catch (Exception e){ + e.printStackTrace(); + } + + try { + String con = decompressString(zipStr); + log.info("解压缩为:{}",con); + }catch (Exception e){ + e.printStackTrace(); + } + + } + +} diff --git a/quickfilter-component/src/main/resources/META-INF/spring.factories b/quickfilter-component/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000000000000000000000000000000000..4304a0efd2c0da73fc7463e181bab3b500f61070 --- /dev/null +++ b/quickfilter-component/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.gcc.container.components.quickfilter.ApplicationConfiguration \ No newline at end of file diff --git a/redis-component/src/main/java/com/gcc/container/components/redis/server/RedisCacheServer.java b/redis-component/src/main/java/com/gcc/container/components/redis/server/RedisCacheServer.java index 102eb62cf57170a79a07c9a67f47ddac3cf5d148..12b6836ef6c41a4c9d82267b73fca0d1192ce71a 100644 --- a/redis-component/src/main/java/com/gcc/container/components/redis/server/RedisCacheServer.java +++ b/redis-component/src/main/java/com/gcc/container/components/redis/server/RedisCacheServer.java @@ -44,12 +44,37 @@ public interface RedisCacheServer { */ void setHash(final String key, final Map map); + 
+ /** + * 追加哈希值 + * @param key key + * @param hashKey 哈希键 + * @param hashValue 哈希值 + * @param 值 + */ + void appendHash(final String key,final String hashKey,T hashValue); + /** * 删除哈希表缓存 * @param key 键 */ void removeHash(final String key); + /** + * 删除哈希单个值 + * @param key key + * @param hashKey 键 + */ + void removeHash(final String key,String ...hashKey); + + /** + * 刷新缓存 + * @param key key + * @param newMap 新map + * @param 泛型 + */ + void refreshHash(final String key,final Map newMap); + /** * 读取哈希表 * @param key 键 @@ -67,18 +92,34 @@ public interface RedisCacheServer { /** - * 向列表中添加元素 + * 创建列表缓存 * @param key 键 * @param values 值 */ void addList(final String key, final List values); + /** + * 单个值追加 + * @param key key + * @param values 值 + * @param 泛型 + */ + void addList(final String key,final T ...values); + /** * 删除列表缓存 * @param key 键 */ void removeList(final String key); + /** + * 刷新列表缓存 + * @param key 键 + * @param newValues 新值 + * @param 泛型 + */ + void refreshList(final String key,List newValues); + /** * 获取全部数据 @@ -106,12 +147,30 @@ public interface RedisCacheServer { */ void addSet(final String key, final Set members); + + /** + * 向集合中添加成员 + * @param key 键 + * @param members 成员 + */ + void addSet(final String key, final T ...members); + /** * 删除集合缓存 * @param key 键 */ void removeSet(final String key); + + /** + * 刷新缓存 + * @param key 键 + * @param newMembers 新值 + * @param T + */ + void refreshSet(final String key,final Set newMembers); + + /** * 读取集合中的所有成员 * @param key 键 diff --git a/redis-component/src/main/java/com/gcc/container/components/redis/server/impl/RedisCacheServerImpl.java b/redis-component/src/main/java/com/gcc/container/components/redis/server/impl/RedisCacheServerImpl.java index 7648bbaa3f1fd3978821d8a03ad3172d4b1fca31..ac8672315b17995b20ac111e25efb219fe4d9e5a 100644 --- a/redis-component/src/main/java/com/gcc/container/components/redis/server/impl/RedisCacheServerImpl.java +++ 
b/redis-component/src/main/java/com/gcc/container/components/redis/server/impl/RedisCacheServerImpl.java @@ -49,11 +49,28 @@ public class RedisCacheServerImpl implements RedisCacheServer { redisTemplate.opsForHash().putAll(key, map); } + + @Override + public void appendHash(String key, String hashKey, T hashValue) { + redisTemplate.opsForHash().put(key,hashKey,hashValue); + } + @Override public void removeHash(final String key) { redisTemplate.delete(key); } + @Override + public void removeHash(String key, String ...hashKey) { + redisTemplate.opsForHash().delete(key,hashKey); + } + + @Override + public void refreshHash(String key, Map newMap) { + redisTemplate.delete(key); + redisTemplate.opsForHash().putAll(key, newMap); + } + @Override public Map getHash(final String key) { Map entries = redisTemplate.opsForHash().entries(key); @@ -79,11 +96,23 @@ public class RedisCacheServerImpl implements RedisCacheServer { redisTemplate.opsForList().rightPushAll(key, values); } + @Override + public void addList(String key, T... values) { + redisTemplate.opsForList().rightPushAll(key, values); + } + @Override public void removeList(final String key) { redisTemplate.delete(key); } + + @Override + public void refreshList(String key, List newValues) { + redisTemplate.delete(key); + redisTemplate.opsForList().rightPushAll(key, newValues); + } + @Override public List getList(final String key) { Long size = redisTemplate.opsForList().size(key); @@ -104,11 +133,22 @@ public class RedisCacheServerImpl implements RedisCacheServer { redisTemplate.opsForSet().add(key, members.toArray(new Object[0])); } + @Override + public void addSet(String key, T... 
members) { + redisTemplate.opsForSet().add(key,members); + } + @Override public void removeSet(final String key) { redisTemplate.delete(key); } + @Override + public void refreshSet(String key, Set newMembers) { + redisTemplate.delete(key); + redisTemplate.opsForSet().add(key, newMembers.toArray(new Object[0])); + } + @Override public Set getSet(final String key) { return (Set) redisTemplate.opsForSet().members(key); diff --git a/rest-component/README.md b/rest-component/README.md new file mode 100644 index 0000000000000000000000000000000000000000..0774821e3ebb0a1b8074b5837783516587e66e87 --- /dev/null +++ b/rest-component/README.md @@ -0,0 +1,62 @@ +# rest-component + +> 基于前后端分离的项目,后端提供统一REST API服务的 Web底座 + +## 组件依赖说明 + +组件整体继承自Components包,所依赖Spring相关组件及通用的公共组件依赖跟随Components版本进行调整管控,相关内容可查看Components的README文档 + +```xml + + com.gcc.container + components + 1.0.0 + +``` + +除基本的SpringAOP、autoconfigure外无其他依赖 + +## 使用说明 + +找到对应版本的Components源码,下载到本地,本地编译安装到私仓中,在你的项目中引入: + +```xml + + + com.gcc.container.components + rest-component + 1.0.0 + +``` + +### yaml配置 + +因该组件是基于包扫描去识别定时任务类和默认以JSON文件的形式去存储项目的定时任务信息,需要通过yaml配置文件来指定包路径和文件的存放路径,引入组件后,可在自己的yml文件中追加: + +```yml +rest-component: + api-log: # API接口调用时是否打印日志 + enable: true #true | false ,默认为true + api-doc: # API 接口文档管理 + enable: true # 是否启用 + packagePath: com.gcc.container.taskschedule.taskscheduling.controller.* #具体的API包路径 + title: XXX服务 #接口文档UI页面名称 + description: 提供XXXX服务 #接口文档UI页面描述 + version: 1.0.0 #版本 +``` + +此处的api-log 包含默认值,所以不进行yml的`rest-task`仍旧可以使用该组件 + +若想使用swagger的接口文档UI说明,建议详细配置api-doc相关内容 + +### 包含功能如下: + +#### API版本管理封装@ApiVersion + + + +#### 统一异常拦截 + + + +#### 接口文档引入 diff --git a/rest-component/src/main/java/com/gcc/container/components/web/aspect/ApiNoteAspect.java b/rest-component/src/main/java/com/gcc/container/components/web/aspect/ApiNoteAspect.java new file mode 100644 index 0000000000000000000000000000000000000000..30b8d8e76b89ae96fa052d93e076112a43038f40 --- /dev/null +++ 
b/rest-component/src/main/java/com/gcc/container/components/web/aspect/ApiNoteAspect.java @@ -0,0 +1,89 @@ +package com.gcc.container.components.web.aspect; + + + + +import javax.servlet.http.HttpServletRequest; + +import com.gcc.container.components.web.model.RequestLog; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.JoinPoint; +import org.aspectj.lang.annotation.After; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Before; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; + +import java.lang.reflect.Method; + +@Component +@Aspect +@Slf4j(topic = "ApiRequest") +public class ApiNoteAspect { + + @Value("${rest-component.api-log.enable:true}") + private boolean enable; + private final static String LOG_PRE_STR = "\n0000-00-00 00:00:00 INFO [ api-info] ApiRequest : "; + + @After("@annotation(org.springframework.web.bind.annotation.PostMapping)") + public void afterAdvice(JoinPoint point){ + noteApiRequestInfo(point); + } + + @Before("@annotation(org.springframework.web.bind.annotation.GetMapping)") + public void beforeAdvice(JoinPoint point){ + noteApiRequestInfo(point); + } + + + private void noteApiRequestInfo(JoinPoint point){ + if(enable) { + HttpServletRequest request = ((ServletRequestAttributes) (RequestContextHolder.getRequestAttributes())).getRequest(); + RequestLog logEntity = new RequestLog(request); + MethodSignature signature = (MethodSignature) point.getSignature(); + Method method = signature.getMethod(); + ApiOperation desc = method.getAnnotation(ApiOperation.class); + if (null != desc) { + logEntity.setDesc(desc.value()); + } + logEntity.setParam(point.getArgs()); + printInfo(logEntity); + } + } + + + /** + * 
输出日志
+     * @param logEntity 日志记录
+     */
+    private void printInfo(RequestLog logEntity){
+
+        StringBuilder info = new StringBuilder(LOG_PRE_STR);
+        info.append("================【API Request Start】================")
+                .append(LOG_PRE_STR)
+                .append("request-uri :").append(logEntity.getUri())
+                .append(LOG_PRE_STR)
+                .append("request-desc :").append(logEntity.getDesc())
+                .append(LOG_PRE_STR)
+                .append("request-method :").append(logEntity.getMethod())
+                .append(LOG_PRE_STR)
+                .append("request-host :").append(logEntity.getHost())
+                .append(LOG_PRE_STR)
+                .append("request-param :").append(logEntity.getparams())
+                .append(LOG_PRE_STR)
+                .append("================【API Request End】================\n");
+        log.info(info.toString());
+    }
+
+
+
+
+
+
+
+
+}
diff --git a/rest-component/src/main/java/com/gcc/container/components/web/exception/BaseMessException.java b/rest-component/src/main/java/com/gcc/container/components/web/exception/BaseMessException.java
new file mode 100644
index 0000000000000000000000000000000000000000..4b29da1960cf5b8f2dadbb86c13b06b757284841
--- /dev/null
+++ b/rest-component/src/main/java/com/gcc/container/components/web/exception/BaseMessException.java
@@ -0,0 +1,20 @@
+package com.gcc.container.components.web.exception;
+
+/**
+ * 基础消息异常
+ */
+public class BaseMessException extends Exception{
+
+    private final Integer code;
+
+
+    public BaseMessException( Integer code,String message) {
+        super(message);
+        this.code = code;
+    }
+
+
+    public Integer getCode() {
+        return code;
+    }
+}
diff --git a/rest-component/src/main/java/com/gcc/container/components/web/exception/ControllerExceptionAdvice.java b/rest-component/src/main/java/com/gcc/container/components/web/exception/ControllerExceptionAdvice.java
index ac801d02eb143577e987757d1cb0112252b0ba98..4f26a001d8ef86cbb9ca3d002edd04a5d4674bce 100644
--- a/rest-component/src/main/java/com/gcc/container/components/web/exception/ControllerExceptionAdvice.java
+++ b/rest-component/src/main/java/com/gcc/container/components/web/exception/ControllerExceptionAdvice.java
@@ -1,5 +1,6 @@
 package com.gcc.container.components.web.exception;
 
+import cn.hutool.core.util.StrUtil;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.gcc.container.components.web.model.enums.ResponseStatus;
@@ -12,13 +13,11 @@
 import org.springframework.http.server.ServerHttpRequest;
 import org.springframework.http.server.ServerHttpResponse;
 import org.springframework.web.bind.MethodArgumentNotValidException;
 import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RestControllerAdvice;
 import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyAdvice;
 
 import java.util.LinkedHashMap;
 
 //仅支持自定义编写的API接口全局拦截
-@RestControllerAdvice(annotations = {GetMapping.class, PostMapping.class})
+@RestControllerAdvice
 public class ControllerExceptionAdvice implements ResponseBodyAdvice {
 
     @Override
@@ -29,6 +28,11 @@
     @Override
     public Object beforeBodyWrite(Object data, MethodParameter returnType, MediaType mediaType, Class> aClass, ServerHttpRequest request, ServerHttpResponse response) {
+        //释放swagger接口页面
+        if(StrUtil.containsAny(request.getURI().toString(),"swagger-resources","api-docs")){
+            return data;
+        }
+
         // String类型不能直接包装
         if (returnType.getGenericParameterType().equals(String.class)) {
             ObjectMapper objectMapper = new ObjectMapper();
@@ -68,4 +72,22 @@
         return new Response(ResponseStatus.VALIDATE_ERROR,null);
     }
 
+
+    //业务提示异常
+    @ExceptionHandler({BaseMessException.class})
+    public Response BaseMessException(BaseMessException e)
+    {
+        // 默认统一返回响应体,填写参数错误编码, 从异常对象中拿到错误信息
+        return new Response(e.getCode(),e.getMessage(),null);
+    }
+
+    //业务提示异常
+    @ExceptionHandler({Exception.class})
+    public Response allException(Exception e)
+    {
+        // 默认统一返回响应体,填写参数错误编码, 从异常对象中拿到错误信息
+        return new Response(ResponseStatus.FAILED,null);
+    }
+
+
 }
diff --git a/rest-component/src/main/java/com/gcc/container/components/web/model/RequestLog.java b/rest-component/src/main/java/com/gcc/container/components/web/model/RequestLog.java
new file mode 100644
index 0000000000000000000000000000000000000000..8fb29a7be6b4d6521e886bc43ebb7daabfa35aa6
--- /dev/null
+++ b/rest-component/src/main/java/com/gcc/container/components/web/model/RequestLog.java
@@ -0,0 +1,76 @@
+package com.gcc.container.components.web.model;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.util.URLUtil;
+import lombok.Data;
+import javax.servlet.http.HttpServletRequest;
+@Data
+public class RequestLog {
+
+    private String uri;
+
+    private String desc;
+
+    private String method;
+
+    private String host;
+
+    private Object[] param;
+
+
+    public RequestLog(HttpServletRequest request){
+        this.uri = URLUtil.toURI(request.getRequestURI()).getPath();
+        this.method = request.getMethod();
+        this.host = buildIpAddress(request);
+        this.param = buildParam(request);
+    }
+
+
+    public String getparams(){
+        StringBuilder stringBuilder = new StringBuilder();
+        if(null != this.param){
+            for(Object obj:this.param){
+                stringBuilder.append(obj).append(" ");
+            }
+        }
+        return stringBuilder.toString();
+    }
+
+    private Object[] buildParam(HttpServletRequest request){
+        if("get".equalsIgnoreCase(method)){
+            String url = request.getRequestURL().toString();
+            String query = URLUtil.toURI(url).getQuery();
+            if(StrUtil.isNotBlank(query)) {
+                String[] body = query.split("&");
+                return body;
+            }
+        }
+        return null;
+    }
+
+    private String buildIpAddress(HttpServletRequest request){
+        String ip = request.getHeader("x-forward-for");
+        if(StrUtil.isBlank(ip) || "unknow".equalsIgnoreCase(ip) ){
+            ip =
request.getHeader("Proxy-Client-IP");
+        }
+
+        if(StrUtil.isBlank(ip) || "unknow".equalsIgnoreCase(ip) ){
+            ip = request.getHeader("WL-Proxy-Client-IP");
+        }
+
+        if(StrUtil.isBlank(ip) || "unknow".equalsIgnoreCase(ip) ){
+            ip = request.getHeader("clientrealip");
+        }
+
+        if(StrUtil.isBlank(ip) || "unknow".equalsIgnoreCase(ip) ){
+            ip = request.getRemoteAddr();
+        }
+        return ip;
+    }
+
+
+
+
+
+
+}
diff --git a/rest-component/src/main/java/com/gcc/container/components/web/model/param/DictionaryEnum.java b/rest-component/src/main/java/com/gcc/container/components/web/model/param/DictionaryEnum.java
new file mode 100644
index 0000000000000000000000000000000000000000..542adb42d66eed8079546be8bf007501c40d5559
--- /dev/null
+++ b/rest-component/src/main/java/com/gcc/container/components/web/model/param/DictionaryEnum.java
@@ -0,0 +1,38 @@
+package com.gcc.container.components.web.model.param;
+
+import java.io.Serializable;
+
+/**
+ * 参数字典枚举
+ * @author GCC
+ */
+public interface DictionaryEnum extends Serializable {
+
+    /**
+     * 获取编码
+     * @return Object
+     */
+    Object getCode();
+
+    /**
+     * 获取值
+     * @return String
+     */
+    String getValue();
+
+
+    /**
+     * 根据code获取字典值
+     * @param code 编码
+     * @return String
+     */
+    String coverValue(Object code);
+
+    /**
+     * 根据字典值获取字典编码
+     * @param value 值
+     * @return Object
+     */
+    Object coverCode(String value);
+
+}
diff --git a/rest-component/src/main/java/com/gcc/container/components/web/model/reponse/PageData.java b/rest-component/src/main/java/com/gcc/container/components/web/model/reponse/PageData.java
new file mode 100644
index 0000000000000000000000000000000000000000..6ce0d5f61bdc8dc6470682379d755ad66c3c5011
--- /dev/null
+++ b/rest-component/src/main/java/com/gcc/container/components/web/model/reponse/PageData.java
@@ -0,0 +1,31 @@
+package com.gcc.container.components.web.model.reponse;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class PageData implements Serializable {
+
+    private long total;
+
+    private List data;
+
+    private int pageNo;
+
+    public PageData(long total, List data) {
+        this.total = total;
+        this.data = data;
+    }
+
+    public PageData(int total, List data) {
+        this.total = total;
+        this.data = data;
+    }
+
+    public PageData pageNo(int pageNo){
+        this.pageNo = pageNo;
+        return this;
+    }
+}
diff --git a/rest-component/src/main/resources/application.yml b/rest-component/src/main/resources/application.yml
index 26c9e5c36c0fb3375330e36003b53433b055f1a5..ccce1777c62cd2df8c1b2fbb0f6539f097f81a63 100644
--- a/rest-component/src/main/resources/application.yml
+++ b/rest-component/src/main/resources/application.yml
@@ -1,4 +1,6 @@
 rest-component:
+  api-log:
+    enable: true
   api-doc:
     enable: true
     packagePath: com.gcc.container.taskschedule.taskscheduling.controller.*
diff --git a/task-component/README.md b/task-component/README.md
index 724b3c5b947bb1a405a96787a5c6b21907b11de1..7f39772721dfafbbc2f87c4cf4a30ed02d2f8117 100644
--- a/task-component/README.md
+++ b/task-component/README.md
@@ -364,19 +364,19 @@
      */
     int save(TaskEntity entity);
     /**
-     * 删除任务ID
+     * 删除任务,根据TaskId删除
      * @param id id
      * @return int
      */
     int removeByTaskId(String id);
-    
+
     /**
      * 更新任务(除taskId外,所有字段必须更新)
      * @param entity 实体
      * @return int
      */
     int update(TaskEntity entity);
-    
+
     /**
      * 查询全部任务
      * @return List
@@ -423,8 +423,6 @@
         return taskDataMapper.selectList(new QueryWrapper<>());
     }
 }
-
-
 ```
 只需要实现其中指定的几个持久化和查询方法即可替换component中的JSON文件持久化
diff --git a/task-component/src/main/java/com/gcc/container/components/task/compont/InitTaskSchedulingApplication.java b/task-component/src/main/java/com/gcc/container/components/task/compont/InitTaskSchedulingApplication.java
index f9199ef98cf6c6071c934ac62a9713a90bf6bc6f..78cac8e2ff1868ec98a25bac704759c90ebeb239 100644
--- a/task-component/src/main/java/com/gcc/container/components/task/compont/InitTaskSchedulingApplication.java
+++ b/task-component/src/main/java/com/gcc/container/components/task/compont/InitTaskSchedulingApplication.java
@@ -1,6 +1,7 @@
 package com.gcc.container.components.task.compont;
 
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
@@ -15,23 +16,12 @@
 import org.springframework.stereotype.Component;
 
 @Order(1)
 @Slf4j
 @Component
+@AllArgsConstructor
 public class InitTaskSchedulingApplication implements ApplicationRunner {
 
-    private TaskSchedule taskSchedule;
+    private final TaskSchedule taskSchedule;
 
-
-    private TaskFactory taskFactory;
-
-
-    @Autowired
-    public void setTaskScheduleServer(TaskSchedule taskSchedule) {
-        this.taskSchedule = taskSchedule;
-    }
-
-    @Autowired
-    public void setTaskTempFactory(TaskFactory taskFactory) {
-        this.taskFactory = taskFactory;
-    }
+    private final TaskFactory taskFactory;
 
     @Override
     public void run(ApplicationArguments args) throws Exception {
diff --git a/task-component/src/main/java/com/gcc/container/components/task/compont/TaskFactory.java b/task-component/src/main/java/com/gcc/container/components/task/compont/TaskFactory.java
index 1cabd5ec3b46d5ffb6776dd5f0838b5121323997..f03ea323febf39230a661193e5408cf2c13c65a6 100644
--- a/task-component/src/main/java/com/gcc/container/components/task/compont/TaskFactory.java
+++ b/task-component/src/main/java/com/gcc/container/components/task/compont/TaskFactory.java
@@ -11,7 +11,7 @@
 import com.gcc.container.components.task.dao.TaskRepository;
 import com.gcc.container.components.task.model.entity.TaskEntity;
 import com.gcc.container.components.task.model.enums.TaskLevel;
 import com.gcc.container.components.task.property.TaskDataProperties;
-import org.springframework.beans.factory.annotation.Autowired;
+import lombok.AllArgsConstructor;
 import org.springframework.beans.factory.config.BeanDefinition;
 import org.springframework.context.ApplicationContext;
 import
org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
@@ -24,38 +24,24 @@
 import java.util.*;
 import java.util.stream.Collectors;
 
 @Component
-class TaskFactory {
+@AllArgsConstructor
+class TaskFactory{
 
-    private ApplicationContext context;
+    private final ApplicationContext context;
 
-    private TaskRepository taskRepository;
+    private final TaskRepository taskRepository;
 
-    private TaskDataProperties taskDataProperties;
+    private final TaskDataProperties taskDataProperties;
 
     public final String CLASS_PATH = "com.gcc.container.components.task.compont.BaseMethodLevelTask";
 
-    private List tempContainer = new ArrayList<>();
+    private final List tempContainer = new ArrayList<>();
 
-    @Autowired
-    public void setContext(ApplicationContext context) {
-        this.context = context;
-    }
-
-    @Autowired
-    public void setTaskRepository(TaskRepository taskRepository) {
-        this.taskRepository = taskRepository;
-    }
-
-    @Autowired
-    public void setTaskDataProperties(TaskDataProperties taskDataProperties) {
-        this.taskDataProperties = taskDataProperties;
-    }
-
     public void scanTaskInfo() throws Exception {
         //扫描类级任务
@@ -192,4 +178,6 @@
         }
         return 0;
     }
+
+
 }
diff --git a/task-component/src/main/java/com/gcc/container/components/task/compont/TaskScheduleImpl.java b/task-component/src/main/java/com/gcc/container/components/task/compont/TaskScheduleImpl.java
index 12725008094522ec7dbf97f218035308f8795cdb..0f8b89a48c1646bd3cf2f205c226b07419cddbd0 100644
--- a/task-component/src/main/java/com/gcc/container/components/task/compont/TaskScheduleImpl.java
+++ b/task-component/src/main/java/com/gcc/container/components/task/compont/TaskScheduleImpl.java
@@ -1,16 +1,15 @@
 package com.gcc.container.components.task.compont;
 
-
+import cn.hutool.core.util.NumberUtil;
 import com.gcc.container.components.task.dao.TaskRepository;
 import com.gcc.container.components.task.model.entity.TaskEntity;
 import com.gcc.container.components.task.model.enums.TaskLevel;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
 import org.springframework.stereotype.Component;
-
 import java.lang.reflect.Constructor;
 import java.util.List;
 import java.util.Objects;
@@ -19,43 +18,32 @@
 import java.util.concurrent.ScheduledFuture;
 
 @Slf4j
 @Component
+@AllArgsConstructor
 public class TaskScheduleImpl implements TaskSchedule {
 
-    //正在运行的任务
-    private static ConcurrentHashMap runningTasks = new ConcurrentHashMap<>();
-
-    //线程池任务调度
-    private ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
+    /**
+     * 正在运行的任务
+     */
+    private final static ConcurrentHashMap runningTasks = new ConcurrentHashMap<>();
 
-    //内存中任务对象
-    private static ConcurrentHashMap ramTasks = new ConcurrentHashMap<>();
+    /**
+     * 线程池任务调度
+     */
+    private final ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
 
-    //Spring容器池,用于注入Spring的bean
-    private ApplicationContext context;
+    /**
+     * 内存中任务对象
+     */
+    private final static ConcurrentHashMap ramTasks = new ConcurrentHashMap<>();
 
-    private TaskRepository taskRepository;
+    private final TaskRepository taskRepository;
 
-    private TaskFactory taskFactory;
+    private final TaskFactory taskFactory;
 
-    @Autowired
-    public void setTaskRepository(TaskRepository taskRepository) {
-        this.taskRepository = taskRepository;
-    }
-
-    @Autowired
-    public void setContext(ApplicationContext context) {
-        this.context = context;
-    }
-
-    @Autowired
-    public void setTaskFactory(TaskFactory taskFactory) {
-        this.taskFactory = taskFactory;
-    }
-
-    @Autowired
+    private final ApplicationContext context;
 
     @Override
@@ -66,7 +54,8 @@
     @Override
     public void initScheduling() {
         int num = taskFactory.mergeTaskEntities().size();
-        num = num > 0 ? num:1;
+        num = NumberUtil.max(num,1);
+        num = NumberUtil.min(num,10);
         this.threadPoolTaskScheduler.setPoolSize(num);
         this.threadPoolTaskScheduler.setThreadNamePrefix("task-thread-");
         this.threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
@@ -169,4 +158,6 @@
             }
         }
     }
+
+
 }