4. MapReduce-1

MR 相关概念

  1. Job(作业) : 一个MR程序称为一个Job
  2. MRAppMaster(MR任务的主节点): 一个Job在运行时,会先启动一个进程,这个进程为 MRAppMaster。负责 Job 中执行状态的监控,容错,和 RM 申请资源,提交 Task 等。
  3. Task(任务): Task是一个进程,负责某项计算。
  4. Map(Map阶段): Map 是 MapReduce 程序运行的第一个阶段。Map阶段的目的是将输入的数据,进行切分。将一个大数据,切分为若干小部分。切分后,每个部分称为1片(split),每片数据会交给一个Task(进程)进行计算,负责 Map 阶段的 Task 称为 MapTask。在一个 MR 程序的 Map 阶段,会启动N(取决于切片数,多少个切片就会启动多少个 MapTask)个 MapTask。每个 MapTask 是并行运行。
  5. Reduce(Reduce阶段): Reduce 是MapReduce 程序运行的第二个阶段(最后一个阶段),Reduce 阶段的目的是将 Map 阶段,每个 MapTask 计算后的结果进行合并汇总,得到最终结果。Reduce阶段是可选的,不一定有。负责 Reduce 阶段的 Task 称为ReduceTask。一个Job可以通过设置,启动N个ReduceTask,这些ReduceTask也是并行运行,每个ReduceTask最终都会产生一个结果。

MR 相关组件

  1. Mapper: map 阶段核心的处理逻辑
  2. Reducer: reduce 阶段核心的处理逻辑
  3. InputFormat: 输入格式。MR 程序必须指定一个输入目录,一个输出目录,InputFormat 代表输入目录中文件的格式。如果是普通文件,可以使用FileInputFormat。如果是SequeceFile(hadoop提供的一种文件格式),可以使用 SequnceFileInputFormat,如果处理的数据在数据库中,需要使用 DBInputFormat。
  4. RecordReader: 记录读取器。RecordReader 负责从输入格式中,读取数据,读取后封装为一组记录(k-v)。
  5. OutPutFormat: 输出格式。OutPutFormat 代表 MR 处理后的结果,要以什么样的文件格式写出。将结果写出到一个普通文件中,可以使用 FileOutputFormat,将结果写出到数据库中,可以使用 DBOutPutFormat,将结果写出到 SequeceFil e中,可以使用 SequnceFileOutputFormat。
  6. RecordWriter: 记录写出器。将处理的结果以什么样的格式写出到输出文件中。
  7. Partitioner: 分区器。负责在 Mapper 将数据写出时,为每组 keyout-valueout 打上标记,进行分区。一个ReduceTask只会处理一个分区的数据。

MR 流程

  1. InputFormat 调用 RecordReader,从输入目录的文件中,读取一组数据,封装为 keyin-valuein 对象
  2. 将封装好的 key-value,交给 Mapper.map() ——>将处理的结果写出 keyout-valueout
  3. ReduceTask 启动 Reducer,使用 Reducer.reduce() 处理 Mapper写出的 keyout-valueout,
  4. OutPutFormat 调用 RecordWriter,将 Reducer 处理后的 keyout-valueout 写出到文件

Map阶段(MapTask): 切片(Split) —– 读取数据(Read) —– 交给Mapper处理(Map) —– 分区和排序(sort)
Reduce阶段(ReduceTask): 拷贝数据(copy) —– 排序(sort) —– 合并(reduce) —– 写出(write)

MR 编程

MR的编程只需要将自定义的组件和系统默认组件进行组合,组合之后运行即可,步骤:

  1. Map 阶段的核心处理逻辑需要编写在 Mapper 中
  2. Reduce 阶段的核心处理逻辑需要编写在 Reducer 中
  3. 将编写的 Mapper 和 Reducer 进行组合,组合成一个 Job
  4. 对 Job 进行设置,设置后运行

wordcount

InputFormat 的实现类很多

InputFormat 的作用:

  1. 验证输入目录中的文件格式,是否符合当前 Job 的要求
  2. 生成切片,每个切片都会交给一个 MapTask 处理
  3. 提供 RecordReader,由 RecordReader 从切片中读取记录,交给 Mapper 处理

InputFormat 中的 List<InputSplit> getSplits 方法的功能就是切片。ecordReader<K,V> createRecordReader 的功能是创建 RecordReader。默认 Hadoop 使用的是 TextInputFormat,而 TextInputFormat 创建的 RecordReader 是 LineRecordReader。所以 Hadoop 默认的 InputFormat 使用 TextInputFormat,默认是 Reader 使用 LineRecordReader。

本地模式

WCMapper 完整代码

package com.yanrs.mr.wordcount;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
 * KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key 是行号,value 是一行的文本内容
        System.out.println("keyin: " + key + " valuein: " + value);
        // 将文本内容进行拆分,得到一个个单词组成的数组
        String[] words = value.toString().split("\t");
        // 遍历数组,并输出,输出格式为(单词,1)
        for (String word:words) {
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}

WCReducer 完整代码

package com.yanrs.mr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN,VALUEIN: Mapper 的输出做为这里的输入
 * KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
 */
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outValue = new IntWritable();

    //reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
        for (IntWritable value:values) {
            sum+=value.get();
        }

        outValue.set(sum);
        // 将结果写出,key 是单词,outValue 是累加的次数
        context.write(key, outValue);
    }
}

WCDriver 完整代码

package com.yanrs.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class WCDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/wcinput");
        Path outPath = new Path("/mroutput");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置 job 名称
        job.setJobName("wordcount");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

直接在 idea 中运行 WCDriver 的 main 方法即可。上面设置连接的是 Hadoop10 的文件系统,但是是在本地运行的。

代码地址

yarn 上运行

WCMapper 完整代码,同上

WCReducer 完整代码,同上

在 yarn 上运行,需要指定运行方式为 yarn,且指定 resourcemanager 的地址

// 设置在 yarn 上运行
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hadoop11");

还需要设置 job 所在的 jar 包

// yarn 运行时候还需要设置 job 所在的 jar 包
job.setJarByClass(WCDriver.class);
// 或者使用

将代码打包,上传到 hadoop 上,使用 hadoop jar 命令运行

hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.yanrs.mr.wordcount.WCDriver

WCDriver 完整代码

package com.yanrs.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class WCDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
        // 设置在 yarn 上运行
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "hadoop11");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/wcinput");
        Path outPath = new Path("/mroutput");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // yarn 运行时候还需要设置 job 所在的 jar 包
        job.setJarByClass(WCDriver.class);
        // 或者使用
        // job.setJar("mapreduce-test-1.0-SNAPSHOT.jar");

        // 设置 job 名称
        job.setJobName("wordcount");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

代码地址

自定义 Bean

数据格式如上所示,需要统计每个手机消耗的上行,下行,总流量信息

FlowBeanMapper 代码如下,mapper 输入参数 key 为行号,value 为一行的文本。mapper 输出参数 key 手机号,value 为 bean 对象(对象中分别有上行,下行,总流量三个属性)

package com.yanrs.mr.flowbean;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * mapper 输入参数 key 为行号,value 为一行的文本
 * mapper 输出参数 key 手机号,value bean 对象(对象中分别有上行,下行,总流量三个属性)
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    private Text outKey = new Text();
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key 为序列号,value 为每行的内容
        String[] words = value.toString().split("\t");

        // 封装手机号
        outKey.set(words[1]);
        // 上行流量
        flowBean.setUpFlow(Long.parseLong(words[words.length - 3]));
        // 下行流量
        flowBean.setDownFlow(Long.parseLong(words[words.length - 2]));
        context.write(outKey, flowBean);
    }
}

FlowBean 为实体类,有三个属性,需要实现 hadoop 的序列化方法。需要重写 write(称为序列化) 和 readFields(称为反序列化) 方法。并且反序列化和序列化的顺序要一致,并且提供属性的 get,set 方法,空参构造,toString 方法。

package com.yanrs.mr.flowbean;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class FlowBean implements Writable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /**
     * 序列化, 在写出属性时,如果属性为引用数据类型,那么属性不能为 null
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * 反序列化,反序列化和序列化的顺序要一致
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public FlowBean() {
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
        }
}

FlowBeanReducer 处理 FlowBeanMapper 输出的数据,所以输入 key 和 value 的类型分别为 Text 和 FlowBean。输出也为 Text, FlowBean

package com.yanrs.mr.flowbean;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 输入 key 和 value 的类型分别为 Text 和 FlowBean
 *
 */
public class FlowBeanReducer extends Reducer <Text, FlowBean, Text, FlowBean>{

    private FlowBean outValue = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // 累加每个手机号的上行流量和下行流量,并计算总流量
        long sumUpFlow = 0;
        long sumDownFlow = 0;

        for (FlowBean flowBean: values) {
            sumUpFlow += flowBean.getUpFlow();
            sumDownFlow += flowBean.getDownFlow();
        }

        // 将值封装进入 FlowBean 中
        outValue.setDownFlow(sumDownFlow);
        outValue.setUpFlow(sumUpFlow);
        outValue.setSumFlow(sumDownFlow + sumUpFlow);

        context.write(key, outValue);
    }
}

FlowBeanDriver 中设置输入和输出目录,设置 MapperClass 和 ReducerClass。设置 Mapper,Reducer 的输出 key 和 value 类型。

package com.yanrs.mr.flowbean;

import com.yanrs.mr.wordcount.WCMapper;
import com.yanrs.mr.wordcount.WCReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class FlowBeanDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mrinput/flowbean");
        Path outPath = new Path("/mroutput/flowbean");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置 job 名称
        job.setJobName("FlowBean");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(FlowBeanMapper.class);
        job.setReducerClass(FlowBeanReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

因为没有配置在 yarn 上运行,所以直接 idea 运行即可。结果如下

13470253144    FlowBean{upFlow=180, downFlow=180, sumFlow=360}
13509468723    FlowBean{upFlow=7335, downFlow=110349, sumFlow=117684}
13560439638    FlowBean{upFlow=918, downFlow=4938, sumFlow=5856}
13568436656    FlowBean{upFlow=3597, downFlow=25635, sumFlow=29232}
13590439668    FlowBean{upFlow=1116, downFlow=954, sumFlow=2070}
13630577991    FlowBean{upFlow=6960, downFlow=690, sumFlow=7650}
13682846555    FlowBean{upFlow=1938, downFlow=2910, sumFlow=4848}
......

代码地址

默认的切片流程

片和块的关系

:在计算MR程序时,才会切片。在运行程序时,临时将文件从逻辑上划分为若干部分(所以只是逻辑上的切片,并不是真正的切分),使用的输入格式不同(不同的 InputFormat),切片的方式不同,切片的数量也不同。每片的数据最终也是以块的形式存储在 HDFS。

: 在向HDFS写文件时,文件中的内容以块为单位存储,块是实际的物理存在。

建议: 片大小最好等于块大小,将片大小设置和块大小一致,可以最大限度减少因为切片带来的磁盘IO和网络IO,MR计算框架速度慢的原因在于在执行MR时,会发生频繁的磁盘IO和网络IO。理论上来说:如果文件的数据量是一定的话,片越大,切片数量少,启动的 MapTask 少,Map 阶段运算慢,片越小,切片数量多,启动的MapTask多,Map阶段运算快。默认情况下片大小就是块大小,即文件的块大小默认为 128M,默认每片就是128M。MapTask的数量只取决于切片数,有多少切片就有多少个 MapTask

如果需要调节片大小 > 块大小:那么需要配置 mapreduce.input.fileinputformat.split.minsize > 128M

如果需要调节片大小 < 块大小:那么需要配置 mapreduce.input.fileinputformat.split.maxsize < 128M

FileInputFormat的切片策略(默认)
  1. 获取当前输入目录中所有的文件
  2. 以文件为单位切片,如果文件为空文件,默认创建一个空的切片
  3. 如果文件不为空,尝试判断文件是否可切(不是压缩文件,都可切)
  4. 如果文件不可切,整个文件作为1片
  5. 如果文件可切,先获取片大小(默认等于块大小),循环判断 待切部分/ 片大小 > 1.1倍,如果大于先切去一片,再判断…
  6. 剩余部分整个作为1片

常见的输入格式

FileInputFormat 中有六个子类,下面总结一下常见的四个子类的切片策略和 RecordReader

TextInputFormat

TextInputFormat 常用于输入目录中全部是文本文件

切片策略: 默认的切片策略

RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为key,一行内容作为value,即 key 的类型为 LongWritable,value 的类型为 Text

上面的 wordcount 例子就是使用的默认的 TextInputFormat

NlineInputFormat

切片策略: 以文件为单位,读取配置中 mapreduce.input.lineinputformat.linespermap 参数(默认为1),每次这么多行切为一片。

RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为key,一行内容作为value,即 key 的类型为 LongWritable,value 的类型为 Text

NLMapper 完整代码

package com.yanrs.mr.nline;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
 * KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
 */
public class NLMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key 是行号,value 是一行的文本内容
        System.out.println("keyin: " + key + " valuein: " + value);
        // 将文本内容进行拆分,得到一个个单词组成的数组
        String[] words = value.toString().split("\t");
        // 遍历数组,并输出,输出格式为(单词,1)
        for (String word:words) {
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}

NLReducer 完整代码

package com.yanrs.mr.nline;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN,VALUEIN: Mapper 的输出做为这里的输入
 * KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
 */
public class NLReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outValue = new IntWritable();

    //reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
        for (IntWritable value:values) {
            sum+=value.get();
        }

        outValue.set(sum);
        // 将结果写出,key 是单词,outValue 是累加的次数
        context.write(key, outValue);
    }
}

NLDriver 完整代码。在 Driver 中新增设置使用 NLineInputFormat。默认是一行切分为一片,如果需要设置可以在 conf 中设置 mapreduce.input.lineinputformat.linespermap 值即可。

package com.yanrs.mr.nline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class NLDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
        // 设置几行为一片,默认一行一片
        // conf.set("mapreduce.input.lineinputformat.linespermap", "2");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mrinput/nline");
        Path outPath = new Path("/mroutput/nline");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置使用 NLineInputFormat
        job.setInputFormatClass(NLineInputFormat.class);

        // 设置 job 名称
        job.setJobName("nline");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(NLMapper.class);
        job.setReducerClass(NLReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

代码地址

KeyValueTextInputFormat

针对文本文件,使用分割字符,将每一行分割为 key 和 value,如果没有找到分隔符,当前行的内容作为 key,value 为空串。默认分隔符为 \t,可以通过参数 mapreduce.input.keyvaluelinerecordreader.key.value.separator 指定。

切片策略:默认的切片策略

RecordReader : key 和 value 的类型都是 Text

KVMapper 完整代码

package com.yanrs.mr.keyvalue;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
 * KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
 */
public class KVMapper extends Mapper<Text, Text, Text, IntWritable> {

    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // key 是 * 之前的姓名,value 是计数1
        context.write(key, outValue);
    }
}

KVReducer 完整代码

package com.yanrs.mr.keyvalue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN,VALUEIN: Mapper 的输出做为这里的输入
 * KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
 */
public class KVReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outValue = new IntWritable();

    //reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
        for (IntWritable value:values) {
            sum+=value.get();
        }

        outValue.set(sum);
        // 将结果写出,key 是单词,outValue 是累加的次数
        context.write(key, outValue);
    }
}

KVDriver 完整代码如下,需要设置使用 KeyValueTextInputFormat,并且需要设置分隔符,需要注意的是分隔符只是一个 byte 类型的数据,即便传入的是一个字符串,也只会读取第一个字符。

package com.yanrs.mr.keyvalue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class KVDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
        // 设置分隔符(需要注意的是分隔符只是一个 byte 类型的数据,即便传入的是一个字符串,也只会读取第一个字符)
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "*");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mrinput/keyvalue");
        Path outPath = new Path("/mroutput/keyvalue");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置使用 KeyValueTextInputFormat
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // 设置 job 名称
        job.setJobName("keyvalue");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(KVMapper.class);
        job.setReducerClass(KVReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

代码地址

CombineTextInputFormat

改变了传统的切片方式。将多个小文件,划分到一个切片中,适合小文件过多的场景。

切片策略: 先确定片的最大值 maxSize,maxSize 通过参数 mapreduce.input.fileinputformat.split.maxsize 设置。流程是以文件为单位,将每个文件划分为若干 part,如果文件的待切部分的大小小于等于 maxSize, 则整个待切部分作为1个 part,如果文件的待切部分的大小大于 maxsize 但是小于等于 2 maxSize, 那么将整个待切部分均匀的切分为2个 part。如果文件的待切部分的大小大于 2 maxSize, 那么先切去 maxSize 大小,得到 1个 part,剩余待切部分继续判断

RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为 key,一行内容作为 value,即 key 的类型为 LongWritable,value 的类型为 Text

CMMapper 完整代码

package com.yanrs.mr.combine;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
 * KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
 */
public class CMMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key 是行号,value 是一行的文本内容
        System.out.println("keyin: " + key + " valuein: " + value);
        // 将文本内容进行拆分,得到一个个单词组成的数组
        String[] words = value.toString().split("\t");
        // 遍历数组,并输出,输出格式为(单词,1)
        for (String word:words) {
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}

CMReducer 完整代码

package com.yanrs.mr.combine;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN,VALUEIN: Mapper 的输出做为这里的输入
 * KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
 */
public class CMReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outValue = new IntWritable();

    //reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
        for (IntWritable value:values) {
            sum+=value.get();
        }

        outValue.set(sum);
        // 将结果写出,key 是单词,outValue 是累加的次数
        context.write(key, outValue);
    }
}

CMDriver 完整代码。需要设置多大文件切为一片,设置使用 CombineTextInputFormat

package com.yanrs.mr.combine;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * 启动这个进程,那么就会运行该 job
 */
public class CMDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取文件系统
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
        // 设置多大文件切为一片
        conf.set("mapreduce.input.fileinputformat.split.maxsize", "2048");

        FileSystem fileSystem = FileSystem.get(conf);

        // 设置输入目录和输出目录
        Path inputPath = new Path("/mrinput/combine");
        Path outPath = new Path("/mroutput/combine");
        // 输出目录存在就删除
        if(fileSystem.exists(outPath)){
            fileSystem.delete(outPath, true);
        }

        // 创建 Job
        Job job = Job.getInstance(conf);

        // 设置使用 CombineTextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);

        // 设置 job 名称
        job.setJobName("combine");

        // 设置job运行的 Mapper,Reducer
        job.setMapperClass(CMMapper.class);
        job.setReducerClass(CMReducer.class);

        // 设置 Mapper,Reducer 的输出 key 和 value 类型。
        // job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
        // 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出目录
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // 运行 Job 并打印日志信息
        job.waitForCompletion(true);
    }
}

代码地址

MR 核心阶段划分

MapTask 阶段

  1. map
  2. sort

RedcueTask 阶段

  1. copy
  2. sort
  3. reduce

shuffle 阶段

上面的 2-4 又称为 shuffle 阶段。Shuffle 阶段横跨 MapTask 和 RedcueTask,在MapTask端也有 Shuffle,在RedcueTask 也有 Shuffle。具体 Shuffle 阶段指 MapTask 的 map 方法运行之后到 RedcuceTask 的 reduce 方法运行之前。

总结

mapper 的输出,为 reducer 的输入,mapper 的输出由不同的 InputFormat 的 RecordReader 决定。

不同的 InputFormat 有着不同的切片策略,默认如果不设置,那么使用的是 TextInputFormat。

reduce 方法一次处理一组数据,key 相同的数据为一组。

mapper 和 reducer 的输出数据格式由自己根据需求来设置,可以是 hadoop 内置的类型,也可以自定义 bean。

如果要将编写好的程序在 yarn 上运行,那么需要配置 yarn 的地址,设置 job 所在的 jar 包,将程序打包为 jar 之后运行。