Hadoop.MapReduce.简介
本文是2009年9月为公司内部培训写得的一篇简介。
MapReduce概述
- 提供计算任务的自动并行化机制,使用分发-收集的并行策略,Map阶段处理(无依赖的)原始输入,Reduce阶段处理依赖关系(按Key依赖)。
- 架构在hadoop之上,原则上可以使用hadoop代理的所有分布式文件系统(hdfs,kfs,s3),但我们目前仅使用hdfs。
MapReduce流程
- 客户端提交MapReduce任务
- JobTracker分发任务,依据输入文件的chunk位置信息,将相应的split分发到TaskTracker
- Map.TaskTracker 执行Map任务
- 读取split
- 产生输出,输出按i= Hash(K) % R分发至Reduce[i]
- Reduce.TaskTracker执行Reduce任务
- Reduce.shuffle可与Map任务并行执行
- 甚至sort也可以和Map并行执行
- 但用户的reduce过程不能用Map并行执行
- 产生R个最终输出文件,整个MapReduce过程结束
MR矩阵与容错
NameNode处理任务调度与错误恢复,因此,在NameNode上,最基本的一个数据结构就是MR[M,R]矩阵。每个Map进程一行,每个Reduce进程一列。
R1 | R2 | R3 | Rj | Rr | |||||
M1 | |||||||||
M2 | |||||||||
M.. | |||||||||
Mi | |||||||||
Mm |
每个Map的输出被分成R份,按hash规则,每个Reduce一份,这样当任何一个/多个Reduce任务失败时,重启的Reduce只需要从每个Map的输出读取自己的这一份(绿色列)。
当任何一个/多个Map任务失败——这个很简单,随后重启成功,每个Reduce进程只需要读取自己相应于该Map任务的那些数据(粉色行)。
Map
切分输入
如何切分 | 大文件(大于1 chunk) | 小文件(小于1 chunk) |
压缩 | 不切分 | 不切分 |
未压缩 | 按chunk切分 |
对于大文件,按chunk进行切分,切分程序需要处理chunk边界情况,例如,对于普通文本文件,每行一个记录,chunk边界通常在一行中间,切分程序必须处理这种情况,把跨chunk的记录归入前一个split,因此:
- 需要在网络上传输半条记录
- 并不是每个split的尺寸都精确地等于1chunk
- 对于定长记录文件,要简单一些,也可以按照这种方式来处理。
- 一般情况下,压缩文件不能进行切分,因为切分后找不到同步点(压缩头)。所以,把压缩文件的尺寸控制在1 chunk 内,一方面可以提高Map的并行度,另一方面也可以减少网络传输,因为超出1 chunk的就不在第一个chunk所在的data server了。
解析(parse)记录
将输入的字节流parse成一条条记录
调用Map.map
调用用户定义的map方法
在Map.map中,除了处理、变换数据,一般还需要调用report,向框架报告进度(执行情况)。
Reduce
Reduce.shuffle
可与Map任务并行执行
Reduce.sort
也可以和Map并行执行,然而,一旦有Map任务失败重启,排序结果就作废了!
Reduce.reduce
调用用户定义的reduce函数。这个过程不能用Map并行执行,因为reduce函数需要接受每个Key对应的整个value集合(这个集合可能也是有序的——SecondarySort)。作为一个极端情况,最后完成的那个Map可能包含所有Key,并且value也是最小的!
这个过程,也可能需要向框架报告进度。
这一步将产生最终输出文件,每个Reduce进程一个,整个MapReduce过程结束。因此,MapReduce的输出总是/path/to/reuce/output/part-####,而不是一个单一的文件。这些输出有可能作为后续其它MapReduce过程的输入。
Hadoop.MapReduce接口
原生接口
旧接口(0.20以前)
1 2 3 4 5 6 7 8 9 10 11 |
@Deprecated public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable{ void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporterreporter) throws IOException; } @Deprecated public interface Reducer<K2, V2, K3,V3> extends JobConfigurable, Closeable { void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3>output, Reporter reporter) throws IOException; } |
新接口(0.20以后)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public classMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public abstract class Context implementsMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {} protected void setup(Context context) throws IOException,InterruptedException; @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value,Context context) throws IOException, InterruptedException; protected void cleanup(Context context) throws IOException, InterruptedException; public void run(Context context) throws IOException, InterruptedException; } public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{ public abstract class Context implements ReuceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {} protected void setup(Context context) throws IOException, InterruptedException; @SuppressWarnings("unchecked") protected void reduce(KEYINkey, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException; protected void cleanup(Context context) throws IOException, InterruptedException; @SuppressWarnings("unchecked") public void run(Context context) throws IOException, InterruptedException; } |
新版更灵活,另一方面,默认的map和reduce都是identity,而以前版本的identity是用专门的类来表达的。
灵活性表现在context参数上,在重构中,这个叫参数提取(Introduce Parameter Object)。如果以后参数需要改变,或者需要插入新的方法,就只需要修改Parameter Object,而不需要修改接口本身。
同时,又为旧的接口提供Adapter/Bridge,以便兼容(二进制兼容+源码兼容)旧程序。
示例(WordCount)
Map
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text,Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporterreporter) throws IOException { String line =value.toString(); StringTokenizer itr = newStringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } |
Reduce
1 2 3 4 5 6 7 8 9 10 |
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable,Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable>values, OutputCollector<Text, IntWritable> output, Reporterreporter) throws IOException { long sum = 0; while (values.hasNext()) sum += values.next().get(); output.collect(key, new IntWritable(sum)); } } |
配置任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(),WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); // the keys are words (strings) conf.setOutputValueClass(IntWritable.class); // the values are counts (ints) conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); List<String> other_args = newArrayList<String>(); for(int i=0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { conf.setNumMapTasks(Integer.parseInt(args[++i])); } else if ("-r".equals(args[i])){ conf.setNumReduceTasks(Integer.parseInt(args[++i])); } else { other_args.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i-1]); return printUsage(); } } // Make sure there are exactly 2 parameters left. if (other_args.size() != 2) { System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2."); return printUsage(); } FileInputFormat.setInputPaths(conf,other_args.get(0)); FileOutputFormat.setOutputPath(conf,new Path(other_args.get(1))); JobClient.runJob(conf); return 0; } |
Streaming
文本协议的接口,记录以换行符分隔,Key-Value以t分隔。比原生接口更简单,同时,还可以进行更方便的本地测试——通过管道进行。
原则上,Streaming接口只需要遵循一条:Map.OutputKey==Reduce.InputKey
最简单的一个程序:大客户每天独立用户数计算。MapReduce的输出结果是每个用户和他对应的浏览量,再使用wc -l就可以得出总量,并且,可以从原始输出取得每个用户的浏览量。
Map(为简单起见,省略了错误处理)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
#include <stdio.h> #include <febird/trb_cxx.h> int main(int argc, char* argv[]) { size_t len = 0; char* line = NULL; using febird::trbstrmap; trbstrmap<int> smap; for (;;) { ssize_t len2 = getline(&line,&len, stdin); if (-1 == len2) break; char* ck = strstr(line,"&xnbc="); char* end = strstr(ck+6,"HTTP"); end[-4] = 0; smap[ck+6]++; } for(trbstrmap<int>::iterator iter = smap.begin();iter; ++iter) printf("%st%dn",smap.key(iter), *iter); if (line) free(line); return 0; } |
这个Map的逻辑比WordCount.Map要稍微复杂一点,因为在程序中,相当于已经做了Combine,这个Combine比MapReduce本身的Combine要高效得多。
优化无止境,如果需要进一步优化,该程序可以在smap的尺寸达到一定大小时就进行输出,而不必等到处理完全部输入后再输出,这样一方面减小了内存占用,另一方面还提高了并发度——每输出一点数据框架就可以传输一点,进而shuffle、sort。
Reduce
框架传递给Reduce程序的[{key, value}]中,相等的Key总是相邻的,充分利用这一点,可以有效化简程序,并提高性能。然而要利用这一点,在编程上付出的努力就比原生接口要复杂一些,原生接口只需遍历Value集合,而使用Streaming需要自己判断相同Key的集合边界,还要处理一些其他边界问题(代码中黄色行)。
这个Reduce实际上可以作为一个通用的reduce,叫它sumlong,可以计算每个Key发生的次数,只要输入匹配keytnum,用正则表达式就是:.*?t[0-9]-?{1,20}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
#include <stdio.h> #include <string.h> #include <stdlib.h> int main(int argc, char* argv[]) { size_t llen = 0, klen = 0; char *line = NULL, *key =(char*)malloc(1024); long cnt = 0; key[0] = 0; for (;;) { ssize_t len2 = getline(&line,&llen, stdin); if (-1 == len2) { if (klen) printf("%st%ldn",key, cnt); break; } char* tab = (char*)memchr(line, 't',len2); if (tab) { long cnt2 = atol(tab+1); size_t klen2 = tab - line; if (klen2 == klen &&memcmp(line, key, klen) == 0) cnt += cnt2; else { if (klen)printf("%st%ldn", key, cnt); memcpy(key, line, klen2); key[klen2] = 0; klen = klen2; cnt = cnt2; } } // if (tab) } if (line) free(line); free(key); return 0; } |
调用脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
#Distinct User Sum if (($# < 1)); then echo usage: $0 yyyy_mm_dd[_hh] exit elif (($# < 2)); then outFile=/data/leipeng/big/dus/$1 else outFile=$2 fi hdir=/opt/hadoop year=`echo $1 | awk-F"_" '{print $1;}'` month=`echo $1 | awk-F"_" '{print $2;}'` day=`echo $1 | awk -F"_"'{print $3;}'` hour=`echo $1 | awk-F"_" '{print $4;}'` if [[ -z $hour ]] ; then pvd=`/opt/hadoop/bin/hadoop fs –ls /user/root/log_data/log_in/pv/$year/$month/$day/*/ | awk 'NF>=8{printf("-input %s", $8)}'` else pvd="-input /user/root/log_data/log_in/pv/$year/$month/$day/$hour/pvval" fi echo $pvd cd $hdir bin/hadoop fs -rmr/test/dus/output/$1 bin/hadoop jar contrib/streaming/hadoop-*-streaming.jar -conf conf/dus.xml $pvd -output /test/dus/output/$1 -mapper $hdir/aa_dumap -reducer $hdir/aa_duadd bin/hadoop fs -cat "/test/dus/output/$1/part*"| wc -l > $outFile |
如果再加上配置文件conf/dus.xml,整个调用脚本比程序本身还要长。
本地测试命令,相当于仅一个Map和一个Reduce进程: cat pvfiles | aa_dumap | aa_duadd [|wc –l]
如果要看所有用户的结果,而非最终统计,就不需要 |wc–l。这个比原生接口的测试要简单得多!
使用其它语言
Streaming不光可以使用C/C++,任何语言都可以,比如awk,上面的程序用awk可以更简单——效率也许会低一点,但的确更简单得多得多!
awk.Map
cookie之前有&xnbc=标识,之后有|| HTTP,因此,用这两个串做字段分隔符,cookie内容正好是第二个字段$2。
awk '-F&xnbc=|\|\| HTTP' '{printf("%st1n", $2)}'
程序本身不做Combine,让MapReduce框架去做,或者干脆不做。
awk.Reduce
Reduce不需要自定义字段分隔符,默认的正好
简单的,用内存多点
awk '{km[$1]+=$2}END{for (k inkm){printf("%st%dn", k, km[k]);}}'
复杂点,用内存小点
awk 'p==$1{c+=$2}p!=$1{if(p!="") printf("%st%dn", p, c); p=$1; c=$2;}END{if(NR>0) printf("%st%dn", p, c)}'
可读形式
1 2 3 4 5 6 7 8 9 10 11 |
p==$1{c+=$2} p!=$1{ if(p!="") printf("%st%dn",p, c); p=$1;c=$2; } END{ # 不能漏掉最后一条,并且,空的输入必须是空的输出 if(NR>0) printf("%st%dn",p, c) } |
调用脚本的复杂性还是一样的,不过,使用awk,可以直接把awk程序写在调用脚本中,就不需要任何其它程序了。
更多控制选项
每个任务可调的参数
Name | Type | Description |
mapred.job.id | String | The job id |
mapred.jar | String | job.jar location in job directory |
job.local.dir | String | The job specific shared scratch space |
mapred.tip.id | String | The task id |
mapred.task.id | String | The task attempt id |
mapred.task.is.map | boolean | Is this a map task |
mapred.task.partition | int | The id of the task within the job |
map.input.file | String | The filename that the map is reading from |
map.input.start | long | The offset of the start of the map input split |
map.input.length | long | The number of bytes in the map input split |
mapred.work.output.dir | String | The task’s temporary output directory |
其它参数是全局选项,可参考hadoop官方文档(Hadoop安装目录内)。