【map任务】split –&gt重点并无在此编程模板上。

引子

怎么要MapReduce?

因为MapReduce可以“分而治之”,将计好数量的纷繁任务分解成几何略多少任务。“简单”的意是:计算范围变多少、就近节点计算数据、并行任务。

脚摆放一布置《Hadoop权威指南》的流程图

【一句子话版】

输入文件 ->【map任务】split –> map –> partition –> sort
–> combine(写内存缓冲区) ~~ spill(独立线程写磁盘) –> merge
–> map输出结果  ~~~ 【reduce任务】copy –> merge –>reduce
–> 输出文件

mapreduce是什么?

凡是一个编程模型, 分为map和reduce. map接受一条record,
将立刻长达record进行各种纪念使博取的易输出为中结果,
而reduce把key相同的高中级结果在一块儿(key, iterable value list),
进行联谊输出0独或1个结果.

Map阶段

split:文件首先会让切除成split,split和block的关联是1:1要么N:1,如下图所展示。

map :

M个map任务开始并行处理分配至的基本上单split数据。输出数据格式如
<k,v>。

Partition:

用意:将map阶段的输出分配受相应的reducer,partition数 == reducer数

默认是HashPartitioner。之后将出口数据<k,v,partition>写副内存缓冲区memory
buff。

spill:

当memory
buff的多少达一定阈值时,默认80%,将出发溢写spill,先锁住这80%底内存,将即时部分数形容上地方磁盘,保存也一个临时文件。此等由独立线程控制,与写memory
buff线程同步进行。

sort & combine:

每当spill写文件前,要针对性80%之数额(格式<k,v,partition>)进行排序,先partition后key,保证每个分区内key有序,如果job设置了combine,则更拓展combine操作,将<aa1,2,p1>
<aa1,5,p1> 这样的数统一成<aa1,7,p1>。
最终输出一个spill文件。

merge:

基本上个spill文件通过多程归并排序,再统一成一个文本,这是map阶段的末段输出。同时还起一个索引文件(file.out.index),记录每个partition的发端位置、长度。

mapreduce(mr)不是什么

mr不是一个新定义, mr来自函数式编程中一度部分概念.
Google对mr做出的孝敬无在创造了这编程模板,
而是把mr整合到分布式的存储和天职管理面临错过, 实现分布式的计算.
所以就mr而言,重点并无以这编程模板上, 而是如何通过分布式去实现mr的.
这是自家接下要关注的重点.

reduce阶段

copy:多线程并发从各个mapper上拉属于本reducer的数据块(根据partition),获取后存入内存缓冲区,使用率高达阈值时写副磁盘。

merge:一直启动,由于不同map的出口文件是尚未sort的,因此在形容副磁盘前待merge,知道没有新的map端数据写入。最后启动merge对具备磁盘中之数据统一排序,形成一个尾声文件作为reducer输入文件。至此shuffle阶段结束。

reduce:和combine类似,都是以平之key合并计算,最终结果写到HDFS上。

一个mr过程的overview:

经过分割[1],
输入数据化一个来M个split的子集(每一个split从16M到64M差[2]).
map函数被分布及差不多尊服务器上执行map任务.
使得输入的split能够当不同之机械及叫并行处理.

map函数的输出通过用split函数来分中间key, 来形成R个partition(例如,
hash(key) mod R), 然后reduce调用让分布到多态机器上去.
partition的数与分割函数由用户来乘定.

一个mr的整体过程:

1> mr的库房首先划分输入文件成M个片, 然后再也聚众多中启大量的copy程序

2> 这些copy中生一个新鲜的: 是master. 其它的还是worker.
有M个map任务和R个reduce任务将让分配.
mater会把一个map任务或是一个reduce任务分配给idle worker(空闲机器).

3> 一个于分配了map任务的worker读取相关输入split的内容.
它由输入数据遭到分析出key/value pair,
然后把key/value对传递让用户从定义之map函数, 有map函数产生的中等key/value
pair吃缓存在内存中

4> 缓存到内存的遭kv paoir会被周期性的状副当地磁盘上. 怎么形容?
通过partitioning function把他们写副R个分叉区. 这些buffered
pair在该地磁盘的职务会叫传播给master.
master会在后将这职务转发给reduce的worker.

5> 当reduce的worker接收至master发来之职位信息后,
它通过远程访问来读map worker溢写到磁盘上之数据. 当reduce
worker把装有的中间结果都念了了今后, 它使基于中结果的key做一个sort
–> 这样的话, key相同的record会被group到同样起. 这个sort是必须的,
因为一般而言相同之reduce task会收到众多两样的key(如果非做sort,
就没法把key相同之record group在共了). 如果中结果最好老跨越了内存容量,
需要开一个表的sort.

6> reducer worker会对每一个unique key进行相同次于遍历, 把各国一个unique
key和它corresponding的value list传送给用户定义之reduce function中去.
reduce的输出为append到这reduce的partition的末尾的输出文件中失

7> 当所有的map任务和reduce任务都成功后, master结点会唤醒user program.
这个时刻, 在user program中之针对mapreduce的call会返回到用户之code中去.

终极, mr执行之输出会给细分及R个出口文件被失去(每个reduce输出一个partition,
共R个.) 通常来讲, 用户不欲把及时R个出口文件合并成为一个,
因为她们常会于当做下一个mapreduce程序的输入.
或者是经别的程序来调用他们,
这个顺序要得handle有多独partition作为输入的情况.

master的数据结构:
master维护的最主要是metadata.
它也各级一个map和reduce任务存储他们的状态(idle, in-progress,
or completed).
master就比如一个管道,通过其,中间文件区域之职于map任务传递到reduce任务.因此,对于每个完成的map任务,master存储由map任务产生的R个中间文件区域的高低和位置.当map任务成功的当儿,位置及分寸的换代信息被接受.这些信让逐渐充实的传递给那些正在工作的reduce任务.

Fault Tolerance

不当分呢2遭受 worker的故障与master的故障.

worker故障:

master会周期性的ping每个worker.
如果在一个瑕疵的年华段内没有接到worker返回的消息,
master会把这个worker标记成失效. 失败的天职是哪些重新做的呢?
每一个worker完成的map任务会叫reset为idle的状态,
所以它可以于部署让其他的worker.
对于一个failed掉的worker上的map任务及reduce任务,
也属同样可通过这种方法来处理.

master失败:

master就出一个, 它的败会造成single point failure. 就是说,
如果master失败, 就会停mr计算. 让用户来检查是状态,
根据需要再次履行mr操作.

每当错误面前的拍卖机制(类似于exactly once?)

当map当user提供的map和reduce operator是有关输入的引人注目的操作,
我们提供的分布式implementation能够提供相同之输出. 什么一样之出口为?
和一个非容错的一一执行之顺序一样的输出. 是什么成功这或多或少之?

举凡依赖让map和reduce任务输出的原子性提交来兑现这特性的.
对有的task而言, task会将出口写到private temporary files中去.
一个map任务会发生R单如此的临时文件,
一个reduce任务会发出1独这么的临时文件. 当map任务到位的时候,
worker会给master发一个消息, 这个信息包含了R个临时文件的name.
如果master收到了同一长达已经好的map任务之初的就信息,
master会忽略这个信息.否则的话, master会纪录这R个公文的讳到自己之data
structure中去.

当reduce任务完成了, reduce worker会自动将团结输出的临时文件重命名为final
output file. 如果一致之以多态机器上实行, 那么以同之final output
file上且见面实行重命名. 通过这种方式来管最终的输出文件才包含被一个reduce
task执行了的数据.

囤位置

mr是只要利用网络带宽的?
论文被说, 利用把输入数据(HDFS中)存储在机器的地方磁盘来save网络带宽.
HDFS把每个文件分为64MB的block.
然后每个block在别的机器及召开replica(一般是3卖). 做mr时,
master会考虑输入文件之职位信息,
并努力当某机器及布置一个map任务.什么样的机?
包含了是map任务的数额的replica的机器上. 如果失败以来,
则尝试就近安排(比如安排及一个worker machine上, 这个machine和含input
data的machine在与一个network switch上), 这样的话,
想使得大部分输入数据在地方读取, 不吃网络带来宽.

职责粒度

将map的输入拆成了M个partition, 把reduce的输入拆分成R个partition.
因为R通常是用户指定的,所以我们设定M的值.
让每一个partition都以16-64MB(对应为HDFS的储存策略, 每一个block是64MB)
另外, 经常把R的价设置成worker数量的小的倍增数.

备用任务

straggler(落伍者): 一个mr的到底的执行时间连由落伍者决定的.
导致同玉machine 慢的原因产生多:可能硬盘出了问题,
可能是key的分配起了问题等等. 这里透过一个通用的故的建制来拍卖此情:
当一个MapReduce操作看似完成的时段,master调度备用(backup)任务过程来实行剩下的、处于处理面临状态(in-progress)的职责。无论是最初的施行过程、还是备用(backup)任务过程就了职责,我们还把这任务标记成为已经成功。我们调优了之机制,通常只是见面占有比较正规操作多几只百分点的精打细算资源。我们发现以这样的体制于减少超大MapReduce操作的总处理时间效果显著。

技巧

  1. partition 函数
    map的输出会划分到R个partition中去.
    默认的partition的章程是应用hash进行分区. 然而有时候,
    hash不可知满足我们的需求. 比如: 输出的key的价是URLs,
    我们愿意每个主机的兼具条条框框保持在与一个partition中,
    那么我们虽假设和谐写一个分区函数, 如hash(Hostname(urlkey) mod R)

  2. 逐保证
    咱俩保证于加的partition中, 中间的kv pair的价值增量逐处理的.
    这样的依次保证对每个partition生成一个一如既往的输出文件.

  3. Combiner函数
    以一些情况下,Map函数产生的中级key值的重新数据会占大怪的比重.
    如果将这些还的keybu’zu我们允许用户指定一个可选的combiner函数,combiner函数首先以本土将这些记录进行同样不良联合,然后拿联合的结果再行经网络发送出。
    Combiner函数在各国台执行Map任务之机械及还见面给执行同样差。因此combiner是map侧的一个reduce.
    一般景象下,Combiner和Reduce函数是同等的。Combiner函数和Reduce函数之间唯一的界别是MapReduce库怎样控制函数的出口。Reduce函数的出口为保留在最终的出口文件里,而Combiner函数的输出为描写到中游文件里,然后被发送给Reduce任务。

  4. 输入输出类型
    支撑多种. 比如文本的话, key是offset, value是立同样行之内容.
    每种输入型的竖线都得能把输入数据分割成split.
    这个split能够由独立的map任务来拓展继续处理.
    使用者可以经过提供一个reader接口的兑现来支撑新的输入类型.
    而且reader不一定需要由文本中读取数据.

  5. 越了吃的记录
    偶,
    用户程序中之bug导致map或者reduce在拍卖某些record的时光crash掉.
    我们提供相同种植忽略这些record的模式,
    mr会检测检测哪些记录导致确定性的crash,并且越了这些记录不处理。
    具体做法是: 在实施MR操作前, MR库会通过全局变量保存record的sequence
    number, 如果用户程序出发了一个体系信号, 消息处理函数将故”最后一口气”
    通过UDP包向master发送处理的末梢一长达纪录的序号.
    当master看到于处理某条特定的record不止失败一不成时,
    就对准其进行标记需要为跳过,
    在下次再次履行相关的mr任务之时段过了及时漫长纪录.

于Google给的例证中, 有好几值得注意.
通过benchmark的测试, 能了解key的分区情况. 而一般对用排序的程序来说,
会增加一个先期处理的mapreduce操作用于采样key值的遍布情况.
通过采样的多寡来算对最终排序处理的分区点.

旋即极其成功之使: 重写了Google网络搜索服务所使用及之index系统

总结:公海赌船网址 mr的牛逼之处在于:
1>
MapReduce封装了并行处理、容错处理、数据本地化优化、负载均衡等等技术难关的细节,这使得MapReduce库易于使。
2> 编程模板好. 大量见仁见智类别的问题且可由此MapReduce简单的解决。

3> 部署方便.

总的阅历:

1>
约束编程模式使相互和分布式计算非常容易,也容易构造容错的精打细算环境(暂时无明了)
2> 网络带来富是百年不遇资源, 大量的系统优化是对准减少网络传输量为目的的:
本地优化策略让大量之数量从当地磁盘读取, 中间文件写副当地磁盘,
并且只写一卖中文件.
3>
多次履行同样之任务可减掉性能缓慢的机器带来的负面影响,同时缓解了是因为机械失效导致的数码丢失问题。

关于shuffle, combiner 和partition

shuffle: 从map写起开始到reduce执行前的进程可统一叫shuffle.
具体可分为map端的shuffle和reduce端的shuffle.
combiner和partition: 都是在map端的.

切切实实经过:

  1. Collect阶段
    1> 在map()端,
    最后一步通过context.write(key,value)输出map处理的中级结果.
    然后调用partitioner.getPartiton(key, value,
    numPartitions)来赢得这长达record的分区号. record 从kv pair(k,v)
    –>变为 (k,v,partition).

2>
将变后的record暂时保存在内存中之MapOutputBuffer内部的环形数据缓冲区(默认大小是100MB,
可以经过参数io.sort.mb调整, 设置是缓存是为着排序速度提高, 减少IO开销).
当缓冲区的数额使用率达一定阈值后, 触发一样坏spill操作.
将环形缓冲区的部分数据勾勒到磁盘上,
生成一个即之linux本地数据的spill文件, 在缓冲区之使用率还上阈值后,
再次挺成一个spill文件. 直到数据处理完毕, 在磁盘上会见生成很多临时文件.
关于缓冲区的构造先不讨论

2.spill阶段
当缓冲区的使用率达一定阈值后(默认是80%, 为什么要设置比例,
因为要是被写及朗诵而开展), 出发一差”spill”,
将一些缓冲区的数码勾勒到地头磁盘(而不是HDFS).
特别注意: 在以数据勾勒副磁盘前, 会对立即同有的数据开展sort.
默认是应用QuickSort.先按(key,value,partition)中之partition分区号排序,然后再次按key排序.
如果安了针对性中等数据做缩减的布置还会见召开缩减操作.

流动:当及溢起极后,比如默认的是0.8,则会宣读来80M底数量,根据前的分区元数据,按照分区号进行排序,这样就是只是实现同分区的多少都于一齐,然后还因map输出的key进行排序。末尾实现溢起底文书内是分区的,且分区内是板上钉钉的

3.Merge阶段
map输出数据较多之时候,会变多只溢起文件,任务就的结尾一宗工作就是是拿这些文件合并为一个怪文件。合并的进程被一定会做merge操作,可能会见举行combine操作。
merge与combine的对比:
于map侧可能发2次combine. 当spill出去之前,
会combine一蹩脚(在user设置的前提下).
如果map的泛滥写文件个数大于3时(可配置:min.num.spills.for.combine)在merge的历程被(多独spill文件合并为一个那个文件)中还见面实施combine操作.

Combine: a:1,a:2 —> a:3
Merge: a:1,a:2 —> a,[1,2]

Reducer端: copy, sort, reduce
4.copy
copy的经过是凭借reduce尝试从不辱使命的map中copy该reduce对应之partition的片数据.
哟时候开始开copy呢?
等job的第一单map结束晚即便开copy的长河了.因为对各个一个map,都冲你reduce的一再以map的出口结果分成R个partition.
所以map的中等结果负是来或含有各级一个reduce需要处理的组成部分数据的.
由于每一个map产生的中间结果还产生或包含有reduce所在的partition的多少,
所以这个copy是自多个map并行copy的(默认是5单).

注: 这里因网络问题down失败了怎么处置? 重试, 在肯定时间后如依然失败,
那么下载现成就会放弃这次下载, 随后尝试从别的地方下载.

5.merge
Reduce将map结果下充斥至当地时,同样为是用展开merge的之所以io.sort.factor的配备选同样会影响reduce进行merge时的行为.
当发现reduce在shuffle阶段iowait非常的强之时段,就出或通过调大这个参数来加大一涂鸦merge时的出现吞吐,优化reduce效率。

(copy到哪里, 先是内存的buffer, 然后是disk)
reduce在shuffle阶段对下载下来的map数据为未是当下写副磁盘,
而是先用一个buffer存在内存中.
然后当使用内存达到少的早晚才spill到磁盘.
这个比重是由此其他一个参数来控制.

reduce端的merge不是等具备溢写好后更merge的.
而是一边copy一边sort一边merge. 在执行了merge sort后, reduce
task会将数据交由reduce()方法开展处理

参考:

  1. http://blog.51cto.com/xigan/1163820
  2. http://flyingdutchman.iteye.com/blog/1879642
  3. http://www.cnblogs.com/edisonchou/p/4298423.html

相关文章