欢迎大家来到IT世界,在知识的湖畔探索吧!
说明:这里主要是安装和基本的使用。Debezinum 也可以用阿里的Canal 来替代。如果您觉得不错或对您有帮助,轻轻的动一下您的手指留言评论或点个赞,谢谢大家
下载安装kafka
- 下载地址:http://archive.apache.org/dist/kafka/2.4.0/kafka_2.11-2.4.0.tgz
- 解压: tar -zxvf /apps/kafka_2.11-2.4.0.tgz
- 编辑:server.properties.(我这里zk,kafka分用了三台机器)
broker.id=3 port=9094 listeners=PLAINTEXT://flink-master:9094 advertised.listeners=PLAINTEXT://flink-master:9094 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes= socket.receive.buffer.bytes= socket.request.max.bytes= log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes= log.retention.check.interval.ms= zookeeper.connect=flink-master:2181,flink-slave1:2182,flink-slave2:2183 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
欢迎大家来到IT世界,在知识的湖畔探索吧!
下载安装Debezium
- 下载地址:https://debezium.io/releases/1.2/
- 解压到kafka的plugins目录: tar -zxvf debezium-connector-mysql-1.2.5.Final-plugin.tar.gz -C /usr/local/share/kafka/plugins/debezium
- 解压后如图:
欢迎大家来到IT世界,在知识的湖畔探索吧!
编辑kafka 的connet-distributed.properties
- 只需要修改两个地方
- bootstrap.servers=flink-master:9094
- plugin.path=plugin.path=/usr/local/share/kafka/plugins/debezium
创建mysql.properties
1.创建
欢迎大家来到IT世界,在知识的湖畔探索吧!name=inventory-connector connector.class=io.debezium.connector.mysql.MySqlConnector database.hostname=mysql-hostname database.port=3306 database.user=root database.password= database.server.id= database.server.name=flink-slave2 database.whitelist=cdc database.history.kafka.bootstrap.servers=flink-master:9094 database.history.kafka.topic=dbhistory.flink-slave2 include.schema.changes=true
2.debezium属性说明
- 地址:https://debezium.io/documentation/reference/0.10/connectors/mysql.html
启动kafka 以及 kafka connect
1.启动kafka:kafka-server-start.sh server.properties
2.启动connect:connect-distributed.sh
connect-distributed.properties mysql.properties
3.启动成功后如图:会出现connect-configs,connect-offsets,connect-status,flink-slave2.cdc.customers,flink-slave2.cdc.order,flink-slave2.cdc.products
4.customers,order,product 分别对应数据库中的表,如果没有可以手动执行connect api
官网地址:
https://kafka.apache.org/24/documentation.html#connect
启动Flink 集群并运行Flink sql
1.启动集群:start-cluster.sh
2.运行Flink sql:sql-client.sh embedded
3.运行结果:
4.查看mysql 中的表
order表数据:
5.修改order 表中的数据,并查看flink sql 中的显示
到这里我们的Flink+Debezium+Kafka 就完成了。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/114141.html