一、Kafka 安装

1、上官网下载安装包

安装包文件链接

2、根据文档进行安装

(1)、安装Kafka


# 将下载的安装包移动到安装目录, 解压安装包
tar -xzf kafka_2.13-3.1.0.tgz 
# 进入文件夹
cd kafka_2.13-3.1.0/
# 由于kafka是分布式系统,必须依赖于分布式系统协调服务来各个节点,服务的负载均衡
# 文档提供两种形式来对kafka进行管理

# Kafka with ZooKeeper
# 启动ZooKeeper 服务进程
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动Kafka 服务端进程
bin/kafka-server-start.sh config/server.properties

# Kafka with KRaft
# 需要生成一个唯一的UUID来格式化日志文件夹
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
# 启动Kafka 服务端进程
bin/kafka-server-start.sh config/kraft/server.properties

(2)、创建事件单元

# 首先,创建一个单元存储事件,这里单元名为quickstart-events,使用端口9092
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# 使用命令查看单元信息
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

(3)、启动生产者进程和消费者进程

# 启动生产者进程
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
# 启动消费者进程
# 消费者进程有 --from-beginning 参数,带上表示从该topic的起始消息进行消费,不带则会从消费者未消费的消息开始消费
bin/kafka-console-consumer.sh --topic quickstart-events --bootstrap-server localhost:9092

在生产者终端输入字符串,消费者终端会进行输出
生产者消费者

二、maxwell 安装

github 下载压缩包,我下载的是1.19.0 版本

# 解压压缩包
tar -xzf maxwell-1.19.0.tar.gz
# 进入文件夹
cd maxwell-1.19.0/
# 启动maxwell,监听mysql binlog 并发送至Kafka 对应的topic
bin/maxwell --user='root' --password='password' --port=3306 --host='localhost' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=quickstart-events &

三、操作mysql对Kafka进行测试

  • 对mysql进行插入和更新操作
mysql> create table test_kafka(id int, message text);
Query OK, 0 rows affected (0.02 sec)

mysql> 11:55:26,464 INFO  AbstractSchemaStore - storing schema @Position[BinlogPosition[binlog.000407:304182], lastHeartbeat=1690430123169] after applying "create table test_kafka(id int, message text)" to zhidou_cdc, new schema id is 2

mysql> 
mysql> desc test_kafka;
+---------+------+------+-----+---------+-------+
| Field   | Type | Null | Key | Default | Extra |
+---------+------+------+-----+---------+-------+
| id      | int  | YES  |     | NULL    |       |
| message | text | YES  |     | NULL    |       |
+---------+------+------+-----+---------+-------+
2 rows in set (0.00 sec)

mysql> insert into test_kafka values(1, '测试1');
Query OK, 1 row affected (0.00 sec)

mysql> insert into test_kafka values(2, '测试2');
Query OK, 1 row affected (0.00 sec)

mysql> update test_kafka set message='测试3' where id=2;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0
  • Kafka消费者终端会打印json格式的binlog日志
bin/kafka-console-consumer.sh --topic quickstart-events --bootstrap-server localhost:9092
{"database":"test","table":"test_kafka","type":"insert","ts":1690430162,"xid":1517,"commit":true,"data":{"id":1,"message":"测试1"}}
{"database":"test","table":"test_kafka","type":"insert","ts":1690430174,"xid":1524,"commit":true,"data":{"id":2,"message":"测试2"}}
{"database":"test","table":"test_kafka","type":"update","ts":1690430197,"xid":1534,"commit":true,"data":{"id":2,"message":"测试3"},"old":{"message":"测试2"}}
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐