看下面的示例: 继续阅读
使用 hadoop file shell 可以方便地向 hdfs put 文件,但是,该 shell 不支持从管道读取数据并放到 hdfs 文件中。它仅支持这样的 put 命令: 继续阅读
有一类问题,代码模板相同,但有少部分地方不同,一般可以写一个复杂的程序,使用不同的选项,完成不同的任务。或者,把公共的部分抽象成一个代码库,然后在不同程序中引用。但是,如果公共的部分很少,并且比较“专用”,或者因为其它原因,比较难以部署。怎么办?
实际上,有另一种完全不同的编程模式来实现:代码生成器。unix世界中最知名的代码生成器莫过于lex和yacc了。但是,不比每个代码生成器都那么复杂,比如这个代码生成器就非常简单,它只是简单地转换行记录:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
#! /bin/sh field_seperator="||" output=b while getopts :F:vo: arg do case $arg in F ) field_seperator=$OPTARG;; v ) ;; o ) output=$OPTARG;; : ) echo "$0: missing arg for -$OPTARG " >&2 exit;; /?) echo "Invalid option -$OPTARG ignored." >&2 exit;; esac done if [ $OPTIND -gt $# ] then # echo OPTIND=$OPTIND argc=$# >&2 echo "no program" >&2 exit fi program=${!#} echo field_seperator=$field_seperator cat > a.cpp <<+TemplateCFile #include <vector> #include <string.h> #include <stdio.h> const char field_seperator[]="||"; void split_row(char* line, std::vector<char*>& F, const char* fs) { char* col = line; F.resize(0); size_t fslen = strlen(fs); if (fslen == 1) { for (;;) { F.push_back(col); col = strchr(col, fs[0]); if (col) { col[0] = '/0'; col += 1; } else break; } } else { for (;;) { F.push_back(col); col = strstr(col, fs); if (col) { col[0] = '/0'; col += fslen; } else break; } } } int main(int argc, char* argv[]) { size_t len1 = 0; ssize_t len2; char* line = NULL; std::vector<char*> F; while ((len2 = getline(&line, &len1, stdin)) != -1) { split_row(line, F, field_seperator); int NF = F.size(); //--- begin user program +TemplateCFile echo $program >> a.cpp cat >> a.cpp <<+TemplateCFile //--- end user program ; // avoid user program missing ; printf("/n"); } if (line) free(line); if (ferror(stdin)) { perror("ferror(stdin)"); return 1; } return 0; } +TemplateCFile sed -i 's//(field_seperator/[/]=/).*";//1"'$field_seperator'";/g' a.cpp gcc -O2 a.cpp -lstdc++ -o $output exit $? |
可以象awk一样写程序:
1 2 3 |
# 相当于 awk -F, '{printf("%s/t%s/n", $1, $5)}' # 使用 ',' 做列分隔符,输出第 1 和第 5 个字段,生成二进制可执行程序 myprog ./gencode.sh -F , -o myprog 'printf("%s/t%s/n", F[0], F[4])' |
1 2 3 |
# 相当于 awk -F, '{printf("%s/t%s/n", $1, $5)}' # 使用 ',' 做列分隔符,输出第 1 和第 5 个字段,生成二进制可执行程序 myprog ./gencode.sh -F , -o myprog 'printf("%s/t%s/n", F[0], F[4])' |
我当初写这个生成器的原因是发现非常简单的 awk 程序也比 C 慢 40 倍,以为这是本质上的性能差距,后来才发现不是。
对这个简单的程序,使用awk更方便更安全,也不比C慢,但是一旦碰到其它类似问题而 awk 解决不了,这种模式就可以派上用场了。
无意中发现,在一台服务器上,非常简单的awk程序,比C的等价物要慢40倍,感觉有点不太正常,还以为的确是awk太慢。不得其解,到另一台服务上试了一下,相同的 awk 程序,相同的测试数据,这台服务器的速度与C相当,也就是说,同样是awk,两台机器速度差了 40倍,而两台机器配置基本相当。非常困惑,找了两小时的原因,终于发现gawk手册里面有一段话:
For other single-character record separators, using ‘LC_ALL=C’will give you much better performance when reading records. Otherwise,gawk has to make
several function calls, per inputcharacter to find the record terminator.
再看两台机器的 locale,结果发现,慢的机器上:
[root@slow-server]# locale
LANG=en_US.UTF-8
LC_XXXX=en_US.UTF-8
…
LC_ALL=en_US.UTF-8
快的机器上:
[root@fast-server]# locale
LANG=en_US
LC_XXXX=en_US
…
LC_ALL= <空>
马上试验,将slow-server的locale改掉:
export LC_ALL=C
速度马上快了40倍,与fast-server相当。
这应该是awk实现上的一个缺陷,即便是对utf8,也不应该慢这么多,如果缓冲合适,最多慢2~3倍就可以了,为什么非要gawk has to make several function calls,
per inputcharacter ?
一直在想:如何在 Hadoop.MapReduce 中,插入一个 C 写的 HashFunction,既要高效,又要接口简洁。通过命令行实现调用显然是不行的。刚刚终于想出了:使用管道!
一个非常简单的程序,从stdin读入,写到stdout。多简单!至于效率,管道嘛,本质上就是异步的,自然是buffered&asynchronous 模式。
hash 程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#include <stdio.h> int hash(const char* key) { int h = 234234; for (; *key; ++key) h = h << 3 ^ *key; return h; } int main(int argc, char*[] argv) { char buf[256]; while (fgets(buf, sizeof(buf), stdin) != EOF) { printf("%d/n", hash(buf)); } return 0; } |
框架可以一边不断往管道写key,一边从中读取结果,这两个工作完全可以是异步的。对hash程序来说,如果stdin/stdout是全缓冲的,就几乎没有io的开销,因为几百几千次 fgets/printf 才会导致一次系统调用。
对frame程序也是一样的。
在 hadoop.streaming 中,hash 函数目前还必须由 java 类指定,如果使用这种方式,那就更 unix 了。