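Approach:

1. Create the Flink MySQL CDC tables.

2. Write the result of orders join products into a Kafka table.

This completes the fact table for the DWD layer. The data written to Kafka can then be consumed to build the DWS or ADS layer.

Main reference: https://ververica.github.io/flink-cdc-connectors/master/content/快速上手/index.html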

Install Flink 1.13.5

Start a standalone Flink cluster

./bin/start-cluster.sh

Start the Flink SQL client

./bin/sql-client.sh

The SQL client can display query results in three modes:

SET sql-client.execution.result-mode=table;

SET sql-client.execution.result-mode=changelog;

SET sql-client.execution.result-mode=tableau;

Use Flink CDC 2.1.1

Download flink-sql-connector-mysql-cdc-2.1.1.jar and put it into FLINK_HOME/lib

https://github.com/ververica/flink-cdc-connectors/releases

Prepare the data in MySQL

-- MySQL

CREATE DATABASE mydb;

USE mydb;

CREATE TABLE products (

  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,

  name VARCHAR(255) NOT NULL,

  description VARCHAR(512)

);

ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products

VALUES (default,"scooter","Small 2-wheel scooter"),

       (default,"car battery","12V car battery"),

       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),

       (default,"hammer","12oz carpenter's hammer"),

       (default,"hammer","14oz carpenter's hammer"),

       (default,"hammer","16oz carpenter's hammer"),

       (default,"rocks","box of assorted rocks"),

       (default,"jacket","water resistent black wind breaker"),

       (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (

  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,

  order_date DATETIME NOT NULL,

  customer_name VARCHAR(255) NOT NULL,

  price DECIMAL(10, 5) NOT NULL,

  product_id INTEGER NOT NULL,

  order_status BOOLEAN NOT NULL -- Whether order has been placed

) AUTO_INCREMENT = 10001;

INSERT INTO orders

VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),

       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),

       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

Set the checkpoint interval

-- Flink SQL

Flink SQL> SET execution.checkpointing.interval = 3s;

Create the Flink MySQL CDC tables

-- Flink SQL

Flink SQL> CREATE TABLE products (

    id INT,

    name STRING,

    description STRING,

    PRIMARY KEY (id) NOT ENFORCED

  ) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'localhost',

    'port' = '3306',

    'username' = 'root',

    'password' = '12345678',

    'database-name' = 'mydb',

    'table-name' = 'products'

  );

Flink SQL> CREATE TABLE orders (

   order_id INT,

   order_date TIMESTAMP(0),

   customer_name STRING,

   price DECIMAL(10, 5),

   product_id INT,

   order_status BOOLEAN,

   PRIMARY KEY (order_id) NOT ENFORCED

) WITH (

   'connector' = 'mysql-cdc',

   'hostname' = 'localhost',

   'port' = '3306',

   'username' = 'root',

   'password' = '12345678',

   'database-name' = 'mydb',

   'table-name' = 'orders'

);

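After changing the MySQL tables, the Flink SQL tables pick up the data changes in real time. I tested DML statements: inserts, updates, and deletes are all supported. DDL statements have no effect: changes to the table structure are not picked up.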
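The DML test described above can be reproduced with statements like the following, adapted from the referenced quick-start tutorial. This is only a sketch: the values of the inserted row are illustrative, and order_id 10004 assumes the three rows inserted earlier are the only ones the table has ever held.

```sql
-- Flink SQL: start this query in the SQL client first and leave it running
SELECT * FROM orders;

-- MySQL: run these one at a time and watch the Flink result change
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

-- assumes the row above received order_id 10004
UPDATE orders SET order_status = true WHERE order_id = 10004;

DELETE FROM orders WHERE order_id = 10004;
```

With the result mode set to changelog, each MySQL statement should show up as one or more change rows in the SQL client.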

Download the Flink Kafka connector and put it into FLINK_HOME/lib

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.5/flink-sql-connector-kafka_2.12-1.13.5.jar

Create the Kafka table

For the Kafka table options, see the Kafka connector page in the Apache Flink documentation.

Flink SQL> CREATE TABLE ordersjoinproducts (

   order_id INT,

   order_date TIMESTAMP(0),

   customer_name STRING,

   price DECIMAL(10, 5),

   product_id INT,

   order_status BOOLEAN,

   product_name STRING

) WITH (

  'connector' = 'kafka',

  'topic' = 'ordersjoinproducts',

  'properties.bootstrap.servers' = 'localhost:9092',

  'properties.group.id' = 'testGroup',

  'scan.startup.mode' = 'earliest-offset',

  'format' = 'debezium-json', -- must be debezium-json here; with plain 'json' the sink accepts only inserts, so the changelog from the MySQL CDC source tables cannot be written to Kafka

  'debezium-json.ignore-parse-errors'='true' -- default: false

);

Insert the result of orders join products into the Kafka table.

insert into ordersjoinproducts

select o.order_id,o.order_date,o.customer_name,o.price,o.product_id,o.order_status,p.name 

from orders o left join products p on p.id = o.product_id;

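If the write succeeds, the data in Kafka should look like the output below. In this output each order first appears with product_name null, because the left join emitted the order before the matching product row arrived. Once the product matches, the null row is retracted (op "d") and the joined row is emitted (op "c").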

lb@luobaodeMacBook-Pro ~/study/kafka_2.12-2.8.0$  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ordersjoinproducts --from-beginning

{"before":null,"after":{"order_id":10001,"order_date":"2020-07-30 10:08:22","customer_name":"Jark","price":50.5,"product_id":102,"order_status":false,"product_name":null},"op":"c"}

{"before":null,"after":{"order_id":10002,"order_date":"2020-07-30 10:11:09","customer_name":"Sally","price":15,"product_id":105,"order_status":false,"product_name":null},"op":"c"}

{"before":{"order_id":10001,"order_date":"2020-07-30 10:08:22","customer_name":"Jark","price":50.5,"product_id":102,"order_status":false,"product_name":null},"after":null,"op":"d"}

{"before":null,"after":{"order_id":10001,"order_date":"2020-07-30 10:08:22","customer_name":"Jark","price":50.5,"product_id":102,"order_status":false,"product_name":"car battery"},"op":"c"}

{"before":null,"after":{"order_id":10003,"order_date":"2020-07-30 12:00:30","customer_name":"Edward","price":25.25,"product_id":106,"order_status":false,"product_name":null},"op":"c"}

{"before":{"order_id":10003,"order_date":"2020-07-30 12:00:30","customer_name":"Edward","price":25.25,"product_id":106,"order_status":false,"product_name":null},"after":null,"op":"d"}

{"before":null,"after":{"order_id":10003,"order_date":"2020-07-30 12:00:30","customer_name":"Edward","price":25.25,"product_id":106,"order_status":false,"product_name":"hammer"},"op":"c"}

{"before":{"order_id":10002,"order_date":"2020-07-30 10:11:09","customer_name":"Sally","price":15,"product_id":105,"order_status":false,"product_name":null},"after":null,"op":"d"}

{"before":null,"after":{"order_id":10002,"order_date":"2020-07-30 10:11:09","customer_name":"Sally","price":15,"product_id":105,"order_status":false,"product_name":"hammer"},"op":"c"}
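As a sketch of the downstream step mentioned at the start (building DWS or ADS from the Kafka data), the query below reads the same Kafka table back in Flink SQL and aggregates it per product. The column aliases order_cnt and total_price are hypothetical names chosen for illustration. Because the table uses the debezium-json format, Flink treats the "d" records as deletions, so the intermediate rows with a null product_name should not remain in the aggregated result.

```sql
-- Flink SQL: per-product order count and total price from the Kafka changelog
-- (order_cnt and total_price are illustrative aliases, not from the original)
SELECT product_name,
       COUNT(*)   AS order_cnt,
       SUM(price) AS total_price
FROM ordersjoinproducts
GROUP BY product_name;
```

The query reuses the ordersjoinproducts table definition above, which already sets 'scan.startup.mode' = 'earliest-offset', so it reads the topic from the beginning.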
