二十、自定义InputFormat——加载整个文件数据
一、使用场景 对于小文件来说,MR默认会为其分配一个maptask,由于数据量过小可以不会造成内存溢出,所以可以考虑一次将整个文件数据加载,而不是一行行数据加载。二、自定义InputFormat实现0、需求?加载指定目录下的所有文件,统计每个文件的单词个数?1、实现步骤自定义类继承FileInputFor
·
一、使用场景
对于小文件来说,MR默认会为其分配一个maptask,由于数据量过小可以不会造成内存溢出,所以可以考虑一次将整个文件数据加载,而不是一行行数据加载。
二、自定义InputFormat实现
0、需求?
加载指定目录下的所有文件,统计每个文件的单词个数?
1、实现步骤
自定义类继承FileInputFormat
自定义类继承RecordReader,制定数据加载规则
修改createRecordReader方法返回对象为自定义RecordReader
2、编写自定义InputFormat
package com.cjy.mr.inputformat1;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**定义类继承FileInputFormat
* 重写片数据读取规则-- 一次加载整个文件数据
*/
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{
@Override
protected boolean isSplitable(JobContext context, Path filename) {
//一次读取一个文件,一个文件就是一片,不进行文件的切片
return false;
}
/**
* 定义文件读取规则
* @param split
* @param context
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
//使用自定义数据加载方式
WholeRecordReader recordReader = new WholeRecordReader();
recordReader.initialize(split, context);
return recordReader;
}
}
3、编写自定义RecordReader
package com.cjy.mr.inputformat1;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* 数据加载方式
*/
public class WholeRecordReader extends RecordReader<Text, BytesWritable>{
private Configuration configuration;
private FileSplit split;
private boolean isProgress= true;
private BytesWritable value = new BytesWritable();
private Text k = new Text();
//初始化环境变量
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.split = (FileSplit)split;
configuration = context.getConfiguration();
}
/**
* 读取下条数据
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (isProgress) {
// 1 定义缓存区,文件大小的长度
byte[] contents = new byte[(int)split.getLength()];
FileSystem fs = null;
FSDataInputStream fis = null;
try {
// 2 获取文件系统
Path path = split.getPath();
fs = path.getFileSystem(configuration);
// 3 读取数据
fis = fs.open(path);
// 4 读取文件内容
IOUtils.readFully(fis, contents, 0, contents.length);
// 5 输出文件内容,设置一整个文件的内容为value
value.set(contents, 0, contents.length);
// 6 获取文件路径及名称
String name = split.getPath().toString();
// 7 设置输出的key值为文件全类名
k.set(name);
} catch (Exception e) {
}finally {
IOUtils.closeStream(fis);
}
//数据加载完成
isProgress = false;
return true;
}
//因为一次读取了一个文件数据,所以
return false;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return k;
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
}
}
4、编写map
package com.cjy.mr.inputformat1;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
@Override
protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
//key=文件全路径,value=文件所有内容
context.write(key, value);
}
}
5、编写reducer
package com.cjy.mr.inputformat1;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* 计算每个文件多少单词
*/
public class WholeFileReducer extends Reducer<Text, BytesWritable, Text, LongWritable> {
LongWritable v = new LongWritable();
@Override
protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
Iterator<BytesWritable> iterator = values.iterator();
while(iterator.hasNext()){
BytesWritable next = iterator.next();
//字节数据转为字符
String content = new String(next.getBytes(), 0, next.getLength());
//字符切割
String[] split = content.split("\t");
v.set(split.length);
}
//数据写出
context.write(key, v);
}
}
6、编写driver
package com.cjy.mr.inputformat1;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class WholeFileDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "e:/input/inputinputformat", "e:/output1" };
// 1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar包存储位置、关联自定义的mapper和reducer
job.setJarByClass(WholeFileDriver.class);
job.setMapperClass(WholeFileMapper.class);
job.setReducerClass(WholeFileReducer.class);
// 7设置输入的inputFormat
job.setInputFormatClass(WholeFileInputformat.class);
// 3 设置map输出端的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
// 4 设置最终输出端的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 5 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("D:/input/"));
FileOutputFormat.setOutputPath(job, new Path("d:/wcoutput/"));
// 6 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
7、测试结果
计算结果:
file:/D:/input/wd11.txt 37
file:/D:/input/wd22.txt 13
file:/D:/input/wd33.txt 205
file:/D:/input/wd44.txt 97
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)