MapReduce Key Revert ——特定数据模式的负载均衡
符号、记法
其中{k,v}指一个Key,Value对,{..} 中第一个分量是Key,第二个是Value。
[e]指一个集合,其中的元素为e。 [{k,v}]就指一个{k,v}的集合。
问题
给定巨大的集合S=[{k1,k2}],对S中每个k1,计算k1相同,而k2不同的元素个数。生成[{k1,distinct_count([k2])}]。
其中S数据的特点是:
1. 不同k1的数目很少(少于1K,很可能小于机器数目);
2. 不同k2的数目很大(大于20M);
3. 总的记录数目很大(大于1G);
4. k1的分布很不均匀,最频繁的k1,其记录数占到总数的1/3,对应的独立k2几乎包括整个k2集合;
5. k2的分布可以认为是均匀分布。
数据集的这些特点,也正是这种方法的适用条件。如果k1,k2的分布都很均匀,并且不同k1的数目远大于机器数目,还是用传统方案比较简单。
MapReduce解决方案
传统方案
Map.Input={k1,k2}
Map.Output={k1,k2}
Partition=Hash(k1)
Reduce.Input={k1,k2}
Reduce.Output={k1,distinct_count(k2)}
Reduce中,对每个k1,需要做set_union,计算
distinct_count(k2)=count(set_union([[k2]]))
这种方法的问题是因为k1的分布不均匀,导致负载不平衡,最频繁的k1,成为计算的瓶颈,无法提高并行度。
倒转Key
Map1.Input={k1,k2}
Map1.Output={k2,k1}
Partition1=Hash(k2)
Reduce1.Input=Map1.Output={k2,k1}
Reduce1.Output={k1,cnt=distinct_count([{k1,k2}])}
Map2.Input=Reduce1.Output
Map2=Identity
Partition2=Hash(k1)
Reduce2.Input=Map2.Output={k1,cnt}
Reduce2.Output={k1,sum([cnt])}
l 在Map1和Reduce1之间,按Hash(k2)进行partition。
l 每个Reduce1处理的数据就只包含k2的一个子集;因为k2均匀分布,每个子集尺寸相当;最关键之处就在这里,这样一个转换,达到了负载均衡!
l Reduce1维护一个全局的hash_map<k1,hash_set<k2> >,对每条记录,分别进行一次k1和k2的查找或插入【m[k1].insert(k2)】。
l Reduce2每读一条输入,要进行一次k1的查找,并累加该k1->cnt【m[k1]+=cnt】。