背景

0点飘移是生产中常见的问题,我按照官网自定义拦截器并部署到生产后,没有起作用,我针对实际业务字段做了代码调整。

修复前

  • 代码
package com.cowa.flume;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }
    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();

        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }
    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }

        return events;
    }
    @Override
    public void close() {

    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
}
  • 报错:java.lang.RuntimeException:For Input String :“”
    在这里插入图片描述

修复后

  • 代码
package com.cowa.flume;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }
    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();

        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        String ts = jsonObject.getString("time");
        headers.put("timestamp", ts);

        return event;
    }
    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }

        return events;
    }
    @Override
    public void close() {

    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
}

分析总结

问题出在了 String ts = jsonObject.getString(“ts”);
我数据源中的时间字段名字不是ts,而是time,所以改为如下
String ts = jsonObject.getString(“time”);恢复正常。

!!!大家一定要注意这个小点,不然拦截器获取不到正确的key放入header

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐