E-commerce Data Warehouse 3.0, Data Collection Module: Installing the Flume Agent that Consumes Kafka Data
Resources
Documentation: http://flume.apache.org/FlumeUserGuide.html
Flume website: http://flume.apache.org/
Downloads: http://archive.apache.org/dist/flume/
Reference blog post: https://blog.csdn.net/qq_40180229/article/details/104523248
Link: https://pan.baidu.com/s/1EWOiiNWyIWTYjxYfBfZplw
Access code: 3hti
Cluster planning (this Kafka-consuming Flume agent runs on warehouse104)
Write kafka-flume-hdfs.conf
[scorpion@warehouse102 flume-1.9.0]$ cd /opt/module/flume-1.9.0/conf/
[scorpion@warehouse102 conf]$ vim kafka-flume-hdfs.conf
## Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = warehouse102:9092,warehouse103:9092,warehouse104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.warehouse.flume.interceptor.TimeStampInterceptor$Builder
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## Write compressed output files (LZO via the lzop codec)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
## Wiring: bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
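The sink's roll thresholds are chosen around HDFS block size, and the channel's data-file cap sits just under the 32-bit signed limit. A quick sanity check of the arithmetic behind the values above:

```shell
# rollSize is one 128 MB HDFS block, so each rolled file fills a
# block without spilling into a second one.
roll_size=$((128 * 1024 * 1024))
echo "rollSize = $roll_size"    # matches 134217728 in the config

# maxFileSize = 2146435071 is Integer.MAX_VALUE minus exactly 1 MiB,
# leaving headroom below the 2147483647-byte signed-int ceiling.
max_file_size=2146435071
echo "headroom = $((2147483647 - max_file_size))"
```

Setting rollCount = 0 disables event-count-based rolling, so files roll only on size (128 MB) or time (10 s of inactivity via rollInterval).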
Write the TimeStampInterceptor
Repository: https://github.com/SmallScorpion/flume-interceptor
package com.warehouse.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Replaces the Kafka-arrival timestamp in each event header with the "ts"
 * field from the log body, so the HDFS sink's %Y-%m-%d escape partitions
 * events by the time they were generated, not the time they were consumed.
 */
public class TimeStampInterceptor implements Interceptor {

    private final ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        // The log body is a JSON object carrying its generation time in "ts" (epoch millis).
        JSONObject jsonObject = JSONObject.parseObject(log);
        String ts = jsonObject.getString("ts");

        // The HDFS sink reads the "timestamp" header to resolve %Y-%m-%d.
        headers.put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
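To see why the interceptor matters: without it, the Kafka source stamps the consumption time, so a log generated just before midnight but consumed after it would land in the wrong day's directory. A rough shell sketch of the date resolution the HDFS sink performs on the header the interceptor writes (the log line and its ts value are made-up examples, with sed standing in for JSON parsing):

```shell
# A mock log line carrying its generation time in "ts" (epoch millis).
log='{"common":{"mid":"mid_1"},"ts":1592524800000}'

# Extract ts, mimicking what the interceptor puts in the "timestamp" header.
ts=$(echo "$log" | sed -n 's/.*"ts":\([0-9]*\).*/\1/p')

# The sink resolves %Y-%m-%d from that header into the partition directory.
day=$(TZ=UTC date -u -d "@$((ts / 1000))" +%F)
echo "/origin_data/gmall/log/topic_log/$day"
```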
Upload the jar package and distribute it
Write the cluster start/stop script for the Kafka-consuming Flume agent
[scorpion@warehouse102 bin]$ vim flume-consumer-log.sh
#! /bin/bash
# Start/stop script for the Flume agent that consumes Kafka data
case $1 in
"start"){
for i in warehouse104
do
echo " -------- Starting consumer Flume on $i --------"
ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume-1.9.0/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.9.0/log2.txt 2>&1 &"
done
};;
"stop"){
for i in warehouse104
do
echo " -------- Stopping consumer Flume on $i --------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
done
};;
esac
# Grant execute permission
[scorpion@warehouse102 bin]$ chmod 777 ./flume-consumer-log.sh
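The stop branch finds the agent by grepping for its config file name in the process list. That pid-extraction pipeline can be checked against a fabricated `ps -ef` line (the pid 4321 and the elided java arguments are made up for illustration):

```shell
# Simulated `ps -ef` output: the agent's command line contains the conf file name.
ps_line="scorpion  4321     1  2 10:00 ?  00:01:02 java ... --conf-file /opt/module/flume-1.9.0/conf/kafka-flume-hdfs.conf --name a1"

# Same filter chain as the script's stop branch, minus the final kill:
# keep lines mentioning the conf file, drop the grep itself, take field 2 (the pid).
pid=$(echo "$ps_line" | grep kafka-flume-hdfs | grep -v grep | awk '{print $2}')
echo "$pid"    # 4321
```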
Data pipeline test
# Edit application.properties under /opt/module/applog/
[scorpion@warehouse102 applog]$ vim application.properties
mock.date=2020-06-19
# Generate mock data
[scorpion@warehouse102 applog]$ java -jar gmall2020-mock-log-2020-04-01.jar
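With mock.date set, the generated events should land under the matching day directory on HDFS. A sketch of the check, building the expected path from the configured date (the hadoop command is left commented since it needs the live cluster):

```shell
# The interceptor propagates the mock date into the sink's %Y-%m-%d path.
mock_date=2020-06-19
dir="/origin_data/gmall/log/topic_log/$mock_date"
echo "$dir"

# On the cluster, confirm files arrived and rolled as lzop-compressed output:
# hadoop fs -ls "$dir"
```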
Collection pipeline start/stop script
[scorpion@warehouse102 bin]$ vim acquisition-channel-cluster-together.sh
#!/bin/bash
# Start/stop script for the whole collection pipeline
case $1 in
"start"){
echo ================== Starting cluster ==================
# Start the Zookeeper cluster
zookeeper-cluster-together.sh start
# Start the Hadoop cluster
hadoop-cluster-together.sh start
# Start the Kafka cluster
kafka-cluster-together.sh start
# Start the Flume collection agents
flume-collection-log.sh start
# Start the Flume consumer agents
flume-consumer-log.sh start
};;
"stop"){
echo ================== Stopping cluster ==================
# Stop the Flume consumer agents
flume-consumer-log.sh stop
# Stop the Flume collection agents
flume-collection-log.sh stop
# Stop the Kafka cluster
kafka-cluster-together.sh stop
# Stop the Hadoop cluster
hadoop-cluster-together.sh stop
# Stop the Zookeeper cluster
zookeeper-cluster-together.sh stop
};;
esac
# Grant execute permission
[scorpion@warehouse102 bin]$ chmod 777 acquisition-channel-cluster-together.sh
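The stop branch deliberately reverses the start order: consumers drain before Kafka goes away, and Kafka shuts down before Hadoop and Zookeeper. One way to keep the two branches from drifting apart is to derive both from a single ordered list; a minimal sketch (not the script above, and the service names are placeholders):

```shell
# Services listed in start order; stop order is the reverse.
services="zookeeper hadoop kafka flume-collect flume-consume"

stop_order=""
for s in $services; do
    # Prepend each service, so the final string is the list reversed.
    stop_order="$s${stop_order:+ $stop_order}"
done

echo "start: $services"
echo "stop:  $stop_order"
```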
