Resources

Documentation: http://flume.apache.org/FlumeUserGuide.html
Flume website: http://flume.apache.org/
Downloads: http://archive.apache.org/dist/flume/
Reference blog: https://blog.csdn.net/qq_40180229/article/details/104523248
Baidu Netdisk link: https://pan.baidu.com/s/1EWOiiNWyIWTYjxYfBfZplw
Extraction code: 3hti

Cluster Planning

(cluster planning diagram)

Write kafka-flume-hdfs.conf

[scorpion@warehouse102 flume-1.9.0]$ cd /opt/module/flume-1.9.0/conf/
[scorpion@warehouse102 conf]$ vim kafka-flume-hdfs.conf
## Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = warehouse102:9092,warehouse103:9092,warehouse104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.warehouse.flume.interceptor.TimeStampInterceptor$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6


## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false


## Roll a new file every 10 s or at 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## Write output files as compressed streams (lzop)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop

## Assemble: bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
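
Before wiring this agent into a script, it can help to confirm that topic_log actually has data flowing through it; a minimal check with the Kafka console consumer (it prints messages as they arrive), assuming Kafka's bin directory is on the PATH of warehouse102:

[scorpion@warehouse102 flume-1.9.0]$ kafka-console-consumer.sh --bootstrap-server warehouse102:9092 --topic topic_log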


Write TimeStampInterceptor

Repository: https://github.com/SmallScorpion/flume-interceptor

package com.warehouse.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

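/**
 * Copies the "ts" field of the JSON event body into the event header
 * "timestamp", so the HDFS sink's %Y-%m-%d path escape resolves to the
 * event time carried in the log instead of the time the event is written.
 */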
public class TimeStampInterceptor implements Interceptor {

    private ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

Upload the jar and distribute it to the cluster
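
A sketch of this step, assuming the interceptor project is built with Maven and produces flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar (the jar name is illustrative); the jar goes into Flume's lib directory on every node that may run the consumer agent:

[scorpion@warehouse102 flume-interceptor]$ mvn clean package
[scorpion@warehouse102 flume-interceptor]$ cp target/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/module/flume-1.9.0/lib/
[scorpion@warehouse102 flume-interceptor]$ scp /opt/module/flume-1.9.0/lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar warehouse103:/opt/module/flume-1.9.0/lib/
[scorpion@warehouse102 flume-interceptor]$ scp /opt/module/flume-1.9.0/lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar warehouse104:/opt/module/flume-1.9.0/lib/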


Write the start/stop script for the consuming Flume agent

[scorpion@warehouse102 bin]$ vim flume-consumer-log.sh
#! /bin/bash
# start/stop script for the Flume agent that consumes Kafka data
case $1 in
"start"){
        for i in warehouse104
        do
                echo " --------启动 $i 消费flume-------"
                ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume-1.9.0/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.9.0/log2.txt   2>&1 &"
        done
};;
"stop"){
        for i in warehouse104
        do
                echo " --------停止 $i 消费flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
        done

};;
esac

# grant execute permission
[scorpion@warehouse102 bin]$ chmod 777 ./flume-consumer-log.sh
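
With the interceptor jar distributed, the consumer agent can be started from warehouse102 and checked on warehouse104, for example:

[scorpion@warehouse102 bin]$ flume-consumer-log.sh start
[scorpion@warehouse102 bin]$ ssh warehouse104 "ps -ef | grep kafka-flume-hdfs | grep -v grep"
[scorpion@warehouse102 bin]$ flume-consumer-log.sh stop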


Data pipeline test

# edit application.properties under /opt/module/applog/
[scorpion@warehouse102 applog]$ vim application.properties
mock.date=2020-06-19

# generate data
[scorpion@warehouse102 applog]$ java -jar gmall2020-mock-log-2020-04-01.jar
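
If the whole channel is working, files for the mocked date should appear under the sink's HDFS path; a quick check, assuming the Hadoop client is configured on warehouse102:

[scorpion@warehouse102 applog]$ hadoop fs -ls /origin_data/gmall/log/topic_log/2020-06-19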


Collection channel start/stop script

[scorpion@warehouse102 bin]$ vim acquisition-channel-cluster-together.sh

#!/bin/bash
# start/stop script for the whole collection channel
case $1 in
"start"){
        echo ================== 启动 集群 ==================

        # 启动 Zookeeper集群
        zookeeper-cluster-together.sh start

        # 启动 Hadoop集群
        hadoop-cluster-together.sh start

        # 启动 Kafka采集集群
        kafka-cluster-together.sh start

        # 启动 Flume采集集群
        flume-collection-log.sh start

        # 启动 Flume消费集群
        flume-consumer-log.sh start

        };;
"stop"){
        echo ================== 停止 集群 ==================

        # 停止 Flume消费集群
        flume-consumer-log.sh stop

        # 停止 Flume采集集群
        flume-collection-log.sh stop

        # 停止 Kafka采集集群
        kafka-cluster-together.sh stop

        # 停止 Hadoop集群
        hadoop-cluster-together.sh stop
		
        # 停止 Zookeeper集群
        zookeeper-cluster-together.sh stop

};;
esac

# grant execute permission
[scorpion@warehouse102 bin]$ chmod 777 acquisition-channel-cluster-together.sh
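
The whole channel can then be brought up or torn down with a single command:

[scorpion@warehouse102 bin]$ acquisition-channel-cluster-together.sh start
[scorpion@warehouse102 bin]$ acquisition-channel-cluster-together.sh stop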

