cancal 同步mysql数据到es中
windocs service2012 、 jdk版本1.8 、canal版本1.5、mysql版本5.7、启动:进入 canal.deployer的目录 ./bin/startup.sh。在 conf/example/instance.properties。如果你下载的是canal1.6,jdk是1.8,那样会报错。es7/mytest-user.yml 文件内容。canal版本1.5需要的jd
1.环境:
windocs service2012 、 jdk版本1.8 、canal版本1.5、mysql版本5.7、
注意:canal版本1.5需要的jdk是1.8 如果你下载的是canal1.6,jdk是1.8,那样会报错。
下载地址 Releases · alibaba/canal · GitHub 下载并上传到服务器
三个文件
canal.adapter 客户端
canal.admin 后台web端
canal.deployer 服务端
2.mysql配置:
mysq 开启日志 mysql配置添加
user=mysql
slow_query_log = 1
log_error = /home/data/mysql57/data/mysql.err
slow_query_log = ON #开启慢查询
long_query_time =4 #设置慢查询时间 超过一秒的记录
server_id = 57
log-bin=mysql-bin
log-bin-index=master-bin.index
expire_logs_days = 7
binlog_format=row
slave_skip_errors=1062
log_slave_updates=1
max_connections = 1000
wait_timeout=864000
interactive_timeout=864000
添加用户权限
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'Canal@123456' WITH GRANT OPTION;
flush privileges;
创建数据库
create database bigdata default charset utf8;
3.配置服务器端 canal.deployer
在 conf/example/instance.properties
## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0
# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=154
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=
我只配了两个地方
启动:进入 canal.deployer的目录 ./bin/startup.sh
4.配置客户端 canal.adapter
配置:配置客户端
vi application.yml 配置两个地方,一个是要同步的mysql数据库,一个是es配置
这是配置
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/bigdata?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
username: canal
password: canal
#defaultDS2:
#url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true
#username: canal
#password: 123456
canalAdapters:
# canal instance Name or mq topic name
- instance: example
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es7
hosts: 127.0.0.1:9200
properties:
mode: rest # or transport
security.auth:
cluster.name: my-application
这是同步的测试脚本
全量/增量更新 es7/mytest-user.yml 文件内容
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: detail202304
_id: _id
# upsert: true
# pk: id
sql: "select a.id as _id,a.id from t_data_order_detail2020 a"
# objFields:
# _labels: array:;
etlCondition: "where a.id ={}"
commitBatch: 3000
配置完启动客户端:进到安装目录
查看启动日志
全量更新为向Adapter发送POST更新
- postMan发送请求
curl http://127.0.0.1:8081/etl/es7/mytest-user.yml
curl -X POST http://127.0.0.1:8081/etl/es7/mytest-user.yml
额添加成功。剩下的就是java的事了。
可以通过jps查看
5.遇到的问题 Task not found
报错:
[root@hbyc ~]# curl -X POST http://127.0.0.1:8081/etl/es7/t_data_order_detail202304.yml
{"succeeded":false,"errorMessage":"Task not found"}
解决:
canal的客户端 canal.adapter 在修改完配置需要重新启动客户端,要不然找不到配置文件。
每次修改配置文件需要重启客户端!!!
每次修改配置文件需要重启客户端!!!
每次修改配置文件需要重启客户端!!!
剩下的就是查看添加索引了

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐
所有评论(0)