Configuring canal 1.1.7 to sync MySQL to Kafka in real time
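1. Prepare the package and install canal
Releases · alibaba/canal · GitHub
I downloaded the latest release at the time of writing, 1.1.7: canal.deployer-1.1.7.tar.gz
This is the deployer package. It has no admin UI, but it is enough for basic use.
Upload it to the server, create a folder for it under the install path (this guide uses /opt/module/cannal-1.1.7), and extract the archive into that folder:
mkdir -p /opt/module/cannal-1.1.7
tar -zxvf canal.deployer-1.1.7.tar.gz -C /opt/module/cannal-1.1.7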
2. Enable the MySQL binlog and create a user for canal
2.1 Enable the MySQL binlog
sudo vim /etc/my.cnf
Add or change the following settings in the [mysqld] section:
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=test
binlog-do-db=test2 # write changes in the test and test2 databases to the binlog
Restart the MySQL service:
sudo systemctl restart mysqld
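The settings from 2.1 are easy to get subtly wrong (a missing server-id, or a binlog_format other than row), so it can help to check the file programmatically. The following is a minimal Python sketch, not part of MySQL or canal. `parse_mycnf` and `check_binlog_settings` are hypothetical helper names, and the parser only understands simple `key=value` lines with `#` comments. It treats dashes and underscores in option names as interchangeable, as MySQL does.

```python
from collections import defaultdict


def parse_mycnf(text):
    """Collect key=value options from my.cnf-style text; repeated keys keep every value."""
    options = defaultdict(list)
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments
        if not line or line.startswith("[") or "=" not in line:
            continue  # skip blanks, section headers, and flag-only options
        key, value = line.split("=", 1)
        options[key.strip().replace("-", "_")].append(value.strip())
    return dict(options)


def check_binlog_settings(options):
    """Return a list of problems with the settings this guide relies on."""
    problems = []
    if "server_id" not in options:
        problems.append("server-id is not set")
    if "log_bin" not in options:
        problems.append("log-bin is not set")
    if [v.upper() for v in options.get("binlog_format", [])] != ["ROW"]:
        problems.append("binlog_format should be row")
    return problems


# Same settings as in 2.1; in practice, read the text from /etc/my.cnf instead.
sample = """
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=test
binlog-do-db=test2 # databases written to the binlog
"""

opts = parse_mycnf(sample)
print(opts["binlog_do_db"])         # ['test', 'test2']
print(check_binlog_settings(opts))  # []
```

An empty list means the three settings canal depends on are present in the file. It does not prove that the running server has picked them up, which still requires the restart.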
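2.2 Create a user for canal and grant privileges
grant all privileges on *.* to 'canal'@'%' IDENTIFIED BY 'canal123';
FLUSH PRIVILEGES;
The statements above create the user canal with password canal123 and grant it every privilege on all databases (query, insert, replication slave, and so on). Adjust this to your needs; canal itself only requires SELECT, REPLICATION SLAVE, and REPLICATION CLIENT. The % lets the canal user log in from any remote IP, provided the server is reachable. Note that GRANT ... IDENTIFIED BY works on MySQL 5.7 and earlier. MySQL 8.0 removed that syntax, so there you must create the user with CREATE USER first and grant privileges afterwards.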
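2.3 Notes
If MySQL and canal are not installed on the same server, check connectivity first:
telnet hadoop102 3306 # hadoop102 is the hostname or IP of the MySQL server
If this fails with "-bash: telnet: command not found", telnet is not installed. Install it:
rpm -qa telnet-server
yum install telnet-server
yum install telnet
If telnet reports that it has connected, the two servers can reach each other (the original screenshot is not reproduced here).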
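If you would rather not install telnet, the same reachability check can be done with a plain TCP connection attempt. This is a minimal sketch using only the Python standard library; `can_connect` is a hypothetical helper name, and it only proves that the port accepts connections, not that the canal account can log in.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hostnames.
        return False
```

For the setup in this guide you would call `can_connect("hadoop102", 3306)` from the server that runs canal.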
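3. Configure canal
# Enter the conf folder under the install path
cd /opt/module/cannal-1.1.7/conf
Two files here matter: canal.properties holds the general configuration, and example/instance.properties holds the configuration of the monitored instance.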
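3.1 General configuration: canal.properties
vim canal.properties
Change the required settings. Keep each comment on its own line, because a .properties value runs to the end of the line and would swallow a trailing # comment:
# Send captured changes to Kafka (the default is tcp)
canal.serverMode = kafka
# Kafka cluster addresses
kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
Adjust the other settings as needed.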
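One pitfall with this file: Java-style .properties files only treat a line as a comment when it starts with # or !. A note written after a value becomes part of the value. The sketch below illustrates that with a deliberately simplified parser (it ignores escapes, line continuations, and the `:` separator that real .properties files also allow). `load_properties` and `check_canal_properties` are hypothetical helpers, not canal APIs.

```python
def load_properties(text):
    """Parse simple key=value lines; only lines that START with # or ! are comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!" or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_canal_properties(props):
    """Return problems with the two settings changed in this section."""
    problems = []
    if props.get("canal.serverMode") != "kafka":
        problems.append("canal.serverMode must be exactly 'kafka'")
    servers = props.get("kafka.bootstrap.servers", "")
    for entry in filter(None, servers.split(",")):
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            problems.append(f"bad broker address: {entry.strip()!r}")
    if not servers:
        problems.append("kafka.bootstrap.servers is empty")
    return problems


good = load_properties("""
# write captured changes to Kafka
canal.serverMode = kafka
kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
""")
bad = load_properties("canal.serverMode = kafka # inline note\n")

print(check_canal_properties(good))  # []
print(bad["canal.serverMode"])       # kafka # inline note
print(len(check_canal_properties(bad)))  # 2
```

The second case shows why an inline note is risky: the server mode is read as the whole string `kafka # inline note`, which no longer matches `kafka`.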
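3.2 Instance configuration: example/instance.properties
vim example/instance.properties
Change the required settings. Comments go on their own lines, because a .properties value runs to the end of the line:
# ID that canal uses when posing as a MySQL slave; it must not clash with any other server-id
canal.instance.mysql.slaveId=1111
# MySQL address
canal.instance.master.address=hadoop102:3306
# Account created for canal in 2.2
canal.instance.dbUsername=canal
# Password created for canal in 2.2
canal.instance.dbPassword=canal123
# Kafka topic name; the default is example, change it as needed
canal.mq.topic=example
# Target Kafka partition; a single partition by default. If you spread data over several partitions, remember that Kafka does not preserve order across partitions
canal.mq.partition=0
Adjust the other settings as needed.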
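On the ordering concern for multiple partitions: Kafka only guarantees order within a single partition, so the usual approach is to route every change to the same row into the same partition. The sketch below illustrates that idea only. It is not canal's own hashing; `choose_partition` and the CRC32-based hash are assumptions made for the example. canal's instance.properties ships with commented-out `canal.mq.partitionsNum` and `canal.mq.partitionHash` settings for this purpose, and the canal documentation is the place to check their exact semantics.

```python
import zlib


def choose_partition(database, table, key_value, partitions):
    """Map a row identity to a partition index; same row -> same partition."""
    identity = f"{database}.{table}:{key_value}".encode("utf-8")
    return zlib.crc32(identity) % partitions


# Hypothetical change events for two rows of test.t2: (key, operation).
events = [("22", "INSERT"), ("33", "INSERT"), ("22", "UPDATE"), ("22", "DELETE")]

by_partition = {}
for key, op in events:
    p = choose_partition("test", "t2", key, partitions=3)
    by_partition.setdefault(p, []).append((key, op))

# Within each partition, the events of one row keep their original order.
for p, evts in sorted(by_partition.items()):
    print(p, evts)
```

Events for different rows may land in different partitions and be consumed in any relative order, which is usually acceptable. Events for the same row never are.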
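4. Start canal
Run the startup script from the install directory (the parent of conf):
./bin/startup.sh
Check that it started: jps (a CanalLauncher process should be listed)
5. Check the synchronization
5.1 Start the ZooKeeper and Kafka clusters
Prepare a Kafka consumer: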
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic example
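5.2 Run the following in the MySQL test database:
use test;
# The statements are listed in the order they were run, which matches the Kafka output in 5.3.
# Column and table comments are kept in Chinese as in the original run:
# 数据对象ID = data object ID, 入仓时间 = ingestion time, xx表 = xx table.
#################### DDL #####################
# 1. Create a table
CREATE TABLE `t1` (
`object_id` varchar(50) COMMENT '数据对象ID',
`collected_time` varchar(50) COMMENT '入仓时间'
)
COMMENT 'xx表';
# 2. Rename the table
alter table t1 rename to t2;
#################### DML #####################
# 3. Insert rows
insert into t2 values('11','20230101');
insert into t2 values('22','20230101');
insert into t2 values('33','20230101');
insert into t2 values('44','20230101');
# 4. Delete a row
delete from t2 where object_id = '11';
#################### DDL #####################
# 5. Add a column
ALTER TABLE t2 ADD COLUMN name varchar(50);
# 6. Rename a column and change its type
ALTER TABLE t2 CHANGE object_id id int;
#################### DML #####################
# 7. Update a row
update t2 set collected_time = '20232222' where id = '22';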
5.3 Check what the Kafka consumer receives
[myuser@hadoop103 software]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic example
{"data":null,"database":"test","es":1700534242000,"gtid":"","id":8,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `t1` (\n `object_id` varchar(50) COMMENT '数据对象ID',\n `collected_time` varchar(50) COMMENT '入仓时间'\n)\nCOMMENT 'xx表'","sqlType":null,"table":"t1","ts":1700534243406,"type":"CREATE"}
{"data":null,"database":"test","es":1700534630000,"gtid":"","id":9,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"alter table t1 rename to t2","sqlType":null,"table":"t2","ts":1700534631275,"type":"RENAME"}
{"data":[{"object_id":"11","collected_time":"20230101"}],"database":"test","es":1700534636000,"gtid":"","id":10,"isDdl":false,"mysqlType":{"object_id":"varchar(50)","collected_time":"varchar(50)"},"old":null,"pkNames":null,"sql":"","sqlType":{"object_id":12,"collected_time":12},"table":"t2","ts":1700534637025,"type":"INSERT"}
{"data":[{"object_id":"22","collected_time":"20230101"}],"database":"test","es":1700534636000,"gtid":"","id":10,"isDdl":false,"mysqlType":{"object_id":"varchar(50)","collected_time":"varchar(50)"},"old":null,"pkNames":null,"sql":"","sqlType":{"object_id":12,"collected_time":12},"table":"t2","ts":1700534637025,"type":"INSERT"}
{"data":[{"object_id":"33","collected_time":"20230101"}],"database":"test","es":1700534636000,"gtid":"","id":10,"isDdl":false,"mysqlType":{"object_id":"varchar(50)","collected_time":"varchar(50)"},"old":null,"pkNames":null,"sql":"","sqlType":{"object_id":12,"collected_time":12},"table":"t2","ts":1700534637025,"type":"INSERT"}
{"data":[{"object_id":"44","collected_time":"20230101"}],"database":"test","es":1700534637000,"gtid":"","id":11,"isDdl":false,"mysqlType":{"object_id":"varchar(50)","collected_time":"varchar(50)"},"old":null,"pkNames":null,"sql":"","sqlType":{"object_id":12,"collected_time":12},"table":"t2","ts":1700534638543,"type":"INSERT"}
{"data":[{"object_id":"11","collected_time":"20230101"}],"database":"test","es":1700534643000,"gtid":"","id":12,"isDdl":false,"mysqlType":{"object_id":"varchar(50)","collected_time":"varchar(50)"},"old":null,"pkNames":null,"sql":"","sqlType":{"object_id":12,"collected_time":12},"table":"t2","ts":1700534644692,"type":"DELETE"}
{"data":null,"database":"test","es":1700534799000,"gtid":"","id":13,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"ALTER TABLE t2 ADD COLUMN name varchar(50)","sqlType":null,"table":"t2","ts":1700534799911,"type":"ALTER"}
{"data":null,"database":"test","es":1700534802000,"gtid":"","id":14,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"ALTER TABLE t2 CHANGE object_id id int","sqlType":null,"table":"t2","ts":1700534803242,"type":"ALTER"}
{"data":[{"id":"22","collected_time":"20232222","name":null}],"database":"test","es":1700535473000,"gtid":"","id":15,"isDdl":false,"mysqlType":{"id":"int","collected_time":"varchar(50)","name":"varchar(50)"},"old":[{"collected_time":"20230101"}],"pkNames":null,"sql":"","sqlType":{"id":4,"collected_time":12,"name":12},"table":"t2","ts":1700535474600,"type":"UPDATE"}
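Each line the consumer prints is one JSON document, so the output is easy to post-process. As a rough sketch, the snippet below counts message types and collects the DDL statements. `summarize` is a hypothetical helper, and the sample lines are abbreviated copies of messages from the output above with several fields dropped for brevity.

```python
import json
from collections import Counter


def summarize(lines):
    """Count message types and collect DDL statements from canal JSON lines."""
    counts = Counter()
    ddl = []
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip shell prompts and blank lines
        msg = json.loads(line)
        counts[msg["type"]] += 1
        if msg["isDdl"]:
            ddl.append(msg["sql"])
    return counts, ddl


sample = [
    '{"data":null,"database":"test","isDdl":true,"old":null,"sql":"alter table t1 rename to t2","table":"t2","type":"RENAME"}',
    '{"data":[{"object_id":"11","collected_time":"20230101"}],"database":"test","isDdl":false,"old":null,"sql":"","table":"t2","type":"INSERT"}',
    '{"data":[{"object_id":"11","collected_time":"20230101"}],"database":"test","isDdl":false,"old":null,"sql":"","table":"t2","type":"DELETE"}',
]

counts, ddl = summarize(sample)
print(dict(counts))  # {'RENAME': 1, 'INSERT': 1, 'DELETE': 1}
print(ddl)           # ['alter table t1 rename to t2']
```

The same function accepts any iterable of lines, for example an open file holding saved consumer output.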
Message format (the type field holds the operation type):
{
  "data":[
    {
      "id":"22",
      "collected_time":"20232222",
      "name":null
    }
  ],
  "database":"test",
  "es":1700535473000,
  "gtid":"",
  "id":15,
  "isDdl":false,
  "mysqlType":{
    "id":"int",
    "collected_time":"varchar(50)",
    "name":"varchar(50)"
  },
  "old":[
    {
      "collected_time":"20230101"
    }
  ],
  "pkNames":null,
  "sql":"",
  "sqlType":{
    "id":4,
    "collected_time":12,
    "name":12
  },
  "table":"t2",
  "ts":1700535474600,
  "type":"UPDATE"
}
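In the UPDATE message above, `data` carries the full row after the change while `old` carries only the columns that changed, with their previous values. A consumer that needs the complete "before" row therefore has to merge the two. The following is a minimal sketch of that merge based on the sample messages in this post. `row_images` is a hypothetical helper, and the reading of `es` as the time the change ran in MySQL and `ts` as the time canal processed it (both in milliseconds) is my understanding rather than something this post verifies.

```python
import json


def row_images(message):
    """Yield (before, after) dicts for each row in a canal JSON message."""
    data = message["data"] or []
    old = message["old"] or [None] * len(data)
    for new_row, changed in zip(data, old):
        if message["type"] == "INSERT":
            yield None, new_row
        elif message["type"] == "DELETE":
            yield new_row, None  # for deletes, data holds the removed row
        elif message["type"] == "UPDATE":
            # Overlay the previous values of the changed columns on the new row.
            yield {**new_row, **(changed or {})}, new_row


# The UPDATE message from this post, with a few fields dropped for brevity.
raw = """{"data":[{"id":"22","collected_time":"20232222","name":null}],
"database":"test","es":1700535473000,"id":15,"isDdl":false,
"old":[{"collected_time":"20230101"}],"table":"t2",
"ts":1700535474600,"type":"UPDATE"}"""

msg = json.loads(raw)
for before, after in row_images(msg):
    print(before)  # {'id': '22', 'collected_time': '20230101', 'name': None}
    print(after)   # {'id': '22', 'collected_time': '20232222', 'name': None}
print(msg["ts"] - msg["es"])  # 1600
```

DDL messages have `data` set to null, so the generator yields nothing for them.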
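References
1. Flink integration
Usage and type mappings of the Canal format, Realtime Compute for Apache Flink, Alibaba Cloud Help Center (aliyun.com)
Example of accessing Canal metadata fields from a Kafka table: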
CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'canal-json'
);
2. Reference for the meaning of each field
