MapReduce做了多余的事情
本文假定读者已了解MapReduce。
先说 Map
Map阶段一般做三件事情:
1. 切分输入
2. 变换输入为输出
3. 执行可选的Combine
如果要说哪项是多于的,大概就是Combine了。Combine在很多时候可以减少传递给Reduce的数据量;但是,也有一些时候,Combine只是空耗时间:
1. Map输入中重复Key很多时,Combine会提高性能
2. Map输入中重复Key很少时,Combine会降低性能
3. 网络速度很快时,Combine提高的性能有限,甚至不会提高性能
作为总结:用不用Combine,一方面取决于数据的特征(重复Key的多寡);另一方面就是网络带宽。
Reduce
以下ReduceCallback指应用程序定义的Reduce回调函数。
在Reduce阶段,传统上,Reduce阶段做三件事情:
1. shuffle
把各个Map进程产生的结果,按 i = Hash(Key, ReduceCount) 将记录分配给第 i 个Reduce进程。
2. sort
将每个Reduce进程原始输入(shuffle产生的)进行排序,将key相同的记录对应的value集中到一起,产生{key,[value]}。
3. reduce
读取{key,{value}},为每个(key,{value}),调用应用程序自定义的reduce函数。
通过这种方式,应用程序只需要实现map和reduce方法,其它一切都交给框架来完成了,应用程序会非常简单。也正因此,性能会受到一定影响:
从理论上讲,Reduce进程收到第一条记录时,就可以开始ReduceCallback了。但是,因为需要sort,就必须等到所有的记录全部接收,才能开始排序,排序完了才能开始ReduceCallback。
实际上,使用一些优化手段,也可以在一边接收数据,一边做部分排序。比如在接收输入时使用置换选择排序生成初始段,等输入全部读完(所有Map进程都结束),再做一次多路归并,归并的同时,就可以调用ReduceCallback。
如果我们只是做简单的计数工作(如WordCount),并非必须等到排序完了才开始回调应用程序的Reduce。如果接收输入时就开始ReduceCallback(同时写入backup文件),这样Map和Reduce是完全并行的,Map进程结束时,Reduce差不多就可以开始输出结果了。
如果考虑到容错,假定Map是幂等的,那么,相同的输入将产生完全相同的输出。
框架总需要维护MR[m][r]矩阵,设每个矩阵元素为E。E中至少包含一个全局文件,一个完成状态:
struct E {
string filebackup; // 一旦某个reduce失败,从这里恢复
OtherState state;
int recordno; // 新增
};
如果再每个矩阵元素中增加一个当前记录号recordno.当一个map进程失败重启时,直接忽略recordno之前的元素,只处理recordno之后的记录。Map失败的恢复策略和当前的实现也就这么些不同。
Map的幂等性很重要,如果Map不是幂等的,两次运行的结果可能会有不同;因此不能用recordno。在现实中,实际上很难遇到不满足Map幂等性的情况,如果真出现了这种情况,绝大多数也是程序错误!所以,这个重要性,更多是理论上的意义。
Reduce[j]失败时,恢复策略也还跟以前一样。读入所有的MR[*][j].filebackup,并调用ReduceCallback。
美好的…
我预计,如果使用了这样的实现,MapReduce的性能会有大幅提升。