kafka --> elasticsearch

flink版本1.10

设置初始化参数

// Parse command-line arguments once; all job configuration is read from this tool.
ParameterTool parameter_tool = ParameterTool.fromArgs(args);
String config_path = parameter_tool.get("ConfigPath");
// NOTE(review): the topic is hard-coded while everything else is configurable —
// consider reading it from parameter_tool as well.
String source_topic = "my-topic";
// BUGFIX: these reads went through an undeclared variable `parameter`; the tool
// was declared as `parameter_tool` above, so the original did not compile.
String source_zookeeper = parameter_tool.get("zookeeper.source.addr");
String source_kafka_server = parameter_tool.get("kafkaServer.source.addr");

String es_ip = parameter_tool.get("elasticsearch.ip");
int es_port = parameter_tool.getInt("elasticsearch.port");
// NOTE(review): "sorketport" looks like a typo for "socketport", but the key must
// match whatever is supplied at submit time — do not change it silently.
int es_socket_port = parameter_tool.getInt("elasticsearch.sorketport");

// Checkpoint/state-backend target; the config key is "checkpoint.uri" even though
// the local variable is named savepoint_dir (used by setStateBackend below).
String savepoint_dir = parameter_tool.get("checkpoint.uri");

设置重启策略

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// BUGFIX: setRestartStrategy was called twice; the second call silently replaced
// the first, so the fixed-delay strategy was dead code. Keep exactly one strategy —
// the failure-rate restart that was actually in effect in the original.
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3, // max failures within the measurement interval before the job fails permanently
        Time.of(5, TimeUnit.MINUTES), // interval over which the failure rate is measured
        Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
));
// Alternative (was overridden in the original): restart at most 3 times, 10s apart.
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
//         3, Time.of(10, TimeUnit.SECONDS)));

设置checkpoint默认值

// Take a checkpoint every 5000 ms.
env.enableCheckpointing(5000);
// Persist checkpoint state to a filesystem path (value of "checkpoint.uri" above).
env.setStateBackend(new FsStateBackend(savepoint_dir));
// advanced options:
// Exactly-once checkpointing semantics (this is already the default mode).
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Guarantee at least 500 ms between the end of one checkpoint and the start of the next.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoints have to complete within one minute, or they are discarded.
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Allow only one checkpoint to be in progress at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Retain externalized checkpoints after job cancellation so the job can be restored from them.
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Prefer the latest checkpoint for recovery even when a more recent savepoint exists.
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

设置kafka source、消费策略

// Kafka consumer configuration for the source topic.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", source_kafka_server);
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", source_zookeeper);
properties.setProperty("group.id", "sync_vweb-test");
// With no committed offset for the group, start from the earliest available offset.
properties.setProperty("auto.offset.reset", "earliest");
// Disable the Kafka client's own auto-commit; with checkpointing enabled, Flink
// commits offsets as part of each completed checkpoint instead.
properties.setProperty("enable.auto.commit", "false");

// Deserialize each record's key/value as JSON into an ObjectNode shaped
// {"key":..., "value":..., "metadata":...} (`true` = include metadata).
FlinkKafkaConsumer<ObjectNode> kafka_source = new FlinkKafkaConsumer<>(source_topic, new JSONKeyValueDeserializationSchema(true), properties);

// Always start from the earliest offset, ignoring any committed group offsets.
// NOTE(review): this is NOT the default — the consumer default is setStartFromGroupOffsets().
kafka_source.setStartFromEarliest(); // overrides committed group offsets
DataStream<ObjectNode> input = env.addSource(kafka_source);

设置es sink

// Sink each record into Elasticsearch index "video_data", type "drama_data".
// NOTE(review): `config` and `transportAddresses` are not defined anywhere in this
// snippet — they must be built (presumably from es_ip / es_port / es_socket_port
// read above) before this compiles; confirm against the full source.
// BUGFIX: the sink function was typed ElasticsearchSinkFunction<String> while
// `input` is a DataStream<ObjectNode>, which does not compile. The element type
// now matches, and the field is read directly from the deserialized ObjectNode
// instead of re-parsing a String with fastjson.
input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<ObjectNode>() {
            // Build one IndexRequest carrying only the "source" field of the record's value.
            public IndexRequest createIndexRequest(ObjectNode element) {
                JSONObject json_value = new JSONObject();
                // JSONKeyValueDeserializationSchema wraps each record as
                // {"key":..., "value":..., "metadata":...}; assumes value.source
                // is always present — TODO confirm upstream schema.
                json_value.put("source", element.get("value").get("source").asText());
                return Requests.indexRequest()
                        .index("video_data")
                        .type("drama_data")
                        .source(json_value);
            }

            @Override
            public void process(ObjectNode element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }));

最后执行作业

 // Submit the job; execute() blocks until the streaming job terminates.
 env.execute("test job");
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐