Flink DataStream API 使用示例
摘要:Kafka --> Elasticsearch(Flink 版本 1.10)。下文依次完成初始化参数、重启策略、checkpoint、Kafka source 与 Elasticsearch sink 的设置,最后提交作业。
kafka --> elasticsearch
flink版本1.10
设置初始化参数
// Parse job configuration from the command-line arguments. All lookups below
// use the same ParameterTool instance — the original snippet mixed the
// undeclared name `parameter` with `parameter_tool`, which would not compile.
ParameterTool parameter_tool = ParameterTool.fromArgs(args);
String config_path = parameter_tool.get("ConfigPath");
String source_topic = "my-topic";
String source_zookeeper = parameter_tool.get("zookeeper.source.addr");
String source_kafka_server = parameter_tool.get("kafkaServer.source.addr");
String es_ip = parameter_tool.get("elasticsearch.ip");
int es_port = parameter_tool.getInt("elasticsearch.port");
// NOTE(review): the key "elasticsearch.sorketport" looks like a typo of
// "socketport". It must match whatever key is actually passed on the command
// line, so it is left unchanged here — confirm against the launch script.
int es_socket_port = parameter_tool.getInt("elasticsearch.sorketport");
String savepoint_dir = parameter_tool.get("checkpoint.uri");
设置重启策略
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Only one restart strategy can be active at a time. The original snippet
// called setRestartStrategy twice, so the second call silently overwrote the
// first and only the failure-rate strategy ever took effect — keep that one.
// (Alternative: RestartStrategies.fixedDelayRestart(3, Time.of(10,
// TimeUnit.SECONDS)) would restart at most 3 times with a 10 s delay.)
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,                               // max failures within the interval
        Time.of(5, TimeUnit.MINUTES),    // interval over which the failure rate is measured
        Time.of(10, TimeUnit.SECONDS)    // delay between restart attempts
));
设置checkpoint默认值
// Trigger a checkpoint every 5000 ms.
env.enableCheckpointing(5000);
// Persist checkpoint state to the filesystem URI read from configuration.
env.setStateBackend(new FsStateBackend(savepoint_dir));
// --- advanced checkpoint options ---
// Exactly-once processing guarantees (this is the default mode).
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Require at least 500 ms of progress between consecutive checkpoints.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Discard any checkpoint that takes longer than one minute to complete.
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Never run more than one checkpoint at the same time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Retain externalized checkpoints after job cancellation so the job can be
// restored from them manually later.
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Prefer the latest checkpoint for recovery even when a more recent
// savepoint exists. (Fine on 1.10; deprecated in later Flink releases.)
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
设置kafka source、消费策略
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", source_kafka_server);
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", source_zookeeper);
properties.setProperty("group.id", "sync_vweb-test");
// When no committed offset exists, fall back to the earliest offset.
properties.setProperty("auto.offset.reset", "earliest");
// Disable the Kafka client's automatic offset commits; with checkpointing
// enabled, Flink commits offsets as part of each completed checkpoint.
properties.setProperty("enable.auto.commit", "false");
// Deserialize each record's key and value as JSON into an ObjectNode;
// `true` additionally attaches topic/partition/offset metadata.
FlinkKafkaConsumer<ObjectNode> kafka_source = new FlinkKafkaConsumer<>(source_topic, new JSONKeyValueDeserializationSchema(true), properties);
// Always start from the earliest offset, ignoring committed group offsets
// and the "auto.offset.reset" property above. NOTE(review): contrary to the
// original comment, this is NOT the default — the default start position is
// setStartFromGroupOffsets().
kafka_source.setStartFromEarliest();
DataStream<ObjectNode> input = env.addSource(kafka_source);
设置es sink
// Sink: index each Kafka record into Elasticsearch.
// NOTE(review): `config` and `transportAddresses` are not defined anywhere in
// this snippet — they must be built elsewhere (presumably from es_ip /
// es_port / es_socket_port); confirm against the full source.
// The original declared ElasticsearchSinkFunction<String> on a
// DataStream<ObjectNode>, which does not type-check against addSink. The sink
// is typed to ObjectNode here; the element is serialized to JSON text before
// parsing, preserving the original per-record logic.
input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<ObjectNode>() {
    // Build one IndexRequest carrying only the record's "source" field.
    public IndexRequest createIndexRequest(ObjectNode element) {
        JSONObject jsonObject = JSONObject.parseObject(element.toString());
        JSONObject json_value = new JSONObject();
        // NOTE(review): JSONKeyValueDeserializationSchema nests the payload
        // under a "value" key — confirm whether "source" should instead be
        // read from jsonObject.getJSONObject("value").
        json_value.put("source", jsonObject.getString("source"));
        return Requests.indexRequest()
                .index("video_data")
                .type("drama_data")
                .source(json_value);
    }

    @Override
    public void process(ObjectNode element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
最后执行作业
// Nothing runs until execute() is called; "test job" is the job name shown
// in the Flink web UI.
env.execute("test job");
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)