diff --git a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/AbstractInitDataBase.java b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/AbstractInitDataBase.java
index 1f479e8a11ec80238fa7a7231e747184b9590237..f8fc19bff53da3f4da46d853496e5b563ad7814e 100644
--- a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/AbstractInitDataBase.java
+++ b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/AbstractInitDataBase.java
@@ -29,7 +29,7 @@ import java.util.Map;
@Slf4j
public abstract class AbstractInitDataBase implements InitDataBase {
- private BaseVersionSQL baseVersionSQLUtil;
+ private final BaseVersionSQL baseVersionSQLUtil;
public Connection commConn = null;
@@ -37,9 +37,9 @@ public abstract class AbstractInitDataBase implements InitDataBase {
public JSONArray dbConfigFile = new JSONArray();
- public DataBaseProperties dataBaseProperties;
+ public final DataBaseProperties dataBaseProperties;
- public ConscriptProperties conscriptProperties;
+ public final ConscriptProperties conscriptProperties;
public static Boolean flag = false;
@@ -331,22 +331,7 @@ public abstract class AbstractInitDataBase implements InitDataBase {
}
- /**
- * Prefabricated CREATE TABLE statement
- * @return String
- */
-// private String createVersionTableSQL(){
-// return "CREATE TABLE "+conscriptProperties.getTableName()+" (\n" +
-// " version VARCHAR(10),\n" +
-// " desc TEXT,\n" +
-// " sql_name TEXT,\n" +
-// " sql_size INT,\n" +
-// " execute_time VARCHAR(50)\n" + ");";
-// }
-
-// private String initDataVersionTableSQL(){
-// return "INSERT INTO "+conscriptProperties.getTableName()+" VALUES ('0.0','init conscript','inner.sql',2,'1900-01-01 00:00:00');";
-// }
+
diff --git a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/dm/DaMengDataBase.java b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/dm/DaMengDataBase.java
index 2f729ede38d59156aa24b28f08da2f39e972ac05..f381b9806214da494846bfc4e258b04a46e5bae2 100644
--- a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/dm/DaMengDataBase.java
+++ b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/dm/DaMengDataBase.java
@@ -5,10 +5,7 @@ import com.gcc.container.components.conscript.model.ConscriptProperties;
import com.gcc.container.components.conscript.model.DataBaseProperties;
import lombok.extern.slf4j.Slf4j;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
+import java.sql.*;
@Slf4j
public class DaMengDataBase extends AbstractInitDataBase {
@@ -20,11 +17,10 @@ public class DaMengDataBase extends AbstractInitDataBase {
@Override
public void initCommConn() {
try{
- Class.forName(dataBaseProperties.getDriverClassName());
String jdbcUrl = "jdbc:dm://" + dataBaseProperties.getHost() + ":"+dataBaseProperties.getDbport();
commConn = DriverManager.getConnection(jdbcUrl,dataBaseProperties.getRootUser(),dataBaseProperties.getRootPass());
commConn.setAutoCommit(true);
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :data base is not connection.....");
}
}
@@ -32,43 +28,40 @@ public class DaMengDataBase extends AbstractInitDataBase {
@Override
public void initDbConn() {
try{
- Class.forName(dataBaseProperties.getDriverClassName());
String jdbcUrl = "jdbc:dm://" + dataBaseProperties.getHost() + ":"+dataBaseProperties.getDbport()+"?schema="+dataBaseProperties.getSchema();
dbConn = DriverManager.getConnection(jdbcUrl,dataBaseProperties.getUsername(),dataBaseProperties.getPassword());
dbConn.setAutoCommit(true);
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :data base is not connection.....");
}
}
@Override
public boolean databaseIsExist(Connection connection) {
- try {
- Statement stmt = connection.createStatement();
- ResultSet res = stmt.executeQuery("SELECT COUNT(DISTINCT object_name) AS NUM FROM ALL_OBJECTS WHERE object_type = 'SCH' AND object_name = '"+dataBaseProperties.getSchema()+"'");
+ boolean flag = true;
+ try( Statement stmt = connection.createStatement();
+ ResultSet res = stmt.executeQuery("SELECT COUNT(DISTINCT object_name) AS NUM FROM ALL_OBJECTS WHERE object_type = 'SCH' AND object_name = '"+dataBaseProperties.getSchema()+"'")){
if(res.next() && res.getInt(1) == 0){
- stmt.close();
- return false;
+ flag = false;
}
- return true;
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :database base query is error");
}
- return false;
+ return flag;
}
@Override
public boolean createDataBase() {
- try {
- Statement stmt = commConn.createStatement();
+ boolean flag = true;
+ try(Statement stmt = commConn.createStatement()){
stmt.execute("CREATE SCHEMA \""+dataBaseProperties.getSchema()+"\" AUTHORIZATION \""+dataBaseProperties.getUsername()+"\" ");
- stmt.close();
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Create {} Database Failed !!",dataBaseProperties.getDbName());
- return false;
+ flag = false;
}
- return true;
+ return flag;
}
}
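Note on the SQL in these classes: every existence check and CREATE statement splices schema, database, and user names straight into the SQL string. A minimal sketch, outside this patch, of the same DaMeng existence check with a bound parameter; class and method names are hypothetical:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public final class SchemaExistsSketch {
        // Same query as DaMengDataBase.databaseIsExist, but the schema name
        // is bound as a parameter instead of concatenated into the statement.
        public static boolean schemaExists(Connection connection, String schema) {
            String sql = "SELECT COUNT(DISTINCT object_name) FROM ALL_OBJECTS "
                    + "WHERE object_type = 'SCH' AND object_name = ?";
            try (PreparedStatement ps = connection.prepareStatement(sql)) {
                ps.setString(1, schema);
                try (ResultSet res = ps.executeQuery()) {
                    return res.next() && res.getInt(1) > 0;
                }
            } catch (SQLException e) {
                // mirror the patched methods, which default to "exists" on failure
                return true;
            }
        }
    }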
diff --git a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/kingbase8/KingBase8DataBase.java b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/kingbase8/KingBase8DataBase.java
index 17afb45fdd3e40d09394f4d773695b27075f4ee3..590187316135466ef7748da27076b81b1bba2385 100644
--- a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/kingbase8/KingBase8DataBase.java
+++ b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/kingbase8/KingBase8DataBase.java
@@ -5,10 +5,7 @@ import com.gcc.container.components.conscript.model.ConscriptProperties;
import com.gcc.container.components.conscript.model.DataBaseProperties;
import lombok.extern.slf4j.Slf4j;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
+import java.sql.*;
@Slf4j
public class KingBase8DataBase extends AbstractInitDataBase {
@@ -21,11 +18,10 @@ public class KingBase8DataBase extends AbstractInitDataBase {
@Override
public void initCommConn() {
try{
- Class.forName(dataBaseProperties.getDriverClassName());
String jdbcUrl = "jdbc:kingbase8://" + dataBaseProperties.getHost() + ":"+dataBaseProperties.getDbport()+"/TEST";
commConn = DriverManager.getConnection(jdbcUrl,dataBaseProperties.getRootUser(),dataBaseProperties.getRootPass());
commConn.setAutoCommit(true);
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :data base is not connection.....");
}
}
@@ -33,42 +29,38 @@ public class KingBase8DataBase extends AbstractInitDataBase {
@Override
public void initDbConn() {
try{
- Class.forName(dataBaseProperties.getDriverClassName());
String jdbcUrl = "jdbc:kingbase8://" + dataBaseProperties.getHost() + ":"+dataBaseProperties.getDbport()+"/"+dataBaseProperties.getDbName()+"??currentSchema="+dataBaseProperties.getSchema()+"&characterEncoding=utf-8";
dbConn = DriverManager.getConnection(jdbcUrl,dataBaseProperties.getUsername(),dataBaseProperties.getPassword());
dbConn.setAutoCommit(true);
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :data base is not connection.....");
}
}
@Override
public boolean createDataBase() {
- try {
- Statement stmt = commConn.createStatement();
+ boolean flag = true;
+ try( Statement stmt = commConn.createStatement()){
stmt.execute("CREATE DATABASE "+dataBaseProperties.getDbName()+" OWNER "+dataBaseProperties.getUsername()+" encoding 'utf8' CONNECTION LIMIT 10");
stmt.execute("CREATE SCHEMA \""+dataBaseProperties.getSchema()+"\" AUTHORIZATION \""+dataBaseProperties.getUsername()+"\" ");
- stmt.close();
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Create {} Database Fail!",dataBaseProperties.getDbName());
- return false;
+ flag = false;
}
- return true;
+ return flag;
}
@Override
public boolean databaseIsExist(Connection connection) {
- try {
- Statement stmt = connection.createStatement();
- ResultSet res = stmt.executeQuery("SELECT DATNAME FROM sys_database WHERE DATNAME = '"+dataBaseProperties.getDbName()+"'");
+ boolean flag = true;
+ try( Statement stmt = connection.createStatement();
+ ResultSet res = stmt.executeQuery("SELECT DATNAME FROM sys_database WHERE DATNAME = '"+dataBaseProperties.getDbName()+"'")){
if(!res.next()){
- stmt.close();
- return false;
+ flag = false;
}
- return true;
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :database base query is error");
}
- return false;
+ return flag;
}
}
diff --git a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/mysql/MySqlDataBase.java b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/mysql/MySqlDataBase.java
index 997fb40be4525b0036509e454505177b62c60693..f3c5a443323c16883c8e8a6ffd19406ab2d332d1 100644
--- a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/mysql/MySqlDataBase.java
+++ b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/mysql/MySqlDataBase.java
@@ -5,10 +5,7 @@ import com.gcc.container.components.conscript.model.ConscriptProperties;
import com.gcc.container.components.conscript.model.DataBaseProperties;
import lombok.extern.slf4j.Slf4j;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
+import java.sql.*;
/**
* MySQL database initialization class
@@ -24,10 +21,9 @@ public class MySqlDataBase extends AbstractInitDataBase {
@Override
public void initCommConn() {
try {
- Class.forName(dataBaseProperties.getDriverClassName());
String jdbcUrl = "jdbc:mysql://" + dataBaseProperties.getHost() + ":" + dataBaseProperties.getDbport() + "/mysql?characterEncoding=utf8&serverTimezone=GMT%2B8";
commConn = DriverManager.getConnection(jdbcUrl, dataBaseProperties.getRootUser(), dataBaseProperties.getRootPass());
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :data base is not connection.....");
}
}
@@ -44,31 +40,28 @@ public class MySqlDataBase extends AbstractInitDataBase {
@Override
public boolean databaseIsExist(Connection connection){
- try {
- Statement stmt = connection.createStatement();
- ResultSet res = stmt.executeQuery("SELECT COUNT(*) FROM information_schema.schemata WHERE schema_name= \""+ dataBaseProperties.getDbName()+"\"");
+ boolean flag = true;
+ try(Statement stmt = connection.createStatement();
+ ResultSet res = stmt.executeQuery("SELECT COUNT(*) FROM information_schema.schemata WHERE schema_name= \""+ dataBaseProperties.getDbName()+"\"")){
if(res.next() && res.getInt(1) == 0){
- stmt.close();
- return false;
+ flag = false;
}
- return true;
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :database base query is error");
}
- return false;
+ return flag;
}
@Override
public boolean createDataBase() {
- try {
- Statement stmt = commConn.createStatement();
+ boolean flag = true;
+ try( Statement stmt = commConn.createStatement()){
stmt.execute("CREATE DATABASE IF NOT EXISTS "+ dataBaseProperties.getDbName()+" DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci");
- stmt.close();
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Create {} Database Failed !!",dataBaseProperties.getDbName());
- return false;
+ flag = false;
}
- return true;
+ return flag;
}
diff --git a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/pgsql/PostgreSQLDataBase.java b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/pgsql/PostgreSQLDataBase.java
index 44578484448d392da6ed2da93317641fc4a238cc..4c5b0caaf9a5ddccf3c7dd6bad818e86a3bc745e 100644
--- a/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/pgsql/PostgreSQLDataBase.java
+++ b/initdatabse-component/src/main/java/com/gcc/container/components/conscript/action/pgsql/PostgreSQLDataBase.java
@@ -5,10 +5,7 @@ import com.gcc.container.components.conscript.model.ConscriptProperties;
import com.gcc.container.components.conscript.model.DataBaseProperties;
import lombok.extern.slf4j.Slf4j;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
+import java.sql.*;
@Slf4j
public class PostgreSQLDataBase extends AbstractInitDataBase {
@@ -20,10 +17,9 @@ public class PostgreSQLDataBase extends AbstractInitDataBase {
@Override
public void initCommConn() {
try {
- Class.forName(dataBaseProperties.getDriverClassName());
String url = "jdbc:postgresql://"+dataBaseProperties.getHost()+":"+dataBaseProperties.getDbport()+"/postgres";
commConn = DriverManager.getConnection(url, dataBaseProperties.getRootUser(), dataBaseProperties.getRootPass());
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :data base is not connection.....");
}
}
@@ -31,40 +27,36 @@ public class PostgreSQLDataBase extends AbstractInitDataBase {
@Override
public void initDbConn() {
try {
- Class.forName(dataBaseProperties.getDriverClassName());
dbConn = DriverManager.getConnection(dataBaseProperties.getUrl(), dataBaseProperties.getUsername(), dataBaseProperties.getPassword());
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :data base {} is not connection.....",dataBaseProperties.getDbName());
}
}
@Override
public boolean databaseIsExist(Connection connection) {
- try {
- Statement stmt = connection.createStatement();
- ResultSet res = stmt.executeQuery("SELECT datname FROM pg_database WHERE datname =\'"+ dataBaseProperties.getDbName()+"\'");
- if(!res.next()){
- stmt.close();
- return false;
+ boolean flag = true;
+ try(Statement stmt = connection.createStatement();
+ ResultSet res = stmt.executeQuery("SELECT datname FROM pg_database WHERE datname =\'"+ dataBaseProperties.getDbName()+"\'")){
+ if(res.next() && res.getInt(1) == 0){
+ flag = false;
}
- return true;
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Database initialization :database base query is error");
}
- return false;
+ return flag;
}
@Override
public boolean createDataBase() {
- try {
- Statement stmt = commConn.createStatement();
+ boolean flag = true;
+ try( Statement stmt = commConn.createStatement()){
stmt.execute("CREATE DATABASE "+ dataBaseProperties.getDbName()+" WITH OWNER = "+dataBaseProperties.getUsername()+" ENCODING = 'UTF-8' ");
- stmt.close();
- }catch (Exception e){
+ }catch (SQLException e){
log.error("【Conscript】Create {} Database Failed !!",dataBaseProperties.getDbName());
- return false;
+ flag = false;
}
- return true;
+ return flag;
}
diff --git a/kafka-component/pom.xml b/kafka-component/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..bc6abae955f5c7e956f84f0bd83688afbfcabcef
--- /dev/null
+++ b/kafka-component/pom.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.gcc.container</groupId>
+        <artifactId>components</artifactId>
+        <version>1.0.0</version>
+    </parent>
+    <groupId>com.gcc.container.components</groupId>
+    <artifactId>kafka-component</artifactId>
+    <version>${component.version}</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-quartz</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/ApplicationConfiguration.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/ApplicationConfiguration.java
new file mode 100644
index 0000000000000000000000000000000000000000..05ba1dffc3e55a768752e2938477211504fb5142
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/ApplicationConfiguration.java
@@ -0,0 +1,12 @@
+package com.gcc.container.components.kafka;
+
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
+@Configuration
+@EnableScheduling
+@EnableAsync
+@ComponentScan(basePackages = "com.gcc.container.components.kafka.*")
+public class ApplicationConfiguration {
+}
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstracTopicConsumer.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstracTopicConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..aa580dcb27d79f81df36e7e80a5d569ebcca9bc1
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstracTopicConsumer.java
@@ -0,0 +1,118 @@
+package com.gcc.container.components.kafka.component.consumer;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.DisposableBean;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Abstract topic consumer
+ */
+@Slf4j
+public abstract class AbstracTopicConsumer implements TopicConsumer,Runnable, DisposableBean {
+ /**
+ * Buffer pipe that keeps Kafka messages from piling up
+ */
+ private final BlockingQueue<String> BUFFER_PIPE = new LinkedBlockingQueue<>(10000);
+ private final List<String> topic = new ArrayList<>(0);
+
+ /**
+ * Reference to the consumer thread
+ */
+ private Thread consumerThread;
+
+ private volatile boolean consumerInitialized = false;
+ public AbstracTopicConsumer(String ...topics) {
+ if(null != topics){
+ Collections.addAll(topic, topics);
+ }
+ }
+
+ public List<String> getTopics() {
+ return topic;
+ }
+
+ @Override
+ public void onMessage(String messageObj) {
+ try {
+ if(!BUFFER_PIPE.offer(messageObj)){
+ log.warn("BUFFER_PIPE is full !this message will be discarded,message={},from{}",messageObj,this.getClass().getName());
+ }
+ initConsumerThread();
+ }catch (Exception e){
+ log.error("consumer[{}] message of {} consume fail",this.getClass().getName(),messageObj);
+ }
+ }
+
+ /**
+ * Runnable loop: takes messages off the buffer and processes them
+ */
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ String msg = BUFFER_PIPE.take();
+ dealMessage(msg);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Consumer thread interrupted, stopping processing for {}",
+ this.getClass().getSimpleName());
+ break;
+ } catch (Exception e) {
+ log.error("Error processing message in consumer: {}", e.getMessage(), e);
+ }
+ }
+ }
+
+
+ @Override
+ public void destroy() throws Exception {
+ log.info("Shutting down consumer: {}", this.getClass().getSimpleName());
+ shutdown();
+ }
+
+ /**
+ * Stops the consumer thread
+ */
+ public void shutdown() {
+ if (consumerThread != null && consumerThread.isAlive()) {
+ consumerThread.interrupt();
+ try {
+ consumerThread.join(5000); // wait at most 5 seconds
+ log.info("Consumer thread stopped for {}", this.getClass().getSimpleName());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted while waiting for consumer thread to stop");
+ }
+ }
+ }
+
+ private void initConsumerThread() {
+ if (!consumerInitialized) {
+ synchronized (this) {
+ if (!consumerInitialized) {
+ // create the thread, passing this instance as the Runnable
+ consumerThread = new Thread(this);
+ consumerThread.setName("Consumer-" + this.getClass().getSimpleName());
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+ consumerInitialized = true;
+ log.info("Consumer thread started for {}", this.getClass().getSimpleName());
+ }
+ }
+ }
+ }
+
+ /**
+ * Actually processes the message object
+ * @param messageObj message body
+ */
+ abstract void dealMessage(String messageObj);
+
+
+
+}
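A minimal sketch of a concrete consumer against the class above; it must live in the same package because dealMessage is package-private, and the bean and topic names are made up:

    package com.gcc.container.components.kafka.component.consumer;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    public class OrderEventConsumer extends AbstracTopicConsumer {

        public OrderEventConsumer() {
            super("order-events"); // topics this consumer is registered for
        }

        @Override
        void dealMessage(String messageObj) {
            // runs on the consumer's own daemon thread, fed from BUFFER_PIPE
            log.info("handling order event: {}", messageObj);
        }
    }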
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstractTopicObserver.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstractTopicObserver.java
new file mode 100644
index 0000000000000000000000000000000000000000..fdf4ec6e9657f96217297ecb95289f2bc51e72b8
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/AbstractTopicObserver.java
@@ -0,0 +1,44 @@
+package com.gcc.container.components.kafka.component.consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class AbstractTopicObserver implements TopicObserver, InitializingBean {
+ private static final Logger log = LoggerFactory.getLogger("TopicObserver");
+ private final String topic;
+ private final List listeners = new ArrayList<>();
+
+ public AbstractTopicObserver(String topic) {
+ this.topic = topic;
+ }
+
+ public String getTopic(){
+ return this.topic;
+ }
+
+ @Override
+ public void addConsumer(TopicConsumer consumer) {
+ listeners.add(consumer);
+ }
+
+ @Override
+ public void noticeConsumer(String message) {
+ log.debug("message :{}", message);
+ for (TopicConsumer consumer: listeners){
+ String copyMessage = String.valueOf(message);
+ try {
+ consumer.onMessage(copyMessage);
+ }catch (Exception e){
+ log.error("message of {} consume fail.message={}",topic,copyMessage);
+ }
+ }
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ RegistrationHelper.registerObserver(this);
+ }
+}
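A minimal observer sketch, assuming the Kafka listener is declared with autoStartup "false" so that InitConsumer can start all containers on ApplicationReadyEvent; the topic name is made up:

    package com.gcc.container.components.kafka.component.consumer;

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class OrderEventObserver extends AbstractTopicObserver {

        public OrderEventObserver() {
            super("order-events"); // key under which RegistrationHelper stores us
        }

        @KafkaListener(topics = "order-events", autoStartup = "false")
        public void listen(String message) {
            // fan the raw record out to every TopicConsumer registered for the topic
            noticeConsumer(message);
        }
    }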
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/InitConsumer.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/InitConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..fc3c20a1884609fb14348e7520d1f27b0a2b3f85
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/InitConsumer.java
@@ -0,0 +1,30 @@
+package com.gcc.container.components.kafka.component.consumer;
+
+import com.gcc.container.components.kafka.configure.ApplicationContextHandler;
+import lombok.AllArgsConstructor;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.stereotype.Component;
+import java.util.Map;
+
+@Component
+@AllArgsConstructor
+public class InitConsumer {
+
+ private final KafkaListenerEndpointRegistry registry;
+
+ @EventListener
+ public void onApplicationEvent(ApplicationReadyEvent event){
+ Map<String, AbstracTopicConsumer> beans = ApplicationContextHandler.getApplicationContext().getBeansOfType(AbstracTopicConsumer.class);
+ for (Map.Entry<String, AbstracTopicConsumer> entry : beans.entrySet()) {
+ RegistrationHelper.registerConsumer(entry.getValue());
+ }
+ // start all Kafka listener containers
+ for (MessageListenerContainer container : registry.getListenerContainers()) {
+ container.start();
+ }
+ }
+
+}
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/RegistrationHelper.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/RegistrationHelper.java
new file mode 100644
index 0000000000000000000000000000000000000000..12d97f43e34132e079f52980299c907ba2db5551
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/RegistrationHelper.java
@@ -0,0 +1,35 @@
+package com.gcc.container.components.kafka.component.consumer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Registration of consumers and observers
+ * @author GCC
+ */
+@Slf4j
+public class RegistrationHelper {
+
+ private static final ConcurrentMap<String, AbstractTopicObserver> OBSERVER_CONTEXT = new ConcurrentHashMap<>();
+
+
+ public static void registerObserver(AbstractTopicObserver observer){
+ OBSERVER_CONTEXT.put(observer.getTopic(),observer);
+ }
+
+ public static void registerConsumer(AbstracTopicConsumer consumer){
+ List<String> topics = consumer.getTopics();
+ for(String topic:topics) {
+ if (OBSERVER_CONTEXT.containsKey(topic)) {
+ OBSERVER_CONTEXT.get(topic).addConsumer(consumer);
+ } else {
+ log.warn(" consumer of topic ‘{}’ register fail,no this observer",topic);
+ }
+ }
+ }
+
+
+}
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicConsumer.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..6695a20bc6826bc657b79203834549a32b54ee95
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicConsumer.java
@@ -0,0 +1,18 @@
+package com.gcc.container.components.kafka.component.consumer;
+
+
+/**
+ * A concrete topic consumer
+ * @author GCC
+ */
+interface TopicConsumer {
+ /**
+ * Handles a message object
+ * @param messageObj message object
+ */
+ void onMessage(String messageObj);
+
+
+
+
+}
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicObserver.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicObserver.java
new file mode 100644
index 0000000000000000000000000000000000000000..6f83722fcd1c8a0a92c144f6864cbec5beaa2a21
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/consumer/TopicObserver.java
@@ -0,0 +1,17 @@
+package com.gcc.container.components.kafka.component.consumer;
+
+interface TopicObserver {
+
+ /**
+ * Adds a consumer
+ * @param consumer consumer
+ */
+ void addConsumer(TopicConsumer consumer);
+
+ /**
+ * Notifies the registered consumers
+ * @param message message
+ */
+ void noticeConsumer(String message);
+
+}
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/AbstractDataStreamProcessor.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/AbstractDataStreamProcessor.java
new file mode 100644
index 0000000000000000000000000000000000000000..66a0d30a97404421a983eb6beaecd73620893446
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/AbstractDataStreamProcessor.java
@@ -0,0 +1,29 @@
+package com.gcc.container.components.kafka.component.datastream;
+
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class AbstractDataStreamProcessor implements DataStreamProcessor {
+ private DataStreamProcessor nextProcessor;
+
+ @Override
+ public void setNext(DataStreamProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public void handle(Object data) {
+ AtomicBoolean flag = disposeData(data);
+ if(flag.get() && null != nextProcessor){
+ nextProcessor.handle(data);
+ }
+ }
+
+ /**
+ * Processes the data
+ * @param data data
+ * @return AtomicBoolean true to continue down the chain, otherwise stop
+ */
+ abstract AtomicBoolean disposeData(Object data);
+
+}
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/DataStreamProcessor.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/DataStreamProcessor.java
new file mode 100644
index 0000000000000000000000000000000000000000..22995f61de21de31d9e71137b6d8b8bc59e3fe8a
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/DataStreamProcessor.java
@@ -0,0 +1,9 @@
+package com.gcc.container.components.kafka.component.datastream;
+
+interface DataStreamProcessor {
+
+ void setNext(DataStreamProcessor nextProcessor);
+
+ void handle(Object data);
+
+}
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/annotation/DataStream.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/annotation/DataStream.java
new file mode 100644
index 0000000000000000000000000000000000000000..7d839170ad7855e023e2573a652d7d99cb079721
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/datastream/annotation/DataStream.java
@@ -0,0 +1,15 @@
+package com.gcc.container.components.kafka.component.datastream.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface DataStream {
+
+ String dataStreamName();
+
+ int order() default 0;
+}
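A sketch of how @DataStream is meant to be used: two processors on one named chain, sorted by order and wired head-to-tail by DataStreamConfigure. Names are illustrative; the classes sit in the datastream package because disposeData is package-private:

    package com.gcc.container.components.kafka.component.datastream;

    import com.gcc.container.components.kafka.component.datastream.annotation.DataStream;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.atomic.AtomicBoolean;

    @Component
    @DataStream(dataStreamName = "deviceData", order = 0)
    class ValidateProcessor extends AbstractDataStreamProcessor {
        @Override
        AtomicBoolean disposeData(Object data) {
            // true = hand the data to the next processor in the chain
            return new AtomicBoolean(data != null);
        }
    }

    @Component
    @DataStream(dataStreamName = "deviceData", order = 1)
    class PersistProcessor extends AbstractDataStreamProcessor {
        @Override
        AtomicBoolean disposeData(Object data) {
            // last link: nextProcessor is null, so handle() ends here
            return new AtomicBoolean(true);
        }
    }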
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaCallback.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaCallback.java
new file mode 100644
index 0000000000000000000000000000000000000000..319d31e5f415c4cbc77fdc9c932750d634916c44
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaCallback.java
@@ -0,0 +1,16 @@
+package com.gcc.container.components.kafka.component.product;
+
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+public interface KafkaCallback extends ListenableFutureCallback> {
+ @Override
+ default void onSuccess(SendResult<String, Object> result) {
+ // success handling (no-op by default)
+ }
+
+ @Override
+ default void onFailure(Throwable ex) {
+ // failure handling (no-op by default)
+ }
+}
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaProduct.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaProduct.java
new file mode 100644
index 0000000000000000000000000000000000000000..b34c5f4c2c856f611d6a560d7b5cb054705888ee
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/KafkaProduct.java
@@ -0,0 +1,42 @@
+package com.gcc.container.components.kafka.component.product;
+
+/**
+ * Producer utility
+ * @author GCC
+ */
+public interface KafkaProduct {
+
+ /**
+ * Sends a message to a topic
+ * @param topic topic
+ * @param message message payload
+ */
+ void sendMessage(final String topic, Object message);
+
+ /**
+ * Sends a message to a topic under a key
+ * @param topic topic
+ * @param key key
+ * @param message message
+ */
+ void sendMessage(final String topic, String key, Object message);
+
+ /**
+ * Sends a message asynchronously and invokes a callback
+ * @param topic topic
+ * @param message message
+ * @param callback callback
+ */
+ void sendAsyncMessage(final String topic, Object message, KafkaCallback callback);
+
+
+ /**
+ * Sends a message asynchronously and invokes a callback
+ * @param topic topic
+ * @param key key
+ * @param message message
+ * @param callback callback
+ */
+ void sendAsyncMessage(String topic, String key, String message, KafkaCallback callback);
+
+}
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/impl/KafkaProductImpl.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/impl/KafkaProductImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..7d698b25c89439b862e2e8c1b5d3688b0d358301
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/component/product/impl/KafkaProductImpl.java
@@ -0,0 +1,38 @@
+package com.gcc.container.components.kafka.component.product.impl;
+
+import com.gcc.container.components.kafka.component.product.KafkaCallback;
+import com.gcc.container.components.kafka.component.product.KafkaProduct;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+
+@Component
+public class KafkaProductImpl implements KafkaProduct {
+
+ @Autowired
+ private KafkaTemplate<String, Object> kafkaTemplate;
+
+ @Override
+ public void sendMessage(final String topic, Object message) {
+ kafkaTemplate.send(topic, message);
+ }
+
+ @Override
+ public void sendMessage(final String topic, String key, Object message) {
+ kafkaTemplate.send(topic, key, message);
+ }
+
+ @Override
+ public void sendAsyncMessage(final String topic, Object message, KafkaCallback callback) {
+ ListenableFuture> future = kafkaTemplate.send(topic, message);
+ future.addCallback(callback);
+ }
+
+ @Override
+ public void sendAsyncMessage(String topic, String key, String message, KafkaCallback callback) {
+ ListenableFuture> future = kafkaTemplate.send(topic, key, message);
+ future.addCallback(callback);
+ }
+}
\ No newline at end of file
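A hedged usage sketch for the producer API: because both KafkaCallback methods have default bodies, callers override only what they need. Class, topic, and key are invented:

    package com.gcc.container.components.kafka.component.product;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;

    @Slf4j
    @Service
    public class OrderPublisher {
        private final KafkaProduct kafkaProduct;

        public OrderPublisher(KafkaProduct kafkaProduct) {
            this.kafkaProduct = kafkaProduct;
        }

        public void publish(String json) {
            // override only onFailure; onSuccess keeps its default no-op body
            kafkaProduct.sendAsyncMessage("order-events", "order-42", json,
                    new KafkaCallback() {
                        @Override
                        public void onFailure(Throwable ex) {
                            log.error("send to order-events failed", ex);
                        }
                    });
        }
    }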
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/ApplicationContextHandler.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/ApplicationContextHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..f701be157cb96f66f89a4639f0f2a03d7ce3be90
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/ApplicationContextHandler.java
@@ -0,0 +1,31 @@
+package com.gcc.container.components.kafka.configure;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ApplicationContextHandler implements ApplicationContextAware {
+
+ private static ApplicationContext applicationContext;
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ ApplicationContextHandler.applicationContext = applicationContext;
+ }
+
+ public static ApplicationContext getApplicationContext() {
+ return applicationContext;
+ }
+
+
+ public static <T> T getBean(Class<T> clazz) {
+ return applicationContext.getBean(clazz);
+ }
+
+ public static <T> T getBean(String name, Class<T> clazz) {
+ return applicationContext.getBean(name, clazz);
+ }
+
+}
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/DataStreamConfigure.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/DataStreamConfigure.java
new file mode 100644
index 0000000000000000000000000000000000000000..f7db332847bbd36bdf5a56c09f04827982b1018a
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/DataStreamConfigure.java
@@ -0,0 +1,92 @@
+package com.gcc.container.components.kafka.configure;
+
+
+import com.gcc.container.components.kafka.component.datastream.AbstractDataStreamProcessor;
+import com.gcc.container.components.kafka.component.datastream.annotation.DataStream;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.util.ReflectionUtils;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+@Configuration
+public class DataStreamConfigure {
+ private final Map> dataStreamChains = new ConcurrentHashMap<>();
+
+ @Autowired
+ public void setDataStreamProcessors(Map<String, AbstractDataStreamProcessor> processors) {
+ processors.forEach((beanName, processor) -> {
+ DataStream annotation = processor.getClass().getAnnotation(DataStream.class);
+ if (annotation != null) {
+ String dataStreamName = annotation.dataStreamName();
+ int order = annotation.order();
+ dataStreamChains.computeIfAbsent(dataStreamName, k -> new ArrayList<>()).add(processor);
+ }
+ });
+
+ dataStreamChains.forEach((dataStreamName, processorsList) -> {
+ Collections.sort(processorsList, (p1, p2) -> {
+ DataStream a1 = p1.getClass().getAnnotation(DataStream.class);
+ DataStream a2 = p2.getClass().getAnnotation(DataStream.class);
+ return Integer.compare(a1.order(), a2.order());
+ });
+
+ // build the chain of responsibility
+ AbstractDataStreamProcessor current = null;
+ for (AbstractDataStreamProcessor processor : processorsList) {
+ if (current == null) {
+ current = processor;
+ } else {
+ current.setNext(processor);
+ current = processor;
+ }
+ }
+ });
+ }
+
+ @Bean
+ public BeanPostProcessor beanPostProcessor() {
+ return new BeanPostProcessor() {
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+ if (bean instanceof AbstractDataStreamProcessor) {
+ Field field = ReflectionUtils.findField(bean.getClass(), "nextProcessor");
+ if (field != null) {
+ ReflectionUtils.makeAccessible(field);
+ ReflectionUtils.setField(field, bean, getNextHandler((AbstractDataStreamProcessor) bean));
+ }
+ }
+ return bean;
+ }
+
+ private AbstractDataStreamProcessor getNextHandler(AbstractDataStreamProcessor processor) {
+ DataStream annotation = processor.getClass().getAnnotation(DataStream.class);
+ if (annotation != null) {
+ String dataStreamName = annotation.dataStreamName();
+ List<AbstractDataStreamProcessor> processorsList = dataStreamChains.get(dataStreamName);
+ if (processorsList != null) {
+ int currentIndex = processorsList.indexOf(processor);
+ if (currentIndex < processorsList.size() - 1) {
+ return processorsList.get(currentIndex + 1);
+ }
+ }
+ }
+ return null;
+ }
+ };
+ }
+
+ @Bean
+ public Map<String, AbstractDataStreamProcessor> dataStreamProcessorMap() {
+ return dataStreamChains.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0)));
+ }
+}
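A hedged sketch of consuming the dataStreamProcessorMap bean, which holds the head of each chain; the service, chain name, and payload are invented, and @Resource(name=...) is used so Spring injects the map bean itself rather than a map of processors:

    package com.gcc.container.components.kafka.configure;

    import com.gcc.container.components.kafka.component.datastream.AbstractDataStreamProcessor;
    import org.springframework.stereotype.Service;

    import javax.annotation.Resource;
    import java.util.Map;

    @Service
    public class DeviceDataEntry {

        @Resource(name = "dataStreamProcessorMap")
        private Map<String, AbstractDataStreamProcessor> heads;

        public void accept(Object payload) {
            // handle() walks the chain: each disposeData(...) that returns
            // true passes the data on to the next processor
            AbstractDataStreamProcessor head = heads.get("deviceData");
            if (head != null) {
                head.handle(payload);
            }
        }
    }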
diff --git a/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/KafkaConfigure.java b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/KafkaConfigure.java
new file mode 100644
index 0000000000000000000000000000000000000000..e17e6958f910015831b586511d6000b8c1d321d5
--- /dev/null
+++ b/kafka-component/src/main/java/com/gcc/container/components/kafka/configure/KafkaConfigure.java
@@ -0,0 +1,76 @@
+package com.gcc.container.components.kafka.configure;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.*;
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@EnableKafka
+public class KafkaConfigure {
+
+ @Value("${spring.kafka.bootstrap-servers}")
+ private String bootstrapServers;
+
+ @Value("${spring.kafka.consumer.group-id}")
+ private String groupId;
+
+ @Value("${spring.kafka.consumer.auto-offset-reset}")
+ private String autoOffsetReset;
+
+ @Value("${spring.kafka.consumer.enable-auto-commit}")
+ private boolean enableAutoCommit;
+
+ @Value("${spring.kafka.producer.key-serializer}")
+ private String keySerializer;
+
+ @Value("${spring.kafka.producer.value-serializer}")
+ private String valueSerializer;
+
+ @Bean
+ public ProducerFactory<String, Object> producerFactory() {
+ Map<String, Object> configProps = new HashMap<>();
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+ return new DefaultKafkaProducerFactory<>(configProps);
+ }
+
+ @Bean
+ public KafkaTemplate<String, Object> kafkaTemplate() {
+ return new KafkaTemplate<>(producerFactory());
+ }
+
+ @Bean
+ public ConsumerFactory<String, String> consumerFactory() {
+ Map<String, Object> props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return new DefaultKafkaConsumerFactory<>(props);
+ }
+
+ @Bean
+ public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
+ return new KafkaListenerEndpointRegistry();
+ }
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ return factory;
+ }
+
+
+}
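KafkaConfigure resolves its settings from standard spring.kafka keys via @Value; a minimal application.yml sketch that satisfies every placeholder above (host, group id, and serializer choices are examples, not part of this patch):

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: gcc-container
          auto-offset-reset: earliest
          enable-auto-commit: true
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer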
diff --git a/kafka-component/src/main/resources/META-INF/spring.factories b/kafka-component/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000000000000000000000000000000000000..53cd526ed6753baef3931c08783936a3f10c9937
--- /dev/null
+++ b/kafka-component/src/main/resources/META-INF/spring.factories
@@ -0,0 +1 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.gcc.container.components.kafka.ApplicationConfiguration
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 5b300adc2c06f7d92b9c7fbe974a56f5e3b498c8..65ce0e6dc3526b63ed0c15be7730dde2699c5f90 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,6 +22,8 @@
<module>initdatabse-component</module>
<module>es-component</module>
<module>task-component</module>
+ <module>kafka-component</module>
+ <module>quickfilter-component</module>
diff --git a/quickfilter-component/pom.xml b/quickfilter-component/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..f033f94da34500ccac1b4a3df083a484268f0ba8
--- /dev/null
+++ b/quickfilter-component/pom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.gcc.container</groupId>
+        <artifactId>components</artifactId>
+        <version>1.0.0</version>
+    </parent>
+    <groupId>com.gcc.container.components</groupId>
+    <artifactId>quickfilter-component</artifactId>
+    <version>${component.version}</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>33.2.1-jre</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/ApplicationConfiguration.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/ApplicationConfiguration.java
new file mode 100644
index 0000000000000000000000000000000000000000..f77f529e2755cd873d6e379a54b39806b9ef7982
--- /dev/null
+++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/ApplicationConfiguration.java
@@ -0,0 +1,11 @@
+package com.gcc.container.components.quickfilter;
+
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@Configuration
+@EnableScheduling
+@ComponentScan(basePackages = "com.gcc.container.components.quickfilter.*")
+public class ApplicationConfiguration {
+}
diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/QuickFilterFactory.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/QuickFilterFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..475931898a143e4ea262e3adb93490dd81facf8b
--- /dev/null
+++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/QuickFilterFactory.java
@@ -0,0 +1,12 @@
+package com.gcc.container.components.quickfilter.factory;
+
+import com.gcc.container.components.quickfilter.filter.QuickFilter;
+
+/**
+ * Quick filter factory
+ */
+public interface QuickFilterFactory {
+ QuickFilter getQuickFilter(String filterName, Integer capacity, Double missProbability);
+
+ void destoryQuickFilter(String filterName);
+}
diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/AbstractQuickFilterFactory.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/AbstractQuickFilterFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..ae89385ca9d6df7c4ddab948865de93c631194fe
--- /dev/null
+++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/AbstractQuickFilterFactory.java
@@ -0,0 +1,31 @@
+package com.gcc.container.components.quickfilter.factory.impl;
+
+
+import com.gcc.container.components.quickfilter.factory.QuickFilterFactory;
+import com.gcc.container.components.quickfilter.filter.QuickFilter;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class AbstractQuickFilterFactory implements QuickFilterFactory {
+
+ protected final ConcurrentHashMap<String, QuickFilter> bloomFilters = new ConcurrentHashMap<>();
+
+ abstract QuickFilter createBloomFilter(Integer expectedInsertions, Double fpp);
+
+ @Override
+ public QuickFilter getQuickFilter(String filterName, Integer capacity, Double missProbability) {
+ if(!bloomFilters.containsKey(filterName)){
+ QuickFilter obj = createBloomFilter(capacity,missProbability);
+ bloomFilters.put(filterName,obj);
+ }
+ return bloomFilters.get(filterName);
+ }
+
+ @Override
+ public void destoryQuickFilter(String filterName) {
+ QuickFilter removed = bloomFilters.remove(filterName);
+ if(null != removed){
+ removed.clear();
+ }
+ }
+}
diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/BloomFilterFactory.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/BloomFilterFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..0bb4d160e6f8b5cd8b8a04ebb8ce07e177efbc80
--- /dev/null
+++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/BloomFilterFactory.java
@@ -0,0 +1,27 @@
+package com.gcc.container.components.quickfilter.factory.impl;
+
+
+import com.gcc.container.components.quickfilter.filter.QuickFilter;
+import com.gcc.container.components.quickfilter.filter.impl.BloomFilterPlus;
+import com.google.common.base.Charsets;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.PrimitiveSink;
+import org.springframework.stereotype.Component;
+
+
+@Component("BloomFilter")
+public class BloomFilterFactory extends AbstractQuickFilterFactory {
+
+ @Override
+ public QuickFilter createBloomFilter(Integer expectedInsertions, Double fpp) {
+ Funnel<String> funnel = new Funnel<String>() {
+ @Override
+ public void funnel(String from, PrimitiveSink into) {
+ into.putString(from, Charsets.UTF_8);
+ }
+ };
+ return new BloomFilterPlus(BloomFilter.create(funnel, expectedInsertions, fpp));
+ }
+
+}
diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/DailyBloomFilterFactory.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/DailyBloomFilterFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..e28bd9aca51580ec7a5bd75eb2ffb320355e1967
--- /dev/null
+++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/factory/impl/DailyBloomFilterFactory.java
@@ -0,0 +1,21 @@
+package com.gcc.container.components.quickfilter.factory.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@Component("DailyBloomFilter")
+@Slf4j
+public class DailyBloomFilterFactory extends BloomFilterFactory{
+ @Scheduled(cron = "59 59 23 * * ?" )
+ public void resetBloomFilter() {
+ if(!bloomFilters.isEmpty()) {
+ bloomFilters.clear();
+ log.info("All Bloom filters have been reset");
+ }
+ }
+}
diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/QuickFilter.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/QuickFilter.java
new file mode 100644
index 0000000000000000000000000000000000000000..b755fcd0be40ecd6b605769d7ba2c9e5b9508e13
--- /dev/null
+++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/QuickFilter.java
@@ -0,0 +1,40 @@
+package com.gcc.container.components.quickfilter.filter;
+
+/**
+ * Quick filter
+ */
+public interface QuickFilter {
+
+ /**
+ * Whether the keyword may be present in the filter
+ * @param keyword keyword
+ * @return Boolean
+ */
+ Boolean containsKeyword(String keyword);
+
+ /**
+ * Appends a keyword
+ * @param keyword keyword
+ */
+ void append(String keyword);
+
+ /**
+ * Appends a keyword together with its content
+ * @param keyword keyword
+ * @param content content
+ */
+ void append(String keyword, Object content);
+
+ /**
+ * Gets the content stored for a keyword
+ * @param keyword keyword
+ * @return String
+ */
+ String getContent(String keyword);
+
+ /**
+ * Clears the filter and releases resources
+ */
+ void clear();
+
+}
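A hedged usage sketch of the QuickFilter API through its factory; the service, filter name, and sizing values are invented:

    package com.gcc.container.components.quickfilter;

    import com.gcc.container.components.quickfilter.factory.QuickFilterFactory;
    import com.gcc.container.components.quickfilter.filter.QuickFilter;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Service;

    @Service
    public class DedupService {
        private final QuickFilterFactory factory;

        public DedupService(@Qualifier("DailyBloomFilter") QuickFilterFactory factory) {
            this.factory = factory;
        }

        public boolean seenBefore(String messageId) {
            // ~1M expected entries, 1% false-positive rate; created lazily
            QuickFilter filter = factory.getQuickFilter("message-ids", 1_000_000, 0.01);
            if (filter.containsKeyword(messageId)) {
                return true; // may be a false positive, by Bloom-filter nature
            }
            filter.append(messageId);
            return false;
        }
    }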
diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/impl/BloomFilterPlus.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/impl/BloomFilterPlus.java
new file mode 100644
index 0000000000000000000000000000000000000000..dc0f5467aa85787df5f5773a0d170d1c6a3c155a
--- /dev/null
+++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/filter/impl/BloomFilterPlus.java
@@ -0,0 +1,63 @@
+package com.gcc.container.components.quickfilter.filter.impl;
+
+import com.gcc.container.components.quickfilter.util.StringZipUtil;
+import com.google.common.hash.BloomFilter;
+import com.gcc.container.components.quickfilter.filter.QuickFilter;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bloom-filter-based quick filter
+ */
+public class BloomFilterPlus implements QuickFilter {
+
+ private final ConcurrentHashMap<String, byte[]> innerCache = new ConcurrentHashMap<>();
+
+ private BloomFilter<String> bloomFilter;
+
+ public BloomFilterPlus(BloomFilter<String> bloomFilter) {
+ this.bloomFilter = bloomFilter;
+ }
+
+ @Override
+ public Boolean containsKeyword(String keyword) {
+ return bloomFilter.mightContain(keyword);
+ }
+
+ @Override
+ public void append(String keyword) {
+ bloomFilter.put(keyword);
+ }
+
+ @Override
+ public void append(String keyword, Object content) {
+ bloomFilter.put(keyword);
+ try {
+ String context = content.toString();
+ innerCache.put(keyword,StringZipUtil.compressString(context));
+ }catch (IOException e){
+ e.printStackTrace();
+ }
+ }
+
+
+ @Override
+ public String getContent(String keyword) {
+ byte[] value = innerCache.get(keyword);
+ if(null != value) {
+ try {
+ return StringZipUtil.decompressString(value);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void clear() {
+ innerCache.clear();
+ this.bloomFilter = null;
+ }
+}
diff --git a/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/util/StringZipUtil.java b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/util/StringZipUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..d48590ea723d056463d24075c646cc985ced4f82
--- /dev/null
+++ b/quickfilter-component/src/main/java/com/gcc/container/components/quickfilter/util/StringZipUtil.java
@@ -0,0 +1,114 @@
+package com.gcc.container.components.quickfilter.util;
+
+import cn.hutool.json.JSONObject;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Base64;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+@Slf4j
+public class StringZipUtil {
+
+ public static byte[] compressString(String input) throws IOException {
+ // convert the input string to a byte array
+ byte[] inputBytes = input.getBytes("UTF-8");
+
+ // create the Deflater (default compression level)
+ Deflater deflater = new Deflater();
+ deflater.setInput(inputBytes);
+ deflater.finish();
+
+ // capture the compressed output in a ByteArrayOutputStream
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(inputBytes.length);
+ byte[] buffer = new byte[1024];
+ while (!deflater.finished()) {
+ int count = deflater.deflate(buffer); // compress
+ outputStream.write(buffer, 0, count);
+ }
+ outputStream.close();
+ byte[] output = outputStream.toByteArray();
+ deflater.end();
+ return output;
+ }
+
+ public static String decompressString(byte[] compressedData) throws IOException, DataFormatException {
+ Inflater inflater = new Inflater();
+ inflater.setInput(compressedData);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(compressedData.length);
+ byte[] buffer = new byte[1024];
+ while (!inflater.finished()) {
+ int count = inflater.inflate(buffer);
+ outputStream.write(buffer, 0, count);
+ }
+ outputStream.close();
+ byte[] result = outputStream.toByteArray();
+ inflater.end();
+ return new String(result, "UTF-8");
+ }
+
+ // decompression overload for Base64-encoded input
+ public static String decompressString(String compressedData) throws IOException, DataFormatException {
+ // decode the Base64 string back into a byte array
+ byte[] compressedBytes = Base64.getDecoder().decode(compressedData);
+
+ Inflater inflater = new Inflater();
+ inflater.setInput(compressedBytes);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(compressedBytes.length);
+ byte[] buffer = new byte[1024];
+ while (!inflater.finished()) {
+ int count = inflater.inflate(buffer);
+ outputStream.write(buffer, 0, count);
+ }
+ outputStream.close();
+ byte[] result = outputStream.toByteArray();
+ inflater.end();
+
+ return new String(result, "UTF-8");
+ }
+
+ // helper: convert a byte array to its hex string form
+ public static String bytesToHex(byte[] bytes) {
+ StringBuilder hexString = new StringBuilder(2 * bytes.length);
+ for (byte b : bytes) {
+ String hex = Integer.toHexString(0xff & b);
+ if (hex.length() == 1) {
+ hexString.append('0');
+ }
+ hexString.append(hex);
+ }
+ return hexString.toString();
+ }
+
+ public static void main(String[] args){
+
+ JSONObject one = new JSONObject()
+ .set("name","张三")
+ .set("age",12)
+ .set("opNumber",1)
+ .set("classNem","中华人民共和国-山东省-济南市-历下区-黄铜四路与凤鸣路1123号-幸福小学7年纪12班")
+ .set("userId","DSFKJFKLDJSDKLFklwe-23msndf-3242341223");
+ log.info("压缩前长度为:{},",one.toJSONString(0).length());
+ byte[] zipStr = null;
+ try {
+ zipStr = compressString(one.toString());
+ log.info("{}",zipStr);
+ log.info("压缩后长度为:{}",zipStr.length);
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+
+ try {
+ String con = decompressString(zipStr);
+ log.info("解压缩为:{}",con);
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+
+ }
+
+}
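compressString returns raw deflated bytes, while the String overload of decompressString expects Base64 text. A hedged fragment showing how the two overloads line up; compressToBase64 is not part of the class above, only a sketch meant to sit beside it:

    // Drop-in companion for StringZipUtil: compress, then Base64-encode, so
    // the result can round-trip through decompressString(String).
    public static String compressToBase64(String input) throws IOException {
        return Base64.getEncoder().encodeToString(compressString(input));
    }

    // usage: decompressString(compressToBase64("hello")) returns "hello"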
diff --git a/quickfilter-component/src/main/resources/META-INF/spring.factories b/quickfilter-component/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000000000000000000000000000000000000..4304a0efd2c0da73fc7463e181bab3b500f61070
--- /dev/null
+++ b/quickfilter-component/src/main/resources/META-INF/spring.factories
@@ -0,0 +1 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.gcc.container.components.quickfilter.ApplicationConfiguration
\ No newline at end of file
diff --git a/redis-component/src/main/java/com/gcc/container/components/redis/server/RedisCacheServer.java b/redis-component/src/main/java/com/gcc/container/components/redis/server/RedisCacheServer.java
index 102eb62cf57170a79a07c9a67f47ddac3cf5d148..12b6836ef6c41a4c9d82267b73fca0d1192ce71a 100644
--- a/redis-component/src/main/java/com/gcc/container/components/redis/server/RedisCacheServer.java
+++ b/redis-component/src/main/java/com/gcc/container/components/redis/server/RedisCacheServer.java
@@ -44,12 +44,37 @@ public interface RedisCacheServer {
*/
<T> void setHash(final String key, final Map<String, T> map);
+
+ /**
+ * Appends a single hash entry
+ * @param key key
+ * @param hashKey hash key
+ * @param hashValue hash value
+ * @param <T> value type
+ */
+ <T> void appendHash(final String key, final String hashKey, T hashValue);
+
/**
* Deletes a hash cache
* @param key key
*/
void removeHash(final String key);
+ /**
+ * Deletes individual hash fields
+ * @param key key
+ * @param hashKey hash keys to delete
+ */
+ void removeHash(final String key,String ...hashKey);
+
+ /**
+ * Refreshes a hash cache: deletes the key, then writes the new map
+ * @param key key
+ * @param newMap new map
+ * @param <T> value type
+ */
+ <T> void refreshHash(final String key, final Map<String, T> newMap);
+
/**
* Reads a hash
* @param key key
@@ -67,18 +92,34 @@ public interface RedisCacheServer {
/**
- * Adds elements to a list
+ * Creates a list cache
* @param key key
* @param values values
*/
<T> void addList(final String key, final List<T> values);
+ /**
+ * Appends individual values
+ * @param key key
+ * @param values values
+ * @param <T> value type
+ */
+ <T> void addList(final String key, final T ...values);
+
/**
* Deletes a list cache
* @param key key
*/
void removeList(final String key);
+ /**
+ * Refreshes a list cache
+ * @param key key
+ * @param newValues new values
+ * @param <T> value type
+ */
+ <T> void refreshList(final String key, List<T> newValues);
+
/**
* Gets all elements
@@ -106,12 +147,30 @@ public interface RedisCacheServer {
*/
<T> void addSet(final String key, final Set<T> members);
+
+ /**
+ * Adds members to a set
+ * @param key key
+ * @param members members
+ */
+ <T> void addSet(final String key, final T ...members);
+
/**
* Deletes a set cache
* @param key key
*/
void removeSet(final String key);
+
+ /**
+ * Refreshes a set cache
+ * @param key key
+ * @param newMembers new members
+ * @param <T> member type
+ */
+ <T> void refreshSet(final String key, final Set<T> newMembers);
+
+
/**
* Reads all members of a set
* @param key key
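A hedged usage fragment for the methods added to this interface; cache is an injected RedisCacheServer, and all key names and the newAttributes/newTags variables are examples:

    cache.appendHash("user:42", "name", "alice");    // put a single hash field
    cache.removeHash("user:42", "name", "age");      // delete selected fields only
    cache.refreshHash("user:42", newAttributes);     // delete the key, then putAll
    cache.addList("recent:42", "a", "b", "c");       // append single list values
    cache.refreshSet("tags:42", newTags);            // replace the whole set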
diff --git a/redis-component/src/main/java/com/gcc/container/components/redis/server/impl/RedisCacheServerImpl.java b/redis-component/src/main/java/com/gcc/container/components/redis/server/impl/RedisCacheServerImpl.java
index 7648bbaa3f1fd3978821d8a03ad3172d4b1fca31..ac8672315b17995b20ac111e25efb219fe4d9e5a 100644
--- a/redis-component/src/main/java/com/gcc/container/components/redis/server/impl/RedisCacheServerImpl.java
+++ b/redis-component/src/main/java/com/gcc/container/components/redis/server/impl/RedisCacheServerImpl.java
@@ -49,11 +49,28 @@ public class RedisCacheServerImpl implements RedisCacheServer {
redisTemplate.opsForHash().putAll(key, map);
}
+
+ @Override
+ public <T> void appendHash(String key, String hashKey, T hashValue) {
+ redisTemplate.opsForHash().put(key,hashKey,hashValue);
+ }
+
@Override
public void removeHash(final String key) {
redisTemplate.delete(key);
}
+ @Override
+ public void removeHash(String key, String ...hashKey) {
+ redisTemplate.opsForHash().delete(key,hashKey);
+ }
+
+ @Override
+ public <T> void refreshHash(String key, Map<String, T> newMap) {
+ redisTemplate.delete(key);
+ redisTemplate.opsForHash().putAll(key, newMap);
+ }
+
@Override
public Map getHash(final String key) {
Map