Flink CDC + Flink SQL: Building an End-to-End Real-Time Data Pipeline (MySQL → Flink → ClickHouse)

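1. Architecture Overview
  • Data source: MySQL (changes captured via CDC)
  • Processing engine: Flink (real-time processing through the SQL API)
  • Data sink: ClickHouse (high-performance columnar store)
  • Core components
    • Flink CDC Connector: captures MySQL INSERT/UPDATE/DELETE events in real time by reading the binlog
    • Flink SQL: transforms, filters, and aggregates the data
    • ClickHouse JDBC Sink: writes the processed data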
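2. Environment Setup
-- Add the Flink CDC and ClickHouse connector JARs (example paths and versions)
-- Step 3 uses 'connector' = 'jdbc', which also needs the Flink JDBC connector
-- and the ClickHouse JDBC driver on the classpath
ADD JAR '/path/to/flink-sql-connector-mysql-cdc-2.4.0.jar';
ADD JAR '/path/to/flink-connector-clickhouse-1.16.0.jar';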
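
The CDC connector reads the MySQL binlog, so the MySQL side needs some preparation before the Flink job can start. The statements below are a minimal sketch of that preparation, assuming a dedicated account is preferred over root; the user name flink_cdc and its password are hypothetical.

```sql
-- Run these on MySQL, not in the Flink SQL client.

-- The binlog must be enabled and row-based for CDC to see every change.
SHOW VARIABLES LIKE 'log_bin';        -- should be ON
SHOW VARIABLES LIKE 'binlog_format';  -- should be ROW

-- Hypothetical dedicated account with the privileges the snapshot
-- and binlog readers need.
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'change-me';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;
```

With an account like this in place, the 'username' and 'password' options of the source table can point at it instead of root.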

3. Flink SQL Implementation Steps
Step 1: Create the MySQL CDC source table
CREATE TABLE mysql_users (
    id BIGINT PRIMARY KEY NOT ENFORCED,  -- Flink only accepts primary keys declared NOT ENFORCED
    name STRING,
    age INT,
    update_time TIMESTAMP(3)
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql-host',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test_db',
    'table-name' = 'users',
    'server-time-zone' = 'UTC'
);

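Step 2: Transform the data (example: filtering plus derived columns)
CREATE VIEW processed_users AS
SELECT
    id,
    UPPER(name) AS name_upper,  -- upper-case the name
    age,
    CASE WHEN age >= 18 THEN 1 ELSE 0 END AS is_adult,  -- adult flag
    update_time
FROM mysql_users
WHERE age > 0;  -- drop rows with an invalid age (NULL ages are dropped too)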
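
The view above only filters and derives columns. Aggregation follows the same pattern, and the sketch below shows one possible form. The view name user_stats and its output columns are hypothetical and not part of the original pipeline.

```sql
-- Hypothetical aggregate view: one row per is_adult flag.
-- Because the source is a changelog, Flink keeps these counts up to date
-- as rows are inserted, updated, or deleted in MySQL.
CREATE VIEW user_stats AS
SELECT
    is_adult,
    COUNT(*)         AS user_cnt,     -- number of users currently in the group
    MAX(update_time) AS last_update   -- most recent change seen in the group
FROM processed_users
GROUP BY is_adult;
```

The result is an updating table, so any sink that receives it needs a primary key on is_adult in order to apply the updates.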

Step 3: Create the ClickHouse sink table
CREATE TABLE ck_users (
    id BIGINT,
    name_upper STRING,
    age INT,
    is_adult INT,
    update_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
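    'connector' = 'jdbc',  -- needs a JDBC connector build that includes a ClickHouse dialect; the stock Flink JDBC connector may not ship one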
    'url' = 'jdbc:clickhouse://clickhouse-host:8123/default',
    'table-name' = 'users',
    'username' = 'default',
    'password' = '',
    'driver' = 'com.clickhouse.jdbc.ClickHouseDriver',
    'sink.buffer-flush.interval' = '1s',  -- JDBC connector option name for the flush interval
    'sink.max-retries' = '3'
);

Step 4: Start the real-time sync job
INSERT INTO ck_users
SELECT * FROM processed_users;

4. Key Optimization Strategies
  1. CDC configuration tuning
    'debezium.snapshot.mode' = 'initial',  -- full snapshot first, then incremental binlog sync
    'scan.incremental.snapshot.chunk.size' = '8096'  -- rows per snapshot chunk
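
To show where these options go, the sketch below places them in a complete source definition. It is an illustration under assumptions rather than the author's configuration: the table name mysql_users_tuned and the server-id range are hypothetical, and it uses the connector's own 'scan.startup.mode' option in place of the Debezium pass-through property to request a snapshot followed by binlog reading.

```sql
-- Hypothetical tuned variant of the Step 1 source table.
CREATE TABLE mysql_users_tuned (
    id BIGINT,
    name STRING,
    age INT,
    update_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql-host',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test_db',
    'table-name' = 'users',
    'server-time-zone' = 'UTC',
    'scan.startup.mode' = 'initial',                   -- snapshot first, then binlog
    'scan.incremental.snapshot.enabled' = 'true',      -- chunked, parallel snapshot
    'scan.incremental.snapshot.chunk.size' = '8096',   -- rows per chunk
    'server-id' = '5401-5404'                          -- assumed range, one id per parallel reader
);
```

A larger chunk size means fewer chunks but more memory per reader, so the value is worth adjusting against row width and TaskManager memory.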
    

  2. Flink checkpoint configuration
    SET 'execution.checkpointing.interval' = '30s';
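
The interval is usually only one of several checkpoint settings. The sketch below lists others that commonly accompany it in the SQL client; every value is an assumption to adjust per workload, and the checkpoint directory is a hypothetical path.

```sql
-- Assumed values; tune them for the job's state size and latency needs.
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.timeout' = '10min';   -- fail checkpoints that run too long
SET 'execution.checkpointing.min-pause' = '10s';   -- leave time for processing between checkpoints
-- Keep the last checkpoint when the job is cancelled so it can be restored.
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
-- Hypothetical durable location for checkpoint data.
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
```

The JDBC sink also flushes buffered rows on each checkpoint, so the interval influences write latency as well as recovery time.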
    

  3. ClickHouse write tuning
    • Enable batched writes (for example 'sink.buffer-flush.max-rows' = '1000' on the JDBC sink)
    • Use the ReplacingMergeTree engine to handle updates:
      ENGINE = ReplacingMergeTree(update_time)
      ORDER BY id
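
The engine clause above belongs to a table that has to exist in ClickHouse before the job starts. A minimal sketch of that table follows, assuming column types that mirror the Flink sink table ck_users; the Nullable choice for name_upper is an assumption made because UPPER(name) returns NULL for a NULL name.

```sql
-- Run in ClickHouse, not in the Flink SQL client.
CREATE TABLE IF NOT EXISTS default.users
(
    id          Int64,
    name_upper  Nullable(String),   -- assumed nullable, see the note above
    age         Int32,
    is_adult    Int32,
    update_time DateTime64(3)       -- version column, must not be Nullable
)
ENGINE = ReplacingMergeTree(update_time)
ORDER BY id;

-- Deduplication happens during background merges, so several versions of
-- one id can be visible for a while. FINAL collapses them at query time,
-- at some extra query cost.
SELECT * FROM default.users FINAL WHERE id = 1;
```

For rows with the same id, ReplacingMergeTree keeps the one with the largest update_time, which is why that column is passed to the engine.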
      

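5. Failure Handling
  • CDC resume from the last position: Flink checkpoints store the binlog offset, so a restarted job continues where it left off
  • Dead-letter queue: route invalid records to Kafka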
    CREATE TABLE dead_letter_queue (...) WITH ('connector'='kafka'...);
    INSERT INTO dead_letter_queue SELECT * FROM ... WHERE age IS NULL;
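
One possible concrete form of the dead-letter idea is sketched below. It is not the author's elided definition: the table name dead_letter_users, the topic, and the broker address are all hypothetical. It uses the upsert-kafka connector because the CDC source emits updates and deletes, which an append-only Kafka sink with a plain JSON format cannot consume.

```sql
-- Hypothetical dead-letter table; names and addresses are placeholders.
CREATE TABLE dead_letter_users (
    id BIGINT,
    name STRING,
    age INT,
    update_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED     -- becomes the Kafka message key
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'users_dead_letter',
    'properties.bootstrap.servers' = 'kafka-host:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Route exactly the rows that the processed_users view filters out.
INSERT INTO dead_letter_users
SELECT id, name, age, update_time
FROM mysql_users
WHERE age IS NULL OR age <= 0;
```

Submitting this INSERT together with the main one inside an EXECUTE STATEMENT SET block lets both run as a single job.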
    

6. Monitoring Metrics
  • Flink Dashboard: track numRecordsIn/numRecordsOut
  • ClickHouse query
    SELECT count() FROM system.parts WHERE table='users'
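
The part count is more useful when restricted to active parts and read next to the row count. A slightly fuller sketch of the same check follows, assuming the target table lives in the default database.

```sql
-- Run in ClickHouse. A steadily growing active_parts value suggests that
-- batches are too small or that merges are falling behind.
SELECT
    count()                AS active_parts,
    sum(rows)              AS total_rows,
    max(modification_time) AS last_write
FROM system.parts
WHERE database = 'default'
  AND table = 'users'
  AND active;
```

Comparing last_write with the current time gives a rough view of how fresh the data is on the ClickHouse side.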
    

7. End-to-End Deployment Flow
graph LR
A[MySQL] -->|CDC| B(Flink SQL)
B -->|real-time processing| C[ClickHouse]
C --> D{Grafana monitoring}

8. Typical Use Cases
  • Real-time user profile updates
  • E-commerce order status monitoring
  • Log auditing and analysis systems

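Note: production environments should be configured for high availability (HA). Recommended setup:

  • Flink on YARN/K8s
  • ClickHouse multi-replica cluster
  • MySQL primary-replica replication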