Redis应用—7.大Value处理方案

Redis应用—7.大Value处理方案大纲 1 案设计 2 安装与配置环境 1 案设计步骤一 首先需要配置一个 crontab 定时调度 shell 脚本 然后该脚本每天凌晨会通过 rdbtools 具解析 Redis 的 RDB 件 接着对解析出的内容进行过滤 把 RDB 件中的 key 导出到 C

欢迎大家来到IT世界,在知识的湖畔探索吧!

大纲

1.⽅案设计

2.安装与配置环境

1.⽅案设计

Redis应用—7.大Value处理方案

步骤一:首先需要配置一个crontab定时调度shell脚本,然后该脚本每天凌晨会通过rdbtools⼯具解析Redis的RDB⽂件,接着对解析出的内容进行过滤,把RDB⽂件中的⼤key导出到CSV⽂件

步骤二:使⽤SQL导⼊CSV⽂件到MySQL数据库中,同时使⽤Canal监听MySQL的binlog⽇志。

步骤三:Canal会发送增量的大key数据消息到RocketMQ,RocketMQ的消费者系统会对增量的大key数据消息进⾏消费,消息中便会包含⼤key的详情信息。这样消费者就可以将⼤key的信息通过邮件等⽅式,通知开发⼈员

为什么要把⼤key的CSV⽂件导⼊到MySQL存储?为什么不直接监听⼤key的CSV⽂件进⾏通知?

原因一:如果不导⼊MySQL,那么就⽆法使⽤Canal来监听。这样就要开发⼀个程序,定时去扫描Redis节点下解析出来的CSV⽂件。如果Redis集群中有多个节点,那么每⼀个节点都要去扫描。⽽将CSV导⼊到MySQL后,只需要使⽤Canal去监听MySQL表的binlog,就可以把增量数据同步到RocketMQ中,由消费者统⼀进⾏处理。

原因二:解析CSV⽂件⽐直接从MySQL中查询复杂很多,尤其是需要进行信息过滤。导⼊到MySQL后可以通过SQL轻松的对⼤key的记录进⾏条件筛选,并且可以对每天产⽣的⼤key数据进⾏存储分析。

RDB解析⽣成的CSV⽂件结构如下:

database,type,key,size_in_bytes,encoding,num_elements,len_largest_element, expiry 0,string,key1-string,20536,string,17280,17280, 0,list,key1-list,4006,quicklist,24,1530,

欢迎大家来到IT世界,在知识的湖畔探索吧!

2.安装与配置环境

(1)依赖环境

(2)安装Python3 & pip3

(3)安装rdb-tools

(4)安装RocketMQ

(5)安装Canal

(6)rdbtools扫描RDB⽂件

(7)将CSV⽂件导⼊MySQL

(1)依赖环境

Python3、pip3、rdb-tools、Redis、MySQL、JDK、RocketMQ、Canal。

rdb-tools是开源的⼀个python项⽬,它可以⽤来解析Redis的RDB⽂件,但是要先安装Python环境。

欢迎大家来到IT世界,在知识的湖畔探索吧!连接地址: https://github.com/sripathikrishnan/redis-rdb-tools

pip是Python的包管理⼯具,安装Python后,这个⼯具就会配套安装好。

(2)安装Python3 & pip3

# 安装编译⼯具 $ yum -y groupinstall "Development tools" $ yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel $ yum install libffi-devel -y # 下载python3.7.0 $ cd /usr/local $ wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tar.xz $ tar -xvJf Python-3.7.0.tar.xz # 编译 $ mkdir /usr/local/python3 $ cd Python-3.7.0 $ ./configure --prefix=/usr/local/python3 $ make && make install

(3)安装rdb-tools

欢迎大家来到IT世界,在知识的湖畔探索吧!# 使⽤pip包管理程序安装rdb-tools $ pip3 install rdbtools python-lzf # 配置环境变量 $ vim /etc/profile # 在⽂件底部最末尾,追加如下两⾏内容 PATH=/usr/local/python3/bin:$PATH export PATH

验证安装:

# 验证rdbtools是否安装成功 $ rdb -h

(4)安装RocketMQ

一.下载安装包

# 下载安装包,要注意版本与项⽬中依赖的RocketMQ版本兼容 $ wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip

二.修改默认配置

# 解压 $ unzip rocketmq-all-4.7.1-bin-release.zip # 切换⽬录 $ cd /usr/local/rocketmq-all-4.7.1-bin-release # 修改nameserver默认堆栈⼤⼩ $ vim ./bin/runserver.sh # 修改brokerserver默认堆栈⼤⼩ $ vim bin/runbroker.sh

NameServer默认配置如下:

 -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspacesSize=128m -XX:MaxMetaspaceSize=320m

BrokerServer默认配置如下:

 -server -Xms8g -Xmx8g -Xmn4g

三.修改BrokerServer的IP地址

$ vim /usr/local/rocketmq-all-4.7.1-bin-release/conf/broker.conf

在broker.conf⽂件中追加如下内容:

# brokerserver所在机器的公⽹IP地址 brokerIP1=192.168.95.129

四.启动RocketMQ

# 启动nameserver $ nohup sh ./bin/mqnamesrv & # 查看nameserver启动⽇志 $ tailf ~/logs/rocketmqlogs/namesrv.log # 启动brokerserver $ nohup sh bin/mqbroker -n 127.0.0.1:9876 -c conf/broker.conf & # 查看brokerserver启动⽇志 $ tailf ~/logs/rocketmqlogs/broker.log

五.启动RocketMQ控制台

github地址:https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0

可以从github上clone下载,然后使⽤maven命令打包,然后如下启动:

$ nohup java -jar -server -Xms256m -Xmx256m \ -Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Dserver.port=8080 \ /usr/local/rocketmq-console-ng-1.0.1.jar &

(5)安装Canal

一.下载安装包

# 下载canal-admin $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz # 下载canal-deployer $ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

二.安装canal-admin

# 创建解压⽬录 $ mkdir /usr/local/canal-admin # 解压 $ tar -zxvf canal.admin-1.1.5.tar.gz -c /usr/local/canal-admin

目录结构如下:

Redis应用—7.大Value处理方案

三.初始化Canal数据库

执⾏conf⽬录下的canal_manager.sql⽂件

# 执⾏conf⽬录下的canal_manager.sql⽂件 $ cd /usr/local/canal-admin/conf $ mysql -u⽤户名 -p密码 -hIP地址 -P端⼝号 < canal_manager.sql

canal_manger.sql⽂件内容如下:

CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */; USE `canal_manager`; SET NAMES utf8; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for canal_adapter_config -- ---------------------------- DROP TABLE IF EXISTS `canal_adapter_config`; CREATE TABLE `canal_adapter_config` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `category` varchar(45) NOT NULL, `name` varchar(45) NOT NULL, `status` varchar(45) DEFAULT NULL, `content` text NOT NULL, `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- ---------------------------- -- Table structure for canal_cluster -- ---------------------------- DROP TABLE IF EXISTS `canal_cluster`; CREATE TABLE `canal_cluster` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(63) NOT NULL, `zk_hosts` varchar(255) NOT NULL, `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- ---------------------------- -- Table structure for canal_config -- ---------------------------- DROP TABLE IF EXISTS `canal_config`; CREATE TABLE `canal_config` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `cluster_id` bigint(20) DEFAULT NULL, `server_id` bigint(20) DEFAULT NULL, `name` varchar(45) NOT NULL, `status` varchar(45) DEFAULT NULL, `content` text NOT NULL, `content_md5` varchar(128) NOT NULL, `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `sid_UNIQUE` (`server_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- ---------------------------- -- Table structure for canal_instance_config -- ---------------------------- DROP TABLE IF EXISTS `canal_instance_config`; CREATE TABLE `canal_instance_config` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `cluster_id` bigint(20) DEFAULT NULL, `server_id` bigint(20) DEFAULT NULL, `name` varchar(45) NOT NULL, `status` varchar(45) DEFAULT NULL, `content` text NOT NULL, `content_md5` varchar(128) DEFAULT NULL, `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `name_UNIQUE` (`name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- ---------------------------- -- Table structure for canal_node_server -- ---------------------------- DROP TABLE IF EXISTS `canal_node_server`; CREATE TABLE `canal_node_server` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `cluster_id` bigint(20) DEFAULT NULL, `name` varchar(63) NOT NULL, `ip` varchar(63) NOT NULL, `admin_port` int(11) DEFAULT NULL, `tcp_port` int(11) DEFAULT NULL, `metric_port` int(11) DEFAULT NULL, `status` varchar(45) DEFAULT NULL, `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- ---------------------------- -- Table structure for canal_user -- ---------------------------- DROP TABLE IF EXISTS `canal_user`; CREATE TABLE `canal_user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `username` varchar(31) NOT NULL, `password` varchar(128) NOT NULL, `name` varchar(31) NOT NULL, `roles` varchar(31) NOT NULL, `introduction` varchar(255) DEFAULT NULL, `avatar` varchar(255) DEFAULT NULL, `creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; SET FOREIGN_KEY_CHECKS = 1; -- ---------------------------- -- Records of canal_user -- ---------------------------- BEGIN; INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EBEE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28'); COMMIT; SET FOREIGN_KEY_CHECKS = 1; 

四.修改conf⽬录下的application.yml⽂件

# 修改conf⽬录下的application.yml⽂件 $ vim /usr/local/canal-admin/conf/application.yml

application.yml⽂件内容如下:

server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin

五.启动canal-admin

$ cd /usr/local/canal-admin $ bin/startup.sh

然后访问Canal管理控制台:http://ip:8089,⽤户名和密码分别是:admin | 。

此时登录进⼊后,会发现⽬前什么数据都没有。但这没有关系,接着会启动canal-server,因为要⽤canal-admin来管理每个canal-server的实例

采⽤canal-admin来管理canal-server:当canal-server启动时,canal-server是会⾃动注册到canal-admin上的

六.关闭canal-admin

$ cd /usr/local/canal.admin-1.1.5 $ bin/stop.sh

注意:关闭时,请不要使⽤kill进程号的⽅式来关闭,而使⽤执⾏脚本的⽅式关闭。因为如果kill进程后,下次再次执⾏启动脚本时,会出现found admin.pid , Please run stop.sh first ,then startup.sh的提示。当然,出现这种情况的时候,可以到bin⽬录下将admin.pid删除掉。

七.安装canal-server

# 创建解压⽬录 $ mkdir /usr/local/canal-server # 解压 $ tar -zxvf canal.deployer-1.1.5.tar.gz -c /usr/local/canal-server

目录结构如下:

Redis应用—7.大Value处理方案

八.修改conf⽬录下的canal_local.properties

# 修改conf⽬录下的canal_local.properties⽂件 $ vim /usr/local/canal-server/conf/canal_local.properties

canal_local.properties⽂件内容如下:

# register ip 这⾥的ip选择您本机的ip(也就是启动canal-server机器的所在ip地址) canal.register.ip = 192.168.95.129 # canal admin config 这⾥是部署canal-admin的所在机器的ip,当然也可以把canal-admin和canal-server部署到⼀台机器 canal.admin.manager = 192.168.95.129:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 6BB4837EBEE4568DDA7DC67ED2CA2AD9 # admin auto register 这⾥⼀定得是true,否则⽆法在启动canal-server时候注册到canal-admin上 canal.admin.register.auto = true canal.admin.register.cluster = # canal-server注册到canal-admin控制台的名称,这⾥⽤canal-server的ip地址,注意改ip canal.admin.register.name = 192.168.95.129

九.启动canal-server

$ cd /usr/local/canal-server # 切记后⾯加⼊local的参数 $ bin/startup.sh local

这时可以到canal-admin的界面中查看canal-server是否已经注册成功。

十.在canal-admin中配置Canal Instance来监听数据库

⾸先点击界面右侧的Instance管理,再点击新建Instance来创建实例。

 mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info # 需要修改 # 需要监听数据库的ip加端⼝ canal.instance.master.address=192.168.95.129:3306 # mysql主库连接时起始的binlog⽂件,这⾥可以不写,默认为mysql-bin canal.instance.master.journal.name= # binlog⽇志的位置 canal.instance.master.position= # 开始同步binlog⽇志的时间戳,也就是从哪个时间点开始同步binlog⽇志,13位时间戳格式 canal.instance.master.timestamp= # 如果数据库开启了gtid模式,这⾥填写master节点的gtid我们这⾥不写也是可以的,如果要开启,记得将canal.instance.gtidon改为true canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # 需要修改 # username/password这⾥填写要连接数据库的⽤户名和密码 canal.instance.dbUsername=root # 需要修改 # canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # 需要修改 # table regex,这⾥填写订阅的数据库的库与表的相关正则表达式 canal.instance.filter.regex=careerplan_eshop_redis.redis_large_key_log # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format:schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # 需要修改 # MQ Config,这⾥填写实例名称即可 # 如果canal.serverMode选择的不是tcp模式,这⾥填写相关的topic的名称,kafka和rocketmq默认的主题为example # 同时需要注意,如果期望使⽤的canal-server的⼯作模式是MQ的⽅式来运⾏,那么需要修改canal.properties的配置 canal.mq.topic=binlog_monitor_large_key_topic # dynamic topic route by schema or table regex canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* 

接着点击保存,就会跳转到列表页,然后在新创建的canal-instance后⾯点击启动,接着点击操作⽇志去查看相关⽇志。⾄此,⼀个canal-instance就启动成功了。

十一.关闭canal-server

$ cd /usr/local/canal-server $ bin/stop.sh

(6)rdbtools扫描RDB⽂件

# 切换⽬录 [root@localhost bin]# cd /usr/local/redis/bin # 在这个⽬录下存放着Redis的rdb⽂件 [root@localhost bin]# ls dump.rdb redis-benchmark redis-check-aof redis-check-rdb redis-cli redis-sentinel redis-server # 使⽤rdbtools⼯具过滤dump.rdb⽂件中的⼤key,⽣成dump.csv⽂件 [root@localhost bin]# rdb -c memory dump.rdb --bytes 10240 -f dump.csv # 可以看到⽣成的dump.csv⽂件 [root@localhost bin]# ls dump.csv dump.rdb redis-benchmark redis-check-aof redis-check-rdb redis-cli redis-sentinel redis-server # 查看dump.csv⽂件的内容 [root@localhost bin]# vim dump.csv

rdb参数说明:

rdb -c memory dump.rdb --bytes 10240 -f dump.csv

dump.rdb是指定Redis的rdb⽂件的路径,–bytes 10240表示过滤出key值⼤⼩超过10240B的key,也就是10K。

dump.csv⽂件内容如下:

database,type,key,size_in_bytes,encoding,num_elements,len_largest_element,expiry 0,string,key1-string,20536,string,17280,17280,

(7)将CSV⽂件导⼊MySQL

一.先查看secure_file_priv属性是否开启

secure_file_priv属性指定导⼊⽂件的位置,只有在该属性指定的⽬录下的⽂件才可以导⼊MySQL。

mysql> show variables like '%secure%'; +--------------------------+------------------------------------------------+ | Variable_name | Value | +--------------------------+------------------------------------------------+ | require_secure_transport | OFF | | secure_auth | ON | | secure_file_priv | ... | +--------------------------+------------------------------------------------+

二.修改MySQL配置⽂件⽂件

# 找到mysqld⽂件的位置 [root@localhost bin]# find / -name "mysqld" /run/mysqld /usr/sbin/mysqld # 找到mysql的默认配置⽂件位置 [root@localhost bin]# /usr/sbin/mysqld --verbose --help |grep -A 1 'Default options' Default options are read from the following files in the given order: /etc/my.cnf /etc/mysql/my.cnf /usr/etc/my.cnf ~/.my.cn # 修改mysql的默认配置⽂件 [root@localhost etc]# vim /etc/my.cnf

在[mysqld]模块下,如果存在下列属性就修改,如果不存在就追加:

[mysqld] # 关闭安全⽂件导⼊路径 secure-file-priv="" # 开启binlog log-bin=mysql-bin binlog-format=ROW server_id=1 

三.重启MySQL服务

# 检查mysql服务运⾏状态 $ service mysqld status # 重启服务 $ service mysqld restart

重启服务后重新连接MySQL,查看secure_file_priv和log_bin的值:

mysql> show variables like '%secure%'; +--------------------------+-------+ | Variable_name | Value | +--------------------------+-------+ | require_secure_transport | OFF | | secure_auth | ON | | secure_file_priv | | +--------------------------+-------+ mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+

四.创建redis_large_key_log表

create table redis_large_key_log ( `id` bigint primary key auto_increment, `database` tinyint comment 'Redis数据库索引', `type` varchar(20) comment 'Redis数据类型', `key` varchar(256) comment 'Redis key', `size_in_bytes` int comment 'value对于的bytes', `encoding` varchar(30) comment '编码', `num_elements` int comment '元素数量', `len_largest_element` int comment '元素⻓度', `expiry` varchar(30) comment '过期时间', `create_time` datetime DEFAULT CURRENT_TIMESTAMP, `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

五.编写导⼊csv⽂件的SQL脚本

$ cd /usr/local/redis/bin/ # 创建SQL脚本⽂件 $ touch csv-transfer-db.sql # 编辑SQL脚本 vim csv-transfer-db.sql

csv-transfer-db.sql⽂件内容如下:

# 指定数据库 USE `careerplan_eshop_redis`; load data infile '/usr/local/redis/bin/dump.csv' into table redis_large_key_log fields terminated by',' lines terminated by'\n' ignore 1 lines (`database`,`type`,`key`,size_in_bytes,encoding,num_elements,len_largest_element,expiry);

六.编写定时任务脚本

脚本职能:

调⽤rdbtools⼯具,扫描⼤key,dump出csv⽂件。

调⽤SQL脚本,将csv⽂件导⼊数据库。

$ touch monitor-large-key-to-db.sh $ vim monitor-large-key-to-db.sh

monitor-large-key-to-db.sh⽂件内容如下:

# crontab没有环境变量给你运⾏,所以要在shell开头⼿动添加环境 source /etc/profile . ~/.bash_profile #!/bin/bash echo "开始执⾏monitor-large-key-to-db.sh脚本" >> /usr/local/redis/monitor-large-key-log.txt rdb -c memory /usr/local/redis/bin/dump.rdb --bytes  -f /usr/local/redis/bin/dump.csv echo "扫描redis过滤出⼤key,⼤key数据保存到/usr/local/redis/bin/dump.csv⽂件" >> /usr/local/redis/monitor-large-key-log.txt mysql -u⽤户名 -p密码 -hIP地址 -P端⼝号 < /usr/local/redis/bin/csv-transfer-db.sql echo "csv⽂件数据已导⼊mysql" >> /usr/local/redis/monitor-large-key-log.txt

七.创建调度任务

$ crontab -e # 每天凌晨3点进⾏⼀次调度,将会扫描rdb⽂件,将⼤key存储到MySQL 0 3 * * * sh /usr/local/redis/bin/monitor-large-key-to-db.sh

(8)binlog数据消费者

一.接⼝说明

消费redis_large_key_log表的binlog数据,该数据包含Redis的⼤key信息。

二.代码位置

com.demo.eshop.monitor.mq.consumer.ConsumerBeanConfig#receiveLargeKeyMonitorConsumer

具体实现如下:

@Configuration public class ConsumerBeanConfig { //配置内容对象 @Autowired private RocketMQProperties rocketMQProperties; //Redis大key binlog 消费者 @Bean("cookbookLargeKeyMonitorTopic") public DefaultMQPushConsumer receiveLargeKeyMonitorConsumer(CookbookLargeKeyMonitorListener cookbookLargeKeyMonitorListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.BINLOG_MONITOR_LARGE_KEY_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(RocketMqConstant.BINLOG_MONITOR_LARGE_KEY_TOPIC, "*"); consumer.registerMessageListener(cookbookLargeKeyMonitorListener); consumer.start(); return consumer; } ... }

三.参数说明

CookbookLargeKeyMonitorListener表示针对BINLOG_MONITOR_LARGE_KEY_GROUP的Listener,它会监听Canal推送的BINLOG_MONITOR_LARGE_KEY_TOPIC消息,然后对消息解析,通过邮件、钉钉等推送给开发⼈员。具体实现如下:

@Component public class CookbookLargeKeyMonitorListener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String msg = new String(messageExt.getBody()); // 解析binlog数据模型 BinlogDataDTO binlogData = BinlogUtils.getBinlogData(msg); log.info("消费到binlog消息, binlogData: {}", binlogData); // 推送通知 informByPush(binlogData); } } catch (Exception e) { // 本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //第三方平台推送消息到app private void informByPush(BinlogDataDTO binlogData) { log.info("消息推送中:消息内容:{}", binlogData); } }

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/100415.html

(0)
上一篇 2024年 12月 26日 下午7:45
下一篇 2024年 12月 26日 下午8:00

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们YX

mu99908888

在线咨询: 微信交谈

邮件:itzsgw@126.com

工作时间:时刻准备着!

关注微信