Strategies for Handling Late Data

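  • Small delays (out-of-order data): tolerate them with the watermark, which slows the advance of event time so that records that would otherwise be late are still treated as on time.
  • Moderate delays (out-of-order data): use allowedLateness, which accepts records that are late within a fixed limit and re-triggers the window computation when they arrive.
  • Large delays (out-of-order data): use sideOutputLateData, which routes records arriving after the allowedLateness limit to a side output stream.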
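
The first bullet can be made concrete with a small model of how a bounded-out-of-orderness watermark is derived. The sketch below is plain Java with no Flink dependency. The class name `WatermarkSketch` and its methods are hypothetical, and the formula (largest timestamp seen, minus the bound, minus 1 ms) is assumed to mirror what Flink's `forBoundedOutOfOrderness` strategy emits. Treat it as an illustration, not as Flink's implementation.

```java
/** Minimal model of a bounded-out-of-orderness watermark (hypothetical helper, not a Flink class). */
public class WatermarkSketch {
    private final long boundMillis;  // tolerated out-of-orderness in milliseconds
    private long maxTimestamp;       // largest event timestamp seen so far

    public WatermarkSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Records an event timestamp; the tracked maximum never moves backwards. */
    public void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    /** Assumed formula: largest timestamp seen, minus the bound, minus 1 ms. */
    public long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch strict = new WatermarkSketch(0);
        WatermarkSketch tolerant = new WatermarkSketch(2_000);
        // The record stamped 8500 arrives after the record stamped 10000.
        for (long ts : new long[] {3_000, 10_000, 8_500}) {
            strict.onEvent(ts);
            tolerant.onEvent(ts);
        }
        System.out.println("bound 0 ms:    watermark = " + strict.currentWatermark());
        System.out.println("bound 2000 ms: watermark = " + tolerant.currentWatermark());
    }
}
```

Assuming a watermark is emitted after every event, a bound of 0 ms lets the record stamped 10000 push the watermark to 9999, which is enough to fire the window [0, 10000), so the record stamped 8500 is already late. With a bound of 2000 ms the watermark only reaches 7999, the window has not fired yet, and the same record still counts as on time.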

Example code:

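// Imports required by the snippet below (Flink 1.x DataStream API)
import java.time.Duration;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// The statements below belong inside a main method declared with `throws Exception`
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();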
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

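// Each input line must have five comma-separated fields, because the map function reads split[4].
// The original sample "1,e01,3000,pg02" has only four and would throw ArrayIndexOutOfBoundsException;
// append an integer field, e.g. 1,e01,3000,pg02,10 (the last value is illustrative).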
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

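// Parse each line into an EventBean2 (the author's POJO, defined elsewhere) paired with a count of 1
SingleOutputStreamOperator<Tuple2<EventBean2, Integer>> beanStream = source.map(s -> {
    String[] split = s.split(",");
    EventBean2 bean = new EventBean2(Long.parseLong(split[0]), split[1], Long.parseLong(split[2]), split[3], Integer.parseInt(split[4]));
    return Tuple2.of(bean, 1);
}).returns(new TypeHint<Tuple2<EventBean2, Integer>>() {})  // the lambda loses generic type information, so declare the result type explicitly
        // Out-of-orderness bound of 0 ms: the watermark tolerates no disorder here,
        // so late records are handled only by allowedLateness and the side output below
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<EventBean2, Integer>>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<EventBean2, Integer>>() {
                    @Override
                    public long extractTimestamp(Tuple2<EventBean2, Integer> element, long recordTimestamp) {
                        // Use the event's own timestamp field as the event time
                        return element.f0.getTimeStamp();
                    }
                }));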


// Tag that identifies the side output for records arriving after the allowed lateness
OutputTag<Tuple2<EventBean2, Integer>> lateDataOutputTag = new OutputTag<>("late_data", TypeInformation.of(new TypeHint<Tuple2<EventBean2, Integer>>() {}));

SingleOutputStreamOperator<Tuple2<EventBean2, Integer>> sumResult = beanStream.keyBy(tp -> tp.f0.getGuid())
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))  // tumbling event-time window, 10 seconds long
        .allowedLateness(Time.seconds(2))  // accept records that are up to 2 s late
        .sideOutputLateData(lateDataOutputTag)  // records later than the allowed limit go to the side output marked by this OutputTag
        .sum("f1");

DataStream<Tuple2<EventBean2, Integer>> lateDataSideStream = sumResult.getSideOutput(lateDataOutputTag);

sumResult.print("main stream result");

lateDataSideStream.print("late data");

env.execute();
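
To see how the three mechanisms divide the work in the job above, the following plain-Java sketch classifies a record given the current watermark. `LateDataClassifier`, `Fate`, and `classify` are hypothetical names. The boundaries are an assumption: a tumbling window is taken to fire once the watermark reaches its last millisecond (window end minus 1), and its state is taken to be kept for `allowedLateness` beyond that point. The sketch does not depend on Flink and is not Flink's implementation.

```java
/** Classifies a record against the current watermark (hypothetical helper, not a Flink class). */
public class LateDataClassifier {

    public enum Fate { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    /**
     * @param timestamp       event time of the record, in milliseconds
     * @param watermark       current watermark, in milliseconds
     * @param windowSize      tumbling window length, in milliseconds
     * @param allowedLateness extra time the window state is kept, in milliseconds
     */
    public static Fate classify(long timestamp, long watermark, long windowSize, long allowedLateness) {
        // Start of the tumbling window that contains the record (windows aligned to 0).
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        // Last millisecond that still belongs to the window.
        long windowMaxTimestamp = windowStart + windowSize - 1;

        if (windowMaxTimestamp + allowedLateness <= watermark) {
            return Fate.SIDE_OUTPUT;        // window state already discarded
        }
        if (windowMaxTimestamp <= watermark) {
            return Fate.LATE_BUT_ACCEPTED;  // window already fired, so it fires again
        }
        return Fate.ON_TIME;                // window has not fired yet
    }

    public static void main(String[] args) {
        long windowSize = 10_000;      // matches Time.seconds(10) in the job above
        long allowedLateness = 2_000;  // matches Time.seconds(2) in the job above
        // The same record (timestamp 3000) evaluated at three different watermarks.
        for (long watermark : new long[] {9_998, 10_999, 11_999}) {
            System.out.println("watermark " + watermark + " -> "
                    + classify(3_000, watermark, windowSize, allowedLateness));
        }
    }
}
```

Under these assumptions a record stamped 3000 is on time while the watermark is below 9999, is accepted and re-triggers the window [0, 10000) while the watermark is between 9999 and 11998, and goes to the side output once the watermark reaches 11999.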