Mapreduce 编程模型

2020/09/16

MapReduce 即是编程模型,又是计算框架。开发人员需要基于 MapReduce 编程模型去开发分布式程序,然后将程序通过 MapReduce 计算框架分发到 Hadoop 集群中来运行。MapReduce计算模型主要由三个阶段构成:Map、shuffle、Reduce。

MapReduce 定义

  • MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
  • MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

Mapreduce 优缺点

  • 优点
    • 易于编程
    • 良好的扩展性
    • 高容错性
    • 适合 PB 级以上海量数据的离线处理
  • 缺点
    • 不擅长实时计算
    • 不擅长流式计算
    • 不擅长 DAG 计算

Mapreduce 核心思想

  • MapReduce 运算程序一般需要分成两个阶段:Map阶段和Reduce阶段。

w6xmUH.md.png

Mapreduce 进程

  • 一个完整的Mapreduce程序在分布式运行时,有三类实例进程:
    • MrAppMaster:负责整个程序的过程调度和状态协调。
    • MapTask:负责 Map 阶段的整个数据处理流程。
    • ReduceTask:负责 Reduce 阶段的整个数据处理流程。

Mapreduce 编程规范

wOmpPs.png

  • 用户编写的程序分为三个部分:Mapper,Reducer,Driver。
  • Mapper 阶段(切分成一个个小的任务)
    • 用户自定义的 Mapper 要继承自己的父类。
    • Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)。
    • Mapper 中的业务逻辑写在 map()方法中。
    • map() 方法(mapTask进程)对每一个<K,V>调用一次。
    • 输出是一系列的键值对,输出写到本地磁盘
  • Reducer 阶段(汇总小任务的结果)
    • 用户自定的 Reducer 要继承自己的父类。
    • Reducer 的输入类型对应 Mapper 的输出类型,也是 K,V。
    • ReducerTask 进程对每一组相同<K,V>组调用一次 reduce() 方法。
    • 输出也是一系列的键值对,结果最终写入HDFS。
  • Driver 阶段
    • 相当于 Yarn 集群的客户端,用于提交我们程序 YARN 集群,提交的是封装 MapReduce 程序相关运行参数的 Job 对象。

wOmPx0.png

举个例子

需求:统计一批英文文章中,每个单词出现的总次数。 假设:现在有一个输入文件”Gone With The Wind”,这个文件有三个block:block1, block2, block3。三个block的内容依次如下图。

wOmAqU.png

Map阶段

  • 每一个block对应一个分片split[3] (默认split与block一一对应)。
  • 每一个split对应一个map任务(map task)。所以这里三个block将会对应三个map task(map1, map2, map3),这3个任务的逻辑完全一样。
  • 以map1为例。map1会读取block1的数据,一次读取block1的一行数据,然后会产生一个kv对(其中,key是当前所读行的行首相对于当前block开始处的字节偏移量;value是当前行的内容;如假设当前所读行是第一行,那么当前行的内容是”Dear Bear River”,则kv对是(0, “Dear Bear River”)),作为map()的参数传入,调用map()。
  • map()方法。将value当前行内容按空格切分,得到三个单词Dear Bear River,然后将每个单词变成键值对(Dear, 1) (Bear, 1) (River, 1),最终结果输出为文件,写入map任务所在节点的本地磁盘中(其中还有一个Shuffle的过程,下文会详细讲解)。
  • block的第一行数据被处理完后,接着处理第二行,当map任务将当前block所有的数据全部处理完后,此map任务即运行结束。

Reduce阶段

  • reduce任务(reduce task)的个数由用户程序指定,main()内的job.setNumReduceTask(4)指定reduce任务是4个(reduce1, reduce2, reduce3, reduce4)。
  • 以reduce1为例。reduce1通过网络,连接到map1,将map1输出结果中属于reduce1的分区的数据通过网络获取到reduce1端(拷贝阶段)。同样地,也会连接到map2,map3获取数据。最终reduce1端获得4个(Dear, 1)键值对;由于key键相同,它们分到同一个组[4]。4个(Dear, 1)键值对,转换成[Dear, Iterable(1, 1, 1, )],作为两个参数传入(其中还有一个Shuffle的过程,下文会详细讲解),调用reduce()。
  • reduce()方法。计算Dear的总数为4,并将(Dear, 4)作为键值对输出,最终结果输出成文件,写入HDFS。

Mapreduce 计算流程

w6TkT0.png

整个过程就是先读取文件,接着进行split切割,变成一个一个的词,然后进行 map task 任务,排列出所有词的统计量,接着 sorting 排序,按照字典序来排,接着就是进行 reduce task,进行了词频的汇总,最后一步就是输出为文件。例如图中的 spacedong 就出现了两次。

其中 Hadoop Mapreduce 对外提供的五个可编是分别是InputFormatMapperParititionerreducerouputStream

用一句话简单地总结就是,Mapreduce的运算过程就是进行拆解-排序-汇总,解决的就是统计的问题,使用的思想就是分治的思想。

Mapper和Reducer

Mapper

wOmKR1.png

InputFormat 为每一个 InputSplit 生成一个 map 任务,mapper的实现是通过job中的setMapperClass(Class)方法来配置写好的map类,如这样

//设置要执行的mapper类
job.setMapperClass(WordMapper.class);

其内部是调用了map(WritableComparable, Writable, Context)这个方法来为每一个键值对写入到InputSplit,程序会调用cleanup(Context)方法来执行清理任务,清理掉不需要使用到的中间值。

关于输入的键值对类型不需要和输出的键值对类型一样,而且输入的键值对可以映射到0个或者多个键值对。通过调用context.write(WritableComparable, Writable)来收集输出的键值对。程序使用Counter来统计键值对的数量,在Mapper中的输出被排序后,就会被划分到每个Reducer中,分块的总数目和一个作业的reduce任务的数目是一样的。

Reducer

wOm1sK.png

Reducer任务的话就是将Mapper中输出的结果进行统计合并后,输出到文件系统中。 用户可以自定义Reducer的数量,使用Job.setNumReduceTasks(int)这个方法。 在调用Reducer的话,使用的是Job.setReducerClass(Class)方法,内部调用的是reduce(WritableComparable, Iterable, Context)这个方法,最后,程序会调用cleanup(Context)来进行清理工作。如这样:

//设置要执行的reduce类
job.setReducerClass(WordReduce.class);

Reducer实际上是分三个阶段,分别是ShuffleSortSecondary Sort

shuffle

这个阶段是指Reducer的输入阶段,系统会为每一个Reduce任务去获取所有的分块,通过的是HTTP的方式

sort

这个阶段是指在输入Reducer阶段的值进行分组,sortshuffle是同时进行的,可以这么理解,一边在输入的时候,同时在一边排序。

Secondary Sort

这个阶段不是必需的,只有在中间过程中对key的排序和在reduce的输入之前对key的排序规则不同的时候,才会启动这个过程,可以通过的是Job.setSortComparatorClass(Class)来指定一个Comparator进行排序,然后再结合Job.setGroupingComparatorClass(Class)来进行分组,最后可以实现二次排序。

在整个reduce中的输出是没有排序

需要多少个 Reducer 任务

建议是0.95或者是1.75*mapred.tasktracker.reduce.tasks.maximum。如果是0.95的话,那么就可以在mapper任务结束时,立马就可以启动Reducer任务。如果是1.75的话,那么运行的快的节点就可以在map任务完成的时候先计算一轮,然后等到其他的节点完成的时候就可以计算第二轮了。当然,Reduce任务的个数不是越多就越好的,个数多会增加系统的开销,但是可以在提升负载均衡,从而降低由于失败而带来的负面影响。

Partitioner

这个模块用来划分键值空间,控制的是map任务中的key值分割的分区,默认使用的算法是哈希函数,HashPartitioner是默认的Partitioner

这篇文章主要就是讲了MapReduce的框架模型,分别是分为用户程序层、工具层、编程接口层这三层,在编程接口层主要有五种编程模型,分别是InputFomatMapperReducePartitionerOnputFomatReducer。主要是偏理论,代码的参考例子可以参考官方的例子:WordCount_v2.0

Mapreduce 框架原理

Hadoop 序列化

自定义 Bean 对象实现序列化接口

  • 必须实现 Writeable 接口。
  • 反序列化时,需要反射调用空参构造函数,所以必须要有空参构造。
  • 重写序列化方法。
  • 注意反序列化的顺序和序列化的顺序完全一致。
  • 想要把结果显示在文件中,就要重写 toString(),可以用 “\t” 分开,方便后续使用。
  • 如果需要将自定义的 Bean 放在 key 中传输,则还需要实现 Compareable 接口,因为 Mapreduce 框架中的 shuffle 过程要求 key 必须能排序。

切片与 MapTask 并行度决定机制

  • MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
  • MapTask 并行处理决定机制
    • 数据块:Block 是 HDFS 物理上把数据分成一块一块。
    • 数据切片:数据切片只是逻辑上对输入进行切片,并不会在磁盘上将其切分成片进行存储。
  • 一个 Job 在 Map 阶段并行度由客户端在提交 Job 时的切片数决定。
  • 每一个 Split 切片分配一个 MapTask 并行实例处理。
  • 默认情况,切片大小 = BlockSize。

Mapreduce 架构

Client

Client的含义是指用户使用MapReduce程序通过Client来提交任务到Job Tracker上,同时用户也可以使用Client来查看一些作业的运行状态。

Job Tracker

这个负责的是资源监控和作业调度。JobTracker会监控着TaskTracker和作业的健康状况,会把失败的任务转移到其他节点上,同时也监控着任务的执行进度、资源使用量等情况,会把这些消息通知任务调度器,而调度器会在资源空闲的时候选择合适的任务来使用这些资源。

任务调度器是一个可插拔的模块,用户可以根据自己的需要来设计相对应的调度器

TaskTracker

TaskTracker会周期性地通过Hearbeat来向Job Tracker汇报自己的资源使用情况和任务的运行进度。会接受来自于JobTaskcker的指令来执行操作(例如启动新任务、杀死任务之类的)。

TaskTracker中通过的是slot来进行等量划分一个节点上资源量,只用Task获得slot的时候才有机会去运行。调度器的作用就是进行将空闲的slot分配给Task使用,可以配置slot的数量来进行限定Task上的并发度。

Task

Task分为Map TaskReduce Task,在MapReduce中的 split 就是一个 Map Task,split 的大小可以设置的,由 mapred.max.spilt.size 参数来设置,默认是 Hadoop中的block的大小,在Hadoop 2.x中默认是128M,在Hadoop 1.x中默认是64M

Task中的设置可以这么设置,一般来讲,会把一个文件设置为一个split,如果是小文件,那么就会存在很多的Map Task,这是特别浪费资源的,如果split切割的数据块的量大,那么会导致跨节点去获取数据,这样也是消耗很多的系统资源的。

Mapreudce 生命周期

  1. 作业的提交和初始化

    由用户提交作业之前,需要先把文件上传到HDFS上,JobClient使用upload来加载关于打包好的jar包,JobClientRPC创建一个JobInProcess来进行管理任务,并且创建一个TaskProcess来管理控制关于每一个Task

  2. JobTracker

    JobTracker会调度和管理任务,一发现有空闲资源,会按照一个策略选择一个合适的任务来使用该资源。

    任务调度器有两个点:一个是保证作业的顺利运行,如果有失败的任务时,会转移计算任务,另一个是如果某一个Task的计算结果落后于同一个Task的计算结果时,会启动另一个Task来做计算,最后去计算结果最块的那个。

  3. 任务运行环境

    TaskTracker会为每一个Task来准备一个独立的JVM从而避免不同的Task在运行过程中的一些影响,同时也使用了操作系统来实现资源隔离防止Task滥用资源。

  4. 执行任务

    每个Task的任务进度通过RPC来汇报给TaskTracker,再由TaskTracker汇报给JobTracker。

  5. 任务结束

    写入输出的文件到HDFS中。

Post Directory