//所有数据的 key,省略一些值

public static final String colomns = "touch_type,touch_time,event_type,click_time,customer_user_id...";

/**

* 正则表达式 截取用户行为埋点值

*/

public static final String REGUBC = "(.*?)=(.*?)&";

/**

* 定义hive分隔符

*/

public static final String HIVE_SEPERATE = "\001";

下面是 Main 方法中要执行的:

//从 HDFS 中加载文件,转成RDD

JavaRDD logRdd = javaSparkContext.textFile(scanFiles);

final Broadcast broadcast = javaSparkContext.broadcast(colomns.split(","));

//把原始数据 通过正则表达式过滤,最终转换为 JSONObject 类型的 HashSet

JavaRDD jsonRdd = logRdd.mapPartitions(new FlatMapFunction, JSONObject>() {

final String[] broadcastValue = broadcast.value();

@Override

public Iterable call(Iterator stringIterator) throws Exception {

Set jsonObjs = new HashSet();

while(stringIterator.hasNext()){

//把 log 数据取出来

String inputValue = stringIterator.next();

int startIndex = inputValue.indexOf("_app.gif?");

int endIndex = inputValue.lastIndexOf("HTTP/");

String eventLog = inputValue.substring(startIndex + 9 , endIndex-1) + "&";

//通过正则表达式匹配,存到 Map

Matcher m = CommonUtil.getMatcher(eventLog, REGUBC);

JSONObject jsonObj = new JSONObject();

Map map = new HashMap<>();

while (m.find()) {

map.put(m.group(1), m.group(2));

}

//Map colomns,保存到 JSONObject

for (int i = 0; i

String columnName = broadcastValue[i];

if (map.containsKey(columnName)){

//String value = URLDecoder.decode(map.get(columnName) , "UTF-8");

jsonObj.put(columnName, map.get(columnName));

} else {

jsonObj.put(columnName,"");

}

}

//保存到 JSONObject 类型的 HashSet 中

jsonObjs.add(jsonObj);

}

return jsonObjs;

}

});

//从 jsonObject 中 取出 value,并连接起来

JavaRDD javaJsonRdd = jsonRdd.map(new Function() {

@Override

public String call(JSONObject jsonObject) throws Exception {

List valuesList = new ArrayList(jsonObject.values());

//把 list 中的数据 连接起来,比如:Joiner.on("; ").join("tom", "jerry", "jack") => "tom; jerry; jack"

String hiveEvent = Joiner.on(HIVE_SEPERATE).join(valuesList.iterator());

return hiveEvent;

}

});

//保存到 HDFS

javaJsonRdd.saveAsTextFile(hdfsOutputFilePath);

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐