I. Usage Scenario

For small files, MapReduce assigns one map task per file by default. Since each file is small, loading it in its entirety will not exhaust memory, so we can read the whole file as a single record instead of processing it line by line.
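For reference, here is a minimal driver fragment (a sketch, not from the original post) showing the default behavior this section replaces: with no InputFormat configured, TextInputFormat is used, every small file becomes its own split and map task, and the mapper is fed one (LongWritable byteOffset, Text line) pair per line.

        // Default behavior: no InputFormat is set, so TextInputFormat applies.
        // Each small file under the input directory becomes its own split
        // (and therefore its own map task), read line by line.
        Job job = Job.getInstance(new Configuration());
        FileInputFormat.setInputPaths(job, new Path("D:/input/"));

The custom InputFormat below replaces this with one record per file: the key is the file path and the value is the file's entire contents.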

II. Implementing a Custom InputFormat

0. Requirement
Load all files under a given directory and count the number of words in each file.
1. Implementation steps
Write a custom class that extends FileInputFormat
Write a custom class that extends RecordReader and defines the data-loading rule
Override createRecordReader to return the custom RecordReader
2. The custom InputFormat
package com.cjy.mr.inputformat1;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Custom InputFormat: extends FileInputFormat and overrides the split and
 * read rules so that an entire file is loaded as a single record.
 */
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // read one file at a time: each file is exactly one split; never split a file
        return false;
    }

    /**
     * Defines how file data is read.
     * @param split the input split (here, one whole file)
     * @param context the task attempt context
     * @return a RecordReader that loads the entire file as one record
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // use the custom data-loading logic
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);

        return recordReader;
    }
}
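Two details worth noting: because isSplitable() returns false, each file maps to exactly one InputSplit, so split.getLength() in the RecordReader below always equals the file size. The explicit initialize() call inside createRecordReader() is also redundant, since the framework calls initialize() on the returned reader itself, but it does no harm here.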
3. The custom RecordReader
package com.cjy.mr.inputformat1;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Defines how data is loaded: the whole file becomes one (key, value) record.
 */
public class WholeRecordReader extends RecordReader<Text, BytesWritable>{

    private Configuration configuration;
    private FileSplit split;

    private boolean isProgress= true;
    private BytesWritable value = new BytesWritable();
    private Text k = new Text();

    // initialize with the split and the job configuration
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit)split;
        configuration = context.getConfiguration();
    }

    /**
     * Reads the next (key, value) pair.
     * @return true if a pair was produced, false once the file has been consumed
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (isProgress) {
            // 1 allocate a buffer the size of the entire file
            byte[] contents = new byte[(int) split.getLength()];

            FileSystem fs = null;
            FSDataInputStream fis = null;

            try {
                // 2 get the file system for the split's path
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);

                // 3 open the file
                fis = fs.open(path);

                // 4 read the whole file into the buffer
                IOUtils.readFully(fis, contents, 0, contents.length);

                // 5 use the entire file contents as the value
                value.set(contents, 0, contents.length);

                // 6 get the full file path
                String name = split.getPath().toString();

                // 7 use the file path as the output key
                k.set(name);

            } catch (Exception e) {
                // don't swallow errors silently
                e.printStackTrace();
            } finally {
                IOUtils.closeStream(fis);
            }
            // the whole file has been loaded as a single record
            isProgress = false;
            return true;
        }
        // the one record (the whole file) was already returned; no more data
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
        // progress reporting is not implemented for this one-record reader
        return 0;
    }
    @Override
    public void close() throws IOException {
    }
}
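Note that nextKeyValue() buffers the entire file in a byte array, so this reader only suits files that comfortably fit in the map task's heap; for large files, the default line-oriented readers remain the right choice. Also, BytesWritable.getBytes() can return a backing array longer than the valid data, which is why the reducer below reads only getLength() bytes.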
4. The Mapper
package com.cjy.mr.inputformat1;

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WholeFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // key = full file path, value = the file's entire contents
        context.write(key, value);
    }
}
5. The Reducer
package com.cjy.mr.inputformat1;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Counts the number of words in each file.
 */
public class WholeFileReducer extends Reducer<Text, BytesWritable, Text, LongWritable> {
    LongWritable v = new LongWritable();
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {

        Iterator<BytesWritable> iterator = values.iterator();

        while(iterator.hasNext()){

            BytesWritable next = iterator.next();
            // convert the bytes to a string, reading only the valid length
            String content = new String(next.getBytes(), 0, next.getLength());
            // split into words; the sample input is tab-separated
            // (use "\\s+" instead if words are separated by arbitrary whitespace)
            String[] split = content.split("\t");
            v.set(split.length);
        }
        // write out (file path, word count)
        context.write(key, v);
    }
}
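Because the key is the full file path and each file produces exactly one record, every reduce group here contains a single value, so the loop body runs once per file.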
6. The Driver
package com.cjy.mr.inputformat1;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class WholeFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // set the input and output paths to match your own machine
        args = new String[] { "D:/input", "D:/wcoutput" };

        // 1 get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 set the jar location and wire up the custom mapper and reducer
        job.setJarByClass(WholeFileDriver.class);
        job.setMapperClass(WholeFileMapper.class);
        job.setReducerClass(WholeFileReducer.class);

        // 3 use the custom InputFormat
        job.setInputFormatClass(WholeFileInputformat.class);

        // 4 set the map output kv types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 5 set the final output kv types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 6 set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7 submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
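The driver imports SequenceFileOutputFormat without using it. A common variant of this small-files pattern (a sketch, not part of the original job) skips the word count entirely: run a map-only job and pack the mapper's (path, bytes) pairs into a single SequenceFile, which HDFS handles much better than many small files.

        // Hypothetical map-only variant: reuse WholeFileInputformat and
        // WholeFileMapper, drop the reducer, and write a SequenceFile.
        job.setMapperClass(WholeFileMapper.class);
        job.setNumReduceTasks(0);                  // map-only job
        job.setInputFormatClass(WholeFileInputformat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);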

7. Test Results

Computed results:

file:/D:/input/wd11.txt	37
file:/D:/input/wd22.txt	13
file:/D:/input/wd33.txt	205
file:/D:/input/wd44.txt	97
