1. Download the package and install Canal

Releases · alibaba/canal · GitHub: https://github.com/alibaba/canal/releases

I downloaded the latest release, 1.1.7: canal.deployer-1.1.7.tar.gz

This is the standalone deployer package: it has no admin UI, but it is enough for basic use.

Upload it to the server, create a canal-1.1.7 directory under the installation path, and extract the archive into it:

mkdir -p /opt/module/canal-1.1.7
tar -zxvf canal.deployer-1.1.7.tar.gz -C /opt/module/canal-1.1.7
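A quick sanity check after extraction (the listing below is what the 1.1.7 deployer package typically contains; exact contents may vary by release):

ls /opt/module/canal-1.1.7
# typically: bin  conf  lib  logs  (recent releases also ship a plugin directory for MQ connectors)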

2. Enable MySQL binlog and create a user for Canal

2.1 Enable MySQL binlog

sudo vim /etc/my.cnf

Add or modify the following settings:

server-id=1

log-bin=mysql-bin

binlog_format=row

binlog-do-db=test

binlog-do-db=test2       # capture binlog only for the test and test2 databases

Restart the MySQL service:

sudo systemctl restart mysqld
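After the restart, it is worth confirming that binlog is actually on and in row format (standard MySQL checks):

SHOW VARIABLES LIKE 'log_bin';          # should be ON
SHOW VARIABLES LIKE 'binlog_format';    # should be ROW
SHOW MASTER STATUS;                     # shows the current binlog file and position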

2.2 Create a user for Canal and grant privileges

grant all privileges on *.* to 'canal'@'%' IDENTIFIED BY 'canal123';
FLUSH PRIVILEGES;

The above creates a canal user with password canal123 and grants it all privileges (SELECT, INSERT, REPLICATION SLAVE, and so on) on every database; adjust as needed. The % means the canal user may log in from any remote IP (provided the host is reachable).
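Granting ALL is more than Canal strictly needs. A least-privilege variant, along the lines of the Canal quick start (and one that also works on MySQL 8, where GRANT ... IDENTIFIED BY is no longer accepted), would look roughly like this:

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal123';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;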

 

2.3 Notes

If MySQL and Canal are not installed on the same server, check connectivity first:

telnet hadoop102 3306    # hadoop102 is the host where MySQL runs

If you get an error like -bash: telnet: command not found, telnet is not installed; install it:

rpm -qa telnet telnet-server        # check whether telnet is already installed

sudo yum install -y telnet-server

sudo yum install -y telnet

If the connection succeeds (you should see something like Connected to hadoop102), the two hosts can reach each other.
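If telnet is unavailable, a quick port check with nc (netcat) works just as well, assuming nc is installed on the machine:

nc -zv hadoop102 3306    # -z: only scan, -v: verbose; prints a success message if the port is reachable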

3. Configure Canal

# go to the conf directory under the installation path

cd /opt/module/canal-1.1.7/conf

Note two files here: canal.properties is the common server-level configuration, and example/instance.properties is the configuration for a specific monitoring instance.

3.1 Common configuration: canal.properties

vim canal.properties

Modify the required settings:

canal.serverMode = kafka        # send captured changes to Kafka; the default is tcp

# Kafka cluster address

kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 

Other settings can be left at their defaults or changed as needed.
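One related setting worth verifying (this assumes the default layout of canal.properties): canal.destinations lists the instance directories the server loads, and it should include the instance folder name configured in 3.2 (example by default).

canal.destinations = example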

3.2 Instance configuration: example/instance.properties

vim example/instance.properties

Modify the required settings:

canal.instance.mysql.slaveId=1111         # the slave id Canal uses when posing as a MySQL replica; any value that does not clash with the MySQL server-id or other replicas is fine

canal.instance.master.address=hadoop102:3306         # MySQL address

canal.instance.dbUsername=canal         # the account created for Canal in 2.2

canal.instance.dbPassword=canal123        # the password set in 2.2

canal.mq.topic=example        # Kafka topic name, default example; change as needed

canal.mq.partition=0        # which Kafka partition to write to; everything goes to a single partition by default. If you spread writes across multiple partitions, keep in mind that ordering is only guaranteed within a partition.

Other settings can be changed as needed.
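If Canal should only capture certain databases or tables, the instance also accepts a filter regex. The key below is the standard canal.instance.filter.regex; the pattern itself is an assumption matching the test and test2 databases enabled in 2.1:

canal.instance.filter.regex=test\\..*,test2\\..*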

4. Start Canal

cd /opt/module/canal-1.1.7
./bin/startup.sh

Check that it started: jps
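The Canal server appears in jps as CanalLauncher. If it is missing, check the logs; the paths below are the standard locations inside the deployer directory:

jps                                   # should show a CanalLauncher process
tail -n 50 logs/canal/canal.log       # server startup log
tail -n 50 logs/example/example.log   # per-instance log (MySQL connection, Kafka output status)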

 

5. Verify the sync

5.1 Start the ZooKeeper and Kafka clusters

Start a Kafka console consumer:

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic example
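If the example topic does not exist yet, the broker will usually auto-create it on first write (subject to auto.create.topics.enable); you can also create it explicitly, for example:

kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic example --partitions 1 --replication-factor 1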

5.2 Run some operations against the test database in MySQL:

use test;

# Note: the statements are listed in the order they were actually run
# (DDL and DML interleaved), so they line up with the Kafka output in 5.3.

####################  DDL + DML  #####################

# 1. Create a table

CREATE TABLE `t1` (
  `object_id` varchar(50) COMMENT '数据对象ID',
  `collected_time` varchar(50) COMMENT '入仓时间'
)
COMMENT 'xx表';

# 2. Rename the table

alter table t1 rename to t2;

# 3. Insert rows

insert into t2 values('11','20230101');
insert into t2 values('22','20230101');
insert into t2 values('33','20230101');
insert into t2 values('44','20230101');

# 4. Delete a row

delete from t2 where object_id = '11';

# 5. Add a column

ALTER TABLE t2 ADD COLUMN name varchar(50);

# 6. Rename a column (and change its type to int)

ALTER TABLE t2 CHANGE object_id id int;

# 7. Update a row

update t2 set collected_time = '20232222' where id = '22';

5.3 Check the Kafka consumer output

[myuser@hadoop103 software]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic example

{"data":null,"database":"test","es":1700534242000,"gtid":"","id":8,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `t1` (\n `object_id` varchar(50) COMMENT '数 据 对 象 ID',\n `collected_time` varchar(50) COMMENT '入 仓 时 间 '\n)\nCOMMENT 'xx表 '","sqlType":null,"table":"t1","ts":1700534243406,"type":"CREATE"}

{"data":null,"database":"test","es":1700534630000,"gtid":"","id":9,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"alter table t1 rename to t2","sqlType":null,"table":"t2","ts":1700534631275,"type":"RENAME"}

{"data":[{"object_id":"11","collected_time":"20230101"}],"database":"test","es":1700534636000,"gtid":"","id":10,"isDdl":false,"mysqlType":{"object_id":"varchar(50)","collected_time":"varchar(50)"},"old":null,"pkNames":null,"sql":"","sqlType":{"object_id":12,"collected_time":12},"table":"t2","ts":1700534637025,"type":"INSERT"}

{"data":[{"object_id":"22","collected_time":"20230101"}],"database":"test","es":1700534636000,"gtid":"","id":10,"isDdl":false,"mysqlType":{"object_id":"varchar(50)","collected_time":"varchar(50)"},"old":null,"pkNames":null,"sql":"","sqlType":{"object_id":12,"collected_time":12},"table":"t2","ts":1700534637025,"type":"INSERT"}

{"data":[{"object_id":"33","collected_time":"20230101"}],"database":"test","es":1700534636000,"gtid":"","id":10,"isDdl":false,"mysqlType":{"object_id":"varchar(50)","collected_time":"varchar(50)"},"old":null,"pkNames":null,"sql":"","sqlType":{"object_id":12,"collected_time":12},"table":"t2","ts":1700534637025,"type":"INSERT"}

{"data":[{"object_id":"44","collected_time":"20230101"}],"database":"test","es":1700534637000,"gtid":"","id":11,"isDdl":false,"mysqlType":{"object_id":"varchar(50)","collected_time":"varchar(50)"},"old":null,"pkNames":null,"sql":"","sqlType":{"object_id":12,"collected_time":12},"table":"t2","ts":1700534638543,"type":"INSERT"}

{"data":[{"object_id":"11","collected_time":"20230101"}],"database":"test","es":1700534643000,"gtid":"","id":12,"isDdl":false,"mysqlType":{"object_id":"varchar(50)","collected_time":"varchar(50)"},"old":null,"pkNames":null,"sql":"","sqlType":{"object_id":12,"collected_time":12},"table":"t2","ts":1700534644692,"type":"DELETE"}

{"data":null,"database":"test","es":1700534799000,"gtid":"","id":13,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"ALTER TABLE t2 ADD COLUMN name varchar(50)","sqlType":null,"table":"t2","ts":1700534799911,"type":"ALTER"}

{"data":null,"database":"test","es":1700534802000,"gtid":"","id":14,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"ALTER TABLE t2 CHANGE object_id id int","sqlType":null,"table":"t2","ts":1700534803242,"type":"ALTER"}

{"data":[{"id":"22","collected_time":"20232222","name":null}],"database":"test","es":1700535473000,"gtid":"","id":15,"isDdl":false,"mysqlType":{"id":"int","collected_time":"varchar(50)","name":"varchar(50)"},"old":[{"collected_time":"20230101"}],"pkNames":null,"sql":"","sqlType":{"id":4,"collected_time":12,"name":12},"table":"t2","ts":1700535474600,"type":"UPDATE"}

Message format:

 {
    "data":[
        {
            "id":"22",
            "collected_time":"20232222",
            "name":null
        }
    ],
    "database":"test",
    "es":1700535473000,
    "gtid":"",
    "id":15,
    "isDdl":false,
    "mysqlType":{
        "id":"int",
        "collected_time":"varchar(50)",
        "name":"varchar(50)"
    },
    "old":[
        {
            "collected_time":"20230101"
        }
    ],
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "id":4,
        "collected_time":12,
        "name":12
    },
    "table":"t2",
    "ts":1700535474600,
    "type":"UPDATE"        # 操作类型
}
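A quick gloss of the main fields, based on Canal's flat-message format:

data: the row image after the change (for a DELETE, the deleted row)
old: for UPDATE only, the previous values of the columns that changed
type: the operation type (INSERT / UPDATE / DELETE, or a DDL type such as CREATE / ALTER / RENAME)
isDdl / sql: whether the event is DDL, and the DDL statement text if so
database / table: the source database and table
mysqlType / sqlType: the MySQL column types and the corresponding java.sql.Types codes (12 = VARCHAR, 4 = INTEGER)
pkNames: the primary key column names (null here because t2 has no primary key)
es / ts: the statement's execution time taken from the binlog vs. the time Canal processed the event, both in milliseconds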


References

1. Flink integration

Canal format usage and type mapping (Realtime Compute for Apache Flink), Alibaba Cloud Help Center (aliyun.com)

Code example for accessing Canal metadata fields from Kafka:

CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);

2. Field reference

https://github.com/alibaba/canal/wiki
