Flink CDC: syncing MySQL to StarRocks (including deletes)
Use Flink with the MySQL CDC connector to listen to MySQL in real time and sync the changes to StarRocks (SR).
Caveat: mind the CDC connector version and the Flink version. Flink 1.15.2 does not yet have a well-matched CDC release (you can compile one yourself if you are able to). Update 2023-04-25: I have Flink 1.15.3 running normally in production; see the current version-compatibility notes.
StarRocks officially recommends the Flink SQL route (it supports real-time sync of inserts, updates, and deletes). If updates or deletes do not come through, check your Flink version, CDC connector version, and SR sink connector version. A minimal sketch of that route is shown below.
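A minimal sketch of the Flink SQL route, written with the Table API bridge. The hosts, credentials, table, and columns simply mirror the DataStream example further down and are placeholders; the exact WITH options can differ between connector releases, so treat this as an outline rather than a drop-in job.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlCdcToStarRocks {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // the mysql-cdc source relies on checkpoints for exactly-once
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // CDC source table (placeholder columns; they must match the MySQL table)
        tEnv.executeSql("CREATE TABLE mysql_src ("
                + " id BIGINT, name STRING, PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + " 'connector' = 'mysql-cdc',"
                + " 'hostname' = '192.168.10.14', 'port' = '3306',"
                + " 'username' = 'root', 'password' = '123456',"
                + " 'database-name' = 'xcode', 'table-name' = 'temp_flink')");

        // StarRocks sink table (same schema; the SR table should use the Primary Key model)
        tEnv.executeSql("CREATE TABLE sr_sink ("
                + " id BIGINT, name STRING, PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + " 'connector' = 'starrocks',"
                + " 'jdbc-url' = 'jdbc:mysql://192.168.10.245:9030',"
                + " 'load-url' = '192.168.10.11:8030',"
                + " 'username' = 'root', 'password' = '123456',"
                + " 'database-name' = 'data_center', 'table-name' = 'temp_flink')");

        // One INSERT INTO keeps SR in sync with MySQL, deletes included
        tEnv.executeSql("INSERT INTO sr_sink SELECT id, name FROM mysql_src");
    }
}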
The above is the Flink SQL approach (the StarRocks website has a tutorial for it). Below, a Flink DataStream API example demonstrates syncing MySQL to SR via MySQL CDC, covering inserts, updates, and deletes.
Flink 1.13 code (the complete 1.15 code is further down)
1. The Flink 1.13 pom.xml
Note: this example is based on Flink 1.13. The 1.15.x line was not recommended when this was first written; update 2023-04-25: Flink 1.15.3 is now recommended.
1. Pay attention to the comments about the Scala-suffixed jars;
2. Set <scope>provided</scope> when the jar is packaged for submission to a Flink cluster.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.txlc</groupId>
<artifactId>dwh-cdc</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.6</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>test</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
For Flink 1.15+ the artifact is flink-streaming-java (no Scala suffix);
below 1.15 the Scala suffix _2.12 is required
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>test</scope>-->
</dependency>
<!-- For Flink 1.14+:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_2.12</artifactId>
<version>${flink.version}</version>
(optional: <scope>provided</scope>)
</dependency>
For Flink 1.15+:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader_2.12</artifactId>
<version>${flink.version}</version>
(optional: <scope>provided</scope>)
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-test-utils_2.12</artifactId>
<version>${flink.version}</version>
(optional: <scope>test</scope>)
</dependency>
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<!-- <version>30.1.1-jre-15.0</version>-->
<version>18.0-13.0</version>
<!-- <version>30.1.1-jre-14.0</version>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.10.3</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
<!-- <version>5.1.49</version>-->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.4</version>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.20.graal</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.0</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- the 1.x CDC connector recommended in the StarRocks docs for Flink 1.13 -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<!-- <version>1.2.4_flink-1.15</version>-->
<!-- <version>1.2.4_flink-1.13_2.12</version>-->
<version>1.2.3_flink-1.13_2.11</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.10</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
<projectName>Apache Flink</projectName>
<encoding>UTF-8</encoding>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!--<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>-->
</plugins>
</build>
</project>
2. Flink 1.13 needs a custom deserialization schema (the key to handling deletes)
Notes:
1. The records emitted by the MySQL CDC source cannot be handled by the SR sink directly: you have to extract the JSON row image from the before or after field, and to support updates and deletes you must add an __op field.
2. One small caveat: date/time values need extra handling before they sync cleanly to SR; see my article on date formatting.
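To make the target format concrete, here is a minimal illustration (hypothetical id/name columns) of the JSON the SR sink ultimately needs, built with the fastjson2 dependency already in the pom: the row image plus an __op field, where 1 means delete and 0 (or no field) means upsert.
import com.alibaba.fastjson2.JSONObject;

public class OpFieldDemo {
    public static void main(String[] args) {
        JSONObject row = new JSONObject();
        row.put("id", 1);          // hypothetical primary-key column
        row.put("name", "foo");    // hypothetical value column
        row.put("__op", 1);        // 1 = delete this key in StarRocks, 0 = upsert
        System.out.println(row.toJSONString()); // {"id":1,"name":"foo","__op":1}
    }
}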
package *;
import com.alibaba.fastjson2.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.text.SimpleDateFormat;
import java.util.Objects;
/**
* Custom deserialization schema
*
* @author JGMa
*/
public class TxlcCustomerSchema implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
String topic = sourceRecord.topic();
String[] strings = topic.split("\\.");
// String database = strings[1];
// String table = strings[2];
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Struct value = (Struct) sourceRecord.value();
// JSONObject data = new JSONObject();
Struct before = value.getStruct("before");
JSONObject beforeData = new JSONObject();
if (before != null) {
for (Field field : before.schema().fields()) {
Object o = before.get(field);
beforeData.put(field.name(), o);
}
}
Struct after = value.getStruct("after");
JSONObject afterData = new JSONObject();
if (after != null) {
for (Field field : after.schema().fields()) {
Object o = after.get(field);
afterData.put(field.name(), o);
}
}
Envelope.Operation op = Envelope.operationFor(sourceRecord);
System.out.println("->" + value.toString());
System.out.println("===" + beforeData.toJSONString());
System.out.println(">>>" + afterData.toJSONString());
// JSONObject object = new JSONObject();
// object.put("database", database);
// object.put("table", table);
if (Objects.equals(op, Envelope.Operation.DELETE)) {
// The StarRocks table must use the Primary Key model; "__op":1 marks a delete, "__op":0 an upsert
beforeData.put("__op", 1);
collector.collect(beforeData.toJSONString());
} else if (Objects.equals(op, Envelope.Operation.UPDATE)) {
afterData.put("__op", 0);
collector.collect(afterData.toJSONString());
} else {
// CREATE / READ (snapshot) records carry the row in "after"
collector.collect(afterData.toJSONString());
}
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
3. The Flink 1.13 source & sink
package *;
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.txlc.cdc.execute.core.TxlcCustomerSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Listen to MySQL (full snapshot + binlog) and sync to StarRocks
* Print MySQL Snapshot + Binlog
* <p>
* Warning: SR column sizes must be large enough, otherwise NULL gets inserted
*
* @author JGMa
*/
public class FlinkMysqlCDCStarrocks {
private static final Logger log = LoggerFactory.getLogger(FlinkMysqlCDCStarrocks.class);
public static void main(String[] args) throws Exception {
ParameterTool paramTool = ParameterTool.fromArgs(args);
// String tableName = paramTool.get("table");
// String srcHost = paramTool.get("srcHost");
// String srcDatabase = paramTool.get("srcDatabase");
// String srcUsername = paramTool.get("srcUsername");
// String srcPassword = paramTool.get("srcPassword");
String tableName = "temp_flink";
String srcHost = "192.168.10.14";
String srcDatabase ="xcode";
String srcUsername ="root";
String srcPassword ="123456";
//1. Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2. For resumable sync, Flink CDC only needs the binlog position to be saved as state in checkpoints.
//(1) Enable checkpointing every 5 s and set the checkpoint consistency semantics
// env.enableCheckpointing(5000);
// CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//
// checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//
// //3. What to do with the last checkpoint when the job is cancelled (DELETE_ON_CANCELLATION discards it; use RETAIN_ON_CANCELLATION to keep it)
// checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
//
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
// // Checkpoint timeout: a checkpoint must complete within this time or it is discarded
//
// checkpointConfig.setCheckpointTimeout(600000);
// // Minimum pause between two checkpoints
// checkpointConfig.setMinPauseBetweenCheckpoints(500);
// // Maximum number of concurrent checkpoints
// checkpointConfig.setMaxConcurrentCheckpoints(1);
// Bounded streams would be processed in batch mode; force streaming mode here
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// Externalized checkpoints: retain the checkpoint when the job is removed.
// The number of retained checkpoints cannot be set in code (default is 1);
// to keep 3, set state.checkpoints.num-retained: 3 in flink-conf.yaml
// env.setStateBackend();
//5. Create the source
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname(srcHost)
.port(3306)
.databaseList(srcDatabase)
.tableList(srcDatabase + "." + tableName)
.username(srcUsername)
.password(srcPassword)
// converts SourceRecord to JSON String
.deserializer(new TxlcCustomerSchema())
.build();
//6. Add the source to the environment
DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "[MySQL Source]")
.setParallelism(1);
streamSource.addSink(StarRocksSink.sink(
StarRocksSinkOptions.builder()
.withProperty("connector", "starrocks")
.withProperty("jdbc-url", "jdbc:mysql://192.168.10.245:9030?characterEncoding=utf-8&useSSL=false")
.withProperty("load-url", "192.168.10.11:8030")
.withProperty("username", "root")
.withProperty("password", "123456")
.withProperty("table-name", tableName)
.withProperty("database-name", "data_center")
.withProperty("sink.buffer-flush.interval-ms", "10000")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
// .withProperty("sink.properties.column_separator", "\\x01")
// .withProperty("sink.properties.row_delimiter", "\\x02")
.withProperty("sink.parallelism", "1")
.build()
)).name(">>>StarRocks Sink<<<");
env.execute("mysql sync StarRocks 表:" + tableName);
}
}
Flink 1.15 code
The Flink 1.15 pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.15.3</flink.version>
<flink.connector.sr>1.2.5_flink-1.15</flink.connector.sr>
<flink.connector.mysql>2.3.0</flink.connector.mysql>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- base dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${flink.connector.sr}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.connector.mysql}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.4</version>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.8.3-10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.5.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<!-- <scope>provided</scope>-->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.10</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
</excludes>
</artifactSet>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.txlc.flink.job.CommonFlinkSingleTableStreamJob</mainClass>
</transformer>
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
<projectName>TXLC-Flink-CDC</projectName>
<encoding>UTF-8</encoding>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Serialization and date conversion (required for deletes; if the date-converter class reports NotFound, see the date-handling article linked in the 1.13 section above)
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* @author JGMa
*/
@Slf4j
public class JsonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
if (Objects.equals(op, Envelope.Operation.DELETE)) {
Map<String, Object> beforeRow = extractBeforeRow(value, valueSchema);
beforeRow.put("__op", 1);
ObjectMapper objectMapper = new ObjectMapper();
String delete = objectMapper.writeValueAsString(beforeRow);
log.debug("\n====>DELETE record info:{}", delete);
out.collect(delete);
} else if (Objects.equals(op, Envelope.Operation.UPDATE)) {
Map<String, Object> afterRow = extractAfterRow(value, valueSchema);
afterRow.put("__op", 0);
ObjectMapper objectMapper = new ObjectMapper();
String update = objectMapper.writeValueAsString(afterRow);
log.debug("\n====>UPDATE record info:{}", update);
out.collect(update);
} else {
Map<String, Object> row = extractAfterRow(value, valueSchema);
ObjectMapper objectMapper = new ObjectMapper();
String res = objectMapper.writeValueAsString(row);
log.debug("\n====>record info:{}", res);
out.collect(res);
}
}
private Map<String, Object> getRowMap(Struct after) {
return after.schema().fields().stream().collect(Collectors.toMap(Field::name, f -> after.get(f.name()) == null ? "" : after.get(f.name())));
}
private Map<String, Object> extractAfterRow(Struct value, Schema valueSchema) throws Exception {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
return getRowMap(after);
}
private Map<String, Object> extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
Struct after = value.getStruct(Envelope.FieldName.BEFORE);
return getRowMap(after);
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
//// Date handling
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
/**
* Timezone/format handling for MySQL date and time columns
* @author JGMa
*/
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
private ZoneId timestampZoneId = ZoneId.systemDefault();
@Override
public void configure(Properties props) {
}
@Override
public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
String sqlType = column.typeName().toUpperCase();
SchemaBuilder schemaBuilder = null;
Converter converter = null;
if ("DATE".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
converter = this::convertDate;
}
if ("TIME".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
converter = this::convertTime;
}
if ("DATETIME".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
converter = this::convertDateTime;
}
if ("TIMESTAMP".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
converter = this::convertTimestamp;
}
if (schemaBuilder != null) {
registration.register(schemaBuilder, converter);
}
}
private String convertDate(Object input) {
if (input == null) {
return null;
}
if (input instanceof LocalDate) {
return dateFormatter.format((LocalDate) input);
}
if (input instanceof Integer) {
LocalDate date = LocalDate.ofEpochDay((Integer) input);
return dateFormatter.format(date);
}
return String.valueOf(input);
}
private String convertTime(Object input) {
if (input == null) {
return null;
}
if (input instanceof Duration) {
Duration duration = (Duration) input;
long seconds = duration.getSeconds();
int nano = duration.getNano();
LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
return timeFormatter.format(time);
}
return String.valueOf(input);
}
private String convertDateTime(Object input) {
if (input == null) {
return null;
}
if (input instanceof LocalDateTime) {
return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");
}
return String.valueOf(input);
}
private String convertTimestamp(Object input) {
if (input == null) {
return null;
}
if (input instanceof ZonedDateTime) {
// MySQL stores TIMESTAMP as UTC, so the ZonedDateTime here is in UTC
ZonedDateTime zonedDateTime = (ZonedDateTime) input;
LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
return timestampFormatter.format(localDateTime).replaceAll("T", " ");
}
return String.valueOf(input);
}
}
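As a quick sanity check of the formatting above (the converter's methods are private, so this standalone sketch simply repeats the same logic on a hypothetical value): a DATETIME handed over as a LocalDateTime ends up in the plain "yyyy-MM-dd HH:mm:ss" shape that StarRocks DATETIME columns accept.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DateTimeFormatCheck {
    public static void main(String[] args) {
        LocalDateTime dt = LocalDateTime.of(2023, 4, 25, 8, 30, 0); // hypothetical DATETIME value from Debezium
        // Same transformation as convertDateTime(): ISO_DATE_TIME, then replace the 'T' separator
        String formatted = DateTimeFormatter.ISO_DATE_TIME.format(dt).replaceAll("T", " ");
        System.out.println(formatted); // 2023-04-25 08:30:00
    }
}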
The Flink 1.15 sync job
package com.txlc.flink.job;

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Single-table MySQL CDC -> StarRocks sync job; package and class name match the shade plugin's mainClass above
public class CommonFlinkSingleTableStreamJob {
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
String srcTable = params.get("srcTable", "");
String srcHost = params.get("srcHost", "");
String srcDb = params.get("srcDb", "test_deploy");
String srcUsername = params.get("srcUsername", "");
String srcPassword = params.get("srcPassword", "");
int checkpointInterval = params.getInt("checkpointInterval", 60000);
String sinkHost = params.get("sinkHost", "192.168.10.2");
String sinkDb = params.get("sinkDb", "data_center");
String sinkUsername = params.get("sinkUsername", "root");
String sinkPassword = params.get("sinkPassword", "123456");
String sinkTable = params.get("sinkTable", srcTable);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// Disable operator chaining so the execution graph is easier for beginners to read
env.disableOperatorChaining();
// env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
Time.of(10, TimeUnit.SECONDS)
));
// Enable checkpointing every checkpointInterval ms. Exactly-once vs at-least-once: exactly-once is the better choice for most applications; at-least-once mostly matters for ultra-low-latency (a few milliseconds) jobs.
env.enableCheckpointing(checkpointInterval);
// Set the mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Make sure at least 500 ms pass between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// A checkpoint must complete within one minute or it is discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Tolerate two consecutive checkpoint failures
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// Allow only one checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Use externalized checkpoints so the checkpoint is retained after the job is cancelled
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Enable experimental unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
// Where to store the checkpoint state
// env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://192.168.10.245:9000/flink/flink-checkpoints/" + srcTable));
// env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///home/txlc/txlc/flink/flink-checkpoints/" + srcTable));
Properties properties = new Properties();
properties.setProperty("converters", "dateConverters");
properties.setProperty("dateConverters.type", "com.txlc.flink.core.MySqlDateTimeConverter");
properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
properties.setProperty("dateConverters.format.time", "HH:mm:ss");
properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
properties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");
// The global read lock taken during the snapshot can impact the online business, so skip locking
properties.setProperty("debezium.snapshot.locking.mode", "none");
properties.setProperty("include.schema.changes", "true");
properties.setProperty("bigint.unsigned.handling.mode", "long");
properties.setProperty("decimal.handling.mode", "double");
// Custom date/time converter configuration (registered via the converters properties above)
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname(srcHost)
.port(3306)
.databaseList(srcDb)
.tableList(srcDb + "." + srcTable)
.username(srcUsername)
.password(srcPassword)
// ZoneInfoFile.oldMappings
.debeziumProperties(properties)
.deserializer(new JsonStringDebeziumDeserializationSchema())
.build();
DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[" + srcTable + "<< source >>" + srcHost + srcDb + "]");
// .setParallelism(parallelism);
streamSource.addSink(StarRocksSink.sink(
// the sink options
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://" + sinkHost + ":9030?characterEncoding=utf-8&useSSL=false&connectionTimeZone=Asia/Shanghai")
.withProperty("load-url", sinkHost + ":8030")
.withProperty("database-name", sinkDb)
.withProperty("username", sinkUsername)
.withProperty("password", sinkPassword)
.withProperty("table-name", sinkTable)
// Since StarRocks 2.4, partial updates of columns in a primary key table are supported; the two properties below specify the columns to update.
// .withProperty("sink.properties.partial_update", "true")
// .withProperty("sink.properties.columns", "k1,k2,k3")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
// Sink parallelism; with parallelism > 1 you need to consider how to keep the data ordered
// .withProperty("sink.parallelism", parallelism+"")
.build())
).name(">>>StarRocks " + sinkTable + " Sink<<<");
env.execute(srcTable + "<< stream sync job >>" + srcHost + srcDb);
}
}
Finally: as long as the corresponding table already exists in both MySQL and SR, you get a one-to-one sync; a sketch of pre-creating the SR table is shown below.
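A minimal sketch of pre-creating the StarRocks side over the FE's MySQL protocol (port 9030), using the mysql-connector-java driver already in the pom. The column names and bucket count are hypothetical; the important parts are that the table uses the Primary Key model (required for the __op-based deletes above) and that its columns mirror the MySQL source table.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateStarRocksTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.10.245:9030/data_center", "root", "123456");
             Statement stmt = conn.createStatement()) {
            // Primary Key model table whose columns mirror the MySQL source table
            stmt.execute("CREATE TABLE IF NOT EXISTS temp_flink ("
                    + " id BIGINT NOT NULL,"
                    + " name VARCHAR(255),"
                    + " created_at DATETIME"
                    + ") PRIMARY KEY (id)"
                    + " DISTRIBUTED BY HASH (id) BUCKETS 8");
        }
    }
}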
Hope this solves your problem.
