1.环境:

windocs service2012 、 jdk版本1.8 、canal版本1.5、mysql版本5.7、

注意:canal版本1.5需要的jdk是1.8   如果你下载的是canal1.6,jdk是1.8,那样会报错。

下载地址  Releases · alibaba/canal · GitHub  下载并上传到服务器

三个文件  

canal.adapter   客户端

canal.admin   后台web端

canal.deployer 服务端

2.mysql配置:
mysq 开启日志  mysql配置添加


user=mysql
slow_query_log = 1
log_error = /home/data/mysql57/data/mysql.err
slow_query_log  = ON     #开启慢查询
long_query_time  =4     #设置慢查询时间 超过一秒的记录

server_id = 57
log-bin=mysql-bin
log-bin-index=master-bin.index
expire_logs_days = 7 
binlog_format=row
slave_skip_errors=1062
log_slave_updates=1
max_connections = 1000
wait_timeout=864000 
interactive_timeout=864000

 

添加用户权限 

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'Canal@123456' WITH GRANT OPTION;    
flush privileges;    

创建数据库 

create database bigdata default charset utf8; 

3.配置服务器端  canal.deployer

在 conf/example/instance.properties

## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0

# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=154
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=

我只配了两个地方

 启动:进入 canal.deployer的目录 ./bin/startup.sh

 4.配置客户端 canal.adapter

配置:配置客户端

vi application.yml  配置两个地方,一个是要同步的mysql数据库,一个是es配置

 这是配置

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/bigdata?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
      username: canal
      password: canal
    #defaultDS2:
       #url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true
       #username: canal
       #password: 123456	      
  canalAdapters:
   # canal instance Name or mq topic name
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9200
        properties:
          mode: rest # or transport 
          security.auth: 
          cluster.name: my-application

这是同步的测试脚本

 全量/增量更新 es7/mytest-user.yml 文件内容

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: detail202304
  _id: _id
#  upsert: true
#  pk: id
  sql: "select a.id as _id,a.id from t_data_order_detail2020 a"
#  objFields:
#    _labels: array:;
  etlCondition: "where a.id ={}"
  commitBatch: 3000

配置完启动客户端:进到安装目录

 查看启动日志

全量更新为向Adapter发送POST更新
- postMan发送请求

curl  http://127.0.0.1:8081/etl/es7/mytest-user.yml

curl -X POST  http://127.0.0.1:8081/etl/es7/mytest-user.yml

 额添加成功。剩下的就是java的事了。

 可以通过jps查看

5.遇到的问题 Task not found

 报错:

[root@hbyc ~]# curl -X POST  http://127.0.0.1:8081/etl/es7/t_data_order_detail202304.yml
{"succeeded":false,"errorMessage":"Task not found"}

解决:

 canal的客户端 canal.adapter 在修改完配置需要重新启动客户端,要不然找不到配置文件。

每次修改配置文件需要重启客户端!!!

每次修改配置文件需要重启客户端!!!

每次修改配置文件需要重启客户端!!!

 剩下的就是查看添加索引了

es添加索引命令行和浏览器添加索引--图文详解_舰长115的博客-CSDN博客

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐