多线程的 pipeline 设计模式
一个简单例子:有很多个html网页,网页的id、title、url、path等信息存在一个数据库表中,网页内容存储在一个磁盘阵列上。现在要把所有网页都读出来,统计其中的html标签、正文等信息,并写入另一个数据库表,怎样的设计最好呢? 继续阅读
一个简单例子:有很多个html网页,网页的id、title、url、path等信息存在一个数据库表中,网页内容存储在一个磁盘阵列上。现在要把所有网页都读出来,统计其中的html标签、正文等信息,并写入另一个数据库表,怎样的设计最好呢? 继续阅读
C++ 现在最时髦的用法是 template meta programming。booster 们对此非常津津乐道,我本人也是个狂热的booster。到了什么程度?不使用template 就浑身不舒服,不boost一下就感觉对不起C++。但是这种狂热带来的严重后果就是程序编译速度极慢无比,生成的执行程序尺寸超常。
曾经一个 C++ 服务器程序,代码也就10000行左右,编译出来的执行程序竟然20M!编译时间半小时!写的时候感觉不到用了多少template,但是写出来竟然得到这样的结果,不得不让人吃惊!
记得在大学的时候(2000年前后),初学习 template 时,感觉template之间的耦合有点“过于松散”,就像只要有一个螺母,再一个螺栓,就可以往一起套,而不管他们是否一个是塑料,一个是金属,大小粗细是否匹配,螺距是否匹配,所有的这一切,如果只有哪怕一点点不匹配,都会在编译错误中造成一个非常令人费解的超长消息。当初也不知道这一点早已被无数人诟病了,就写了一片文章,自作聪明地提出应该对模板参数有个类似接口声明一样的规格定义(现在C++0x 中的对应物是concept)。当初立即招致骂声一片。现在这一点已经毋庸置疑了,C++0x出世后我们就可以结束这些痛苦了。
但是,前面说的C++的那两个致命缺点,看来短期内很难克服。
编译时间,受制于文件包含这种古老的内存不够用的时代的无奈选择,就像人的阑尾,喉头,视网膜。
程序尺寸,这个缺陷在某些时候也非常致命,举个简单例子,std::sort 使用了多种排序策略,每个sort 的机器码都很大。同时,对每一种数据类型,每一种randiter每一种comparator,都会生成一个sort 的版本,这会造成非常大的代码膨胀。相比之下,C 的 qsort 就没有这种缺陷。如果我们对几十种数据,使用几十个comparator排序,std::sort 的代码尺寸比 qsort 要大几十倍。虽然它在inline方面获得了优势,但是cpucache的失效,甚至是memcache的失效,造成的性能损失要大得多。BS在TCPL中提到的消除代码膨胀的方法,在某些情况下的确有用,但是太繁琐,大约也只有库编写者会使用它。
Java现在也支持泛型,据说不存在代码膨胀问题,但它的泛型只是语法糖,对程序性能好像没有提升。
C++ 怎样平衡代码膨胀和代码性能?是否可以为 template 生成 runtime meta info,用来操纵泛型算法。或甚至使用这些meta info 来在运行时生成真正的机器码。这样甚至可以允许在运行时进行template组装,而不是完全在编译时?这又有些类似于现代虚拟机(如Java HotSpot 虚拟机)的动态优化。
扯太远了,休息下。
– 共有 n 个内部结点,n 个外部结点
– winner 只用于初始化时计算败者树,算完后即丢弃
– winner/loser 的第 0 个单元都不是内部结点,不属于树中的一员
– winner 的第 0 个单元未用
– m_tree 的第 0 个单元用于保存最终的赢者, 其它单元保存败者
– 该初始化需要的 n-1 次比较,总的时间复杂度是 O(n)
– 严蔚敏&吴伟民 的 LoserTree 初始化复杂度是 O(n*log(n)),并且还需要一个 min_key,
但是他们的初始化不需要额外的 winner 数组
– 并且,这个实现比 严蔚敏&吴伟民 的 LoserTree 初始化更强壮
其中引用的一些代码比较长,故未贴出
#define LT_iiter_traits typename std::iterator_traits<typename std::iterator_traits<RandIterOfInput>::value_type>
template< class RandIterOfInput
, class KeyType = LT_iiter_traits::value_type
, bool StableSort = false //!< same Key in various way will output by way order
, class Compare = std::less<KeyType>
, class KeyExtractor = typename boost::mpl::if_c<
boost::is_same<KeyType,
LT_iiter_traits::value_type
>::value,
boost::multi_index::identity<KeyType>,
MustProvideKeyExtractor
>::type
, CacheLevel HowCache = cache_default
>
class LoserTree :
public CacheStrategy< typename std::iterator_traits<RandIterOfInput>::value_type
, KeyType
, KeyExtractor
, HowCache
>,
public MultiWay_SetOP< LT_iiter_traits::value_type
, KeyType
, LoserTree<RandIterOfInput, KeyType, StableSort, Compare, KeyExtractor, HowCache>
>
…{
DECLARE_NONE_COPYABLE_CLASS(LoserTree)
typedef CacheStrategy< typename std::iterator_traits<RandIterOfInput>::value_type
, KeyType
, KeyExtractor
, HowCache
>
super;
friend class MultiWay_SetOP< LT_iiter_traits::value_type
, KeyType
, LoserTree<RandIterOfInput, KeyType, StableSort, Compare, KeyExtractor, HowCache>
>;
public:
typedef typename std::iterator_traits<RandIterOfInput>::value_type way_iter_t;
typedef typename std::iterator_traits<way_iter_t >::value_type value_type;
typedef KeyType key_type;
typedef KeyExtractor key_extractor;
typedef boost::integral_constant<bool, StableSort> is_stable_sort;
typedef typename super::cache_category cache_category;
typedef typename super::cache_item_type cache_item_type;
public:
/**//**
@brief construct
@par 图示如下:
@code
RandIterOfInput this is guard value
|| ||
|| ||
/ /
first–> 0 way_iter_t [min_value…………………….max_value]
/ 1 way_iter_t [min_value…………………….max_value] <— 每个序列均已
| 2 way_iter_t [min_value…………………….max_value] 按 comp 排序
| 3 way_iter_t [min_value…………………….max_value]
< 4 way_iter_t [min_value…………………….max_value]
| 5 way_iter_t [min_value…………………….max_value]
| 7 way_iter_t [min_value…………………….max_value]
8 way_iter_t [min_value…………………….max_value]
last—> end
@endcode
@param comp value 的比较器
@note 每个序列最后必须要有一个 max_value 作为序列结束标志,否则会导致未定义行为
*/
LoserTree(RandIterOfInput first, RandIterOfInput last,
const KeyType& max_key,
const Compare& comp = Compare(),
const KeyExtractor& keyExtractor = KeyExtractor())
…{
init(first, last, max_key, comp, keyExtractor);
}
LoserTree(RandIterOfInput first, int length,
const KeyType& max_key,
const Compare& comp = Compare(),
const KeyExtractor& keyExtractor = KeyExtractor())
…{
init(first, first + length, max_key, comp, keyExtractor);
}
// LoserTree(RandIterOfInput first, RandIterOfInput last,
// const cache_item_type& min_item,
// const cache_item_type& max_item,
// const KeyType& max_key,
// const Compare& comp = Compare(),
// const KeyExtractor& keyExtractor = KeyExtractor())
// {
// init_yan_wu(first, last, min_item, max_item, comp, keyExtractor);
// }
// LoserTree(RandIterOfInput first, int length,
// const cache_item_type& min_item, // yan_wu init need
// const cache_item_type& max_item,
// const KeyType& max_key,
// const Compare& comp = Compare(),
// const KeyExtractor& keyExtractor = KeyExtractor())
// {
// init_yan_wu(first, first + length, min_item, max_item, comp, keyExtractor);
// }
LoserTree()
…{
}

/**//**
@brief 初始化
– 共有 n 个内部结点,n 个外部结点
– winner 只用于初始化时计算败者树,算完后即丢弃
– winner/loser 的第 0 个单元都不是内部结点,不属于树中的一员
– winner 的第 0 个单元未用
– m_tree 的第 0 个单元用于保存最终的赢者, 其它单元保存败者
– 该初始化需要的 n-1 次比较,总的时间复杂度是 O(n)
– 严蔚敏&吴伟民 的 LoserTree 初始化复杂度是 O(n*log(n)),并且还需要一个 min_key,
但是他们的初始化不需要额外的 winner 数组
– 并且,这个实现比 严蔚敏&吴伟民 的 LoserTree 初始化更强壮
*/
void init(RandIterOfInput first, RandIterOfInput last,
const KeyType& max_key,
const Compare& comp = Compare(),
const KeyExtractor& keyExtractor = KeyExtractor())
…{
m_comp = comp;
m_key_extractor = keyExtractor;
m_beg = first;
m_end = last;
m_max_key = max_key;
int len = int(last – first);
if (0 == len)
…{
throw std::logic_error("LoserTree: way sequence must not be empty");
}
m_tree.resize(len);
this->resize_cache(len);
int i;
for (i = 0; i != len; ++i)
…{
// read first value from every sequence
this->input_cache_item(i, *(first+i));
}
if (1 == len)
…{
m_tree[0] = 0;
return;
}
int minInnerToEx = len / 2;
std::vector<int> winner(len);
for (i = len – 1; i > minInnerToEx; —i)
…{
exter_loser_winner(m_tree[i], winner[i], i, len);
}
int left, right;
if (len & 1) // odd
…{ // left child is last inner node, right child is first external node
left = winner[len–1];
right = 0;
}
else
…{
left = 0;
right = 1;
}
get_loser_winner(m_tree[minInnerToEx], winner[minInnerToEx], left, right);
for (i = minInnerToEx; i > 0; i /= 2)
…{
for (int j = i–1; j >= i/2; —j)
…{
inner_loser_winner(m_tree[j], winner[j], j, winner);
}
}
m_tree[0] = winner[1];
}
//! 严蔚敏&吴伟民 的 LoserTree 初始化
//! 复杂度是 O(n*log(n)),并且还需要一个 min_key
void init_yan_wu(RandIterOfInput first, RandIterOfInput last,
const cache_item_type& min_item,
const cache_item_type& max_item,
const Compare& comp = Compare(),
const KeyExtractor& keyExtractor = KeyExtractor())
…{
//! this function do not support cache_none
BOOST_STATIC_ASSERT(HowCache != cache_none);
assert(first < last); // ensure that will not construct empty loser tree
m_comp = comp;
m_key_extractor = keyExtractor;
m_beg = first;
m_end = last;
m_max_key = this->key_from_cache_item(max_item);
int len = int(last – first);
m_tree.resize(len);
this->resize_cache(len+1);
this->set_cache_item(len, min_item);
int i;
for (i = 0; i != len; ++i)
…{
m_tree[i] = len;
// read first value from every sequence
this->input_cache_item(i, *(first+i));
}
for (i = len–1; i >= 0; —i)
ajust(i);
// 防止 cache 的最后一个成员上升到 top ??…..
//
this->set_cache_item(len, max_item);
// assert(!m_tree.empty());
// if (m_tree[0] == len)
// ajust(len); // 会导致在 ajust 中 m_tree[parent] 越界
}
const value_type& current_value() const
…{
assert(!m_tree.empty());
// assert(!is_end()); // 允许访问末尾的 guardValue, 便于简化 app
return current_value_aux(cache_category());
}

/**//**
@brief return current way NO.
*/
int current_way() const
…{
assert(!m_tree.empty());
assert(!is_end());
return m_tree[0];
}
size_t total_ways() const
…{
return m_tree.size();
}
bool is_any_way_end() const
…{
return is_end();
}
bool is_end() const
…{
assert(!m_tree.empty());
const KeyType& cur_key = get_cache_key(m_tree[0], cache_category());
return !m_comp(cur_key, m_max_key); // cur_key >= max_value
}
void increment()
…{
assert(!m_tree.empty());
assert(!is_end());
int top = m_tree[0];
input_cache_item(top, ++*(m_beg + top));
ajust(top);
}
void ajust_for_update_top()
…{
assert(!m_tree.empty());
int top = m_tree[0];
input_cache_item(top, *(m_beg + top));
ajust(top);
}
way_iter_t& top()
…{
assert(!m_tree.empty());
return *(m_beg + m_tree[0]);
}
void reserve(int maxTreeSize)
…{
m_tree.reserve(maxTreeSize);
resize_cache(maxTreeSize);
}
protected:
void ajust(int s)
…{
int parent = int(s + m_tree.size()) >> 1;
while (parent > 0)
…{
if (comp_cache_item(m_tree[parent], s, cache_category(), is_stable_sort()))
…{
std::swap(s, m_tree[parent]);
}
parent >>= 1;
}
m_tree[0] = s;
}
void exter_loser_winner(int& loser, int& winner, int parent, int len) const
…{
int left = 2 * parent – len;
int right = left + 1;
get_loser_winner(loser, winner, left, right);
}
void inner_loser_winner(int& loser, int& winner, int parent, const std::vector<int>& winner_vec) const
…{
int left = 2 * parent;
int right = 2 * parent + 1;
left = winner_vec[left];
right = winner_vec[right];
get_loser_winner(loser, winner, left, right);
}
void get_loser_winner(int& loser, int& winner, int left, int right) const
…{
if (comp_cache_item(left, right, cache_category(), is_stable_sort()))
…{
loser = right;
winner = left;
}
else
…{
loser = left;
winner = right;
}
}
const value_type& current_value_aux(tag_cache_none) const
…{
assert(m_tree[0] < int(m_tree.size()));
return **(m_beg + m_tree[0]);
}
const value_type& current_value_aux(tag_cache_key) const
…{
assert(m_tree[0] < int(m_tree.size()));
return **(m_beg + m_tree[0]);
}
const value_type& current_value_aux(tag_cache_value) const
…{
assert(m_tree[0] < int(m_tree.size()));
return this->m_cache[m_tree[0]];
}
using super::get_cache_key;
inline const KeyType get_cache_key(int nth, tag_cache_none) const
…{
return this->m_key_extractor(**(m_beg + nth));
}
template<class CacheCategory>
inline bool comp_cache_item(int x, int y,
CacheCategory cache_tag,
boost::true_type isStableSort) const
…{
return comp_key_stable(x, y,
get_cache_key(x, cache_tag),
get_cache_key(y, cache_tag),
typename HasTriCompare<Compare>::type());
}
bool comp_key_stable(int x, int y, const KeyType& kx, const KeyType& ky,
boost::true_type hasTriCompare) const
…{
int ret = m_comp.compare(kx, ky);
if (ret < 0)
return true;
if (ret > 0)
return false;
ret = m_comp.compare(kx, m_max_key);
assert(ret <= 0);
if (0 == ret)
return false;
else
return x < y;
}
bool comp_key_stable(int x, int y, const KeyType& kx, const KeyType& ky,
boost::false_type hasTriCompare) const
…{
if (m_comp(kx, ky))
return true;
if (m_comp(ky, kx))
return false;
if (!m_comp(kx, m_max_key)) // kx >= max_key –> kx == max_key
…{ // max_key is the max, so must assert this:
assert(!m_comp(m_max_key, kx));
return false;
}
else return x < y;
}
template<class CacheCategory>
inline bool comp_cache_item(int x, int y,
CacheCategory cache_tag,
boost::false_type isStableSort) const
…{
return m_comp(get_cache_key(x, cache_tag), get_cache_key(y, cache_tag));
}
protected:
KeyType m_max_key;
std::vector<int> m_tree;
RandIterOfInput m_beg;
RandIterOfInput m_end;
Compare m_comp;
};

使用示例:
// test_multi_way.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
using namespace std;
using namespace febird;
//using namespace febird::prefix_zip;
using namespace febird::multi_way;
template<class _Cont>
void printResult(const char* title, const _Cont& result)
…{
cout << title << ": ";
for (typename _Cont::const_iterator i = result.begin(); i != result.end(); ++i)
cout << *i << ",";
cout << endl;
}
template<class _Cont>
void printKeyValue(const char* title, const _Cont& result)
…{
cout << title << ": ";
for (typename _Cont::const_iterator i = result.begin(); i != result.end(); ++i)
cout << "(" << result.key(i) << "," << *i << "),";
cout << endl;
}
template<class _Cont>
void printPairCont(const char* title, const _Cont& result)
…{
cout << title << ": ";
for (typename _Cont::const_iterator i = result.begin(); i != result.end(); ++i)
cout << "(" << i->first << "," << i->second << "),";
cout << endl;
}
int main(int argc, char* argv[])
…{
// cout << setw(5) << setiosflags(ios::right) << 100 << setw(20) << setiosflags(ios::left) << "abcd" << endl;
// cout << setw(5) << setiosflags(ios::right) << 100 << setw(20) << left << "abcd" << endl;
int ivals[][11] =
…{
…{1, 8, 20, 31, 47, 54, 75, 82, 93, 99, INT_MAX},
…{1, 7, 20, 30, 48, 53, 76, 81, 95, 98, INT_MAX},
…{3, 6, 17, 20, 35, 42, 49, 73, 90, 91, INT_MAX},
…{2, 4, 19, 20, 46, 51, 73, 88, 96, 97, INT_MAX},
…{2, 4, 15, 20, 46, 51, 73, 88, 96, 97, INT_MAX},
};
vector<int> intersect;
vector<int> unionvec;
vector<int*> ilower, iupper;
for (int i = 0; i < 5; ++i) ilower.push_back(ivals[i]);
for (int i = 0; i < 5; ++i) iupper.push_back(ivals[i] + 10);
LoserTree<vector<int*>::iterator> loserTree(ilower.begin(), ilower.end(), INT_MAX);
loserTree.intersection(back_inserter(intersect));
for (int i = 0; i < 5; ++i) ilower.push_back(ivals[i]);
loserTree.init(ilower.begin(), ilower.end(), INT_MAX);
loserTree.union_set(back_inserter(unionvec));
vector<int> v1;
copy(&ivals[0][0], &ivals[5][0], back_inserter(v1));
printResult("all_values", v1);
printResult("intersection_result", intersect);
printResult("union_result", unionvec);
vector<pair<int*, int*> > range, range2;
for (int i = 0; i < 5; ++i)
…{
range.push_back(make_pair(ivals[i], ivals[i] + 10));
}
intersect.clear();
range2 = range;
HeapMultiWay<vector<pair<int*, int*> >::iterator> heap(range.begin(), range.end());
heap.intersection(back_inserter(intersect));
printResult("intersection_result2", intersect);
range = range2;
…{
vector<int> copyset;
heap.init(range.begin(), range.end());
heap.copy_if2(back_inserter(copyset), MultiWay_CopyAtLeastDup(3));
printResult("MultiWay_CopyAtLeastDup(3)", copyset);
}
range = range2;
…{
map<int, int> counting;
heap.init(range.begin(), range.end());
heap.copy_if2((int*)(0), MultiWay_GetCountTable(counting, 2));
printPairCont("MultiWay_GetCountMap", counting);
}
range = range2;
…{
vector<pair<int, int> > counting;
heap.init(range.begin(), range.end());
heap.copy_if2((int*)(0), MultiWay_GetCountSequence(counting, 2));
printPairCont("MultiWay_GetCountSequence", counting);
}
range = range2;
…{
// MultiWayTable<int, int> counting(16);
map<int, int> counting;
heap.init(range.begin(), range.end());
heap.copy_if2((int*)(0), MultiWay_GetCountTable(counting, 2));
// printKeyValue("MultiWay_GetCountTable", counting);
printPairCont("MultiWay_GetCountTable", counting);
}
range = range2;
…{
// PackedTable<int, int> counting;
map<int, int> counting;
heap.init(range.begin(), range.end());
heap.copy_if2((int*)(0), MultiWay_GetCountTable(counting, 2));
// printKeyValue("MultiWay_GetCountTable", counting);
printPairCont("MultiWay_GetCountTable", counting);
}
return 0;
}

|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
/** @brief 生成一个比较器(Comparator),兼键提取(KeyExtractor)类 使用这个宏生成的比较器可以作用在不同的对象上,只要这些对象有相同名称的成员, 并且可以作用在类型为成员类型的对象上。 - 假设: - 有 n 个类 class[1], class[2], ... class[n],都有类型为 MemberType ,名称为 MemberName 的数据成员 - 那么以下类型的对象可以使用该类相互比较,并且可以从这些对象中提取出 MemberType 类型的键: class[1] ... class[n], MemberType, 以及所有这些类型的任意级别的指针 @param ComparatorName 比较器类的名字 @param MemberType 要比较的对象的成员类型 @param MemberName 要比较的对象的成员名字,也可以是一个成员函数调用, 前面必须加 '.' 或者 '->', 加 '->' 只是为用于 smart_ptr/iterator/proxy 等重载 '->' 的对象 当用于裸指针时,仍使用 '.',这意味着裸指针和 smart_ptr/iterator/proxy 不能使用同一个生成的 Comparator,虽然裸指针的语法和它们都相同 @param ComparePred 比较准则,这个比较准则将被应用到 XXXX MemberName @note - 这个类不是从 ComparePred 继承,为的是可以允许 ComparePred 是个函数, 但这样(不继承)阻止了编译器进行空类优化 - 不在内部使用 const MemberType&, 而是直接使用 MemberType, 是为了允许 MemberName 是一个函数时,返回一个即时计算出来的 Key; - 当为了效率需要使用引用时,将 const MemberType& 作为 MemberType 传进来 - 当 MemberType 是个指针时,将 Type* 作为 MemberType ,而非 const Type*,即使 MemberType 真的是 const Type* - 注意 C++ 参数推导机制: @code template<T> void f(const T& x) { } // f1 template<T> void f(const T* x) { } // f2 template<T> void g(const T& x) { } // g1 template<T> void g(const T* x) { } // g2 template<T> void g( T& x) { } // g3 template<T> void g( T* x) { } // g4 void foo() { int a; const int b; f(&a); // call f1, T was deduced as int*, and then convert to 'const int*&', so match f1, not f2 f(&b); // call f2, T was deduced as int g(&a); // call g4, T was deduced as int g(&b); // call g2, T was deduced as int } @endcode 在上述代码已经表现得比较明白了,这就是要生成四个不同 deref 版本的原因 - 为了配合上述机制,传入的 MemberType 不要有任何 const 修饰符 */ #define SAME_NAME_MEMBER_COMPARATOR_EX(ComparatorName, MemberType, MemberName, ComparePred) / class ComparatorName / { / ComparePred m_comp; / public: / typedef bool result_type; / typedef MemberType key_type; / typedef boost::integral_constant<bool, / febird::HasTriCompare<ComparePred>::value / > has_tri_compare; / / ComparatorName() {} / ComparatorName(const ComparePred& rhs) / : m_comp(rhs) {} / / template<class T>const T&deref(T*x)const{return*x;} / template<class T>const T&deref(T&x)const{return x;} / template<class T>const T&deref(const T*x)const{return*x;}/ template<class T>const T&deref(const T&x)const{return x;}/ / const MemberType operator()(const MemberType x)const{return x;} / template<class T>const MemberType operator()(const T&x)const{return deref(x)MemberName;}/ / template<class Tx, class Ty> / bool operator()(const Tx&x, const Ty&y) const / { / return m_comp((*this)(x),(*this)(y)); / } / template<class Tx, class Ty> / int compare(const Tx&x, const Ty&y) const / { / return m_comp.compare((*this)(x),(*this)(y)); / } / }; //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #endif // #define SAME_NAME_MEMBER_COMPARATOR_EX(ComparatorName, MemberType, MemberName, ComparePred) / // SAME_NAME_MEMBER_COMPARATOR_EX_NO_TRAITS(ComparatorName, MemberType, MemberName, ComparePred)/ // BOOST_TT_AUX_BOOL_TRAIT_SPEC1(HasTriCompare, ComparatorName, HasTriCompare<ComparePred>::value) //! note@ if MemberType must not be a reference, neither const nor non-const #define SAME_NAME_MEMBER_COMPARATOR(ComparatorName, MemberType, MemberName) / SAME_NAME_MEMBER_COMPARATOR_EX(ComparatorName, MemberType, MemberName, std::less<MemberType>) |
用C++写程序经常需要写一些很小的functor,最常见的例子就是 compare functor,排序的,查找的,自己每定义一个数据结构,就要定义一个 compare functor,甚至多个(对不同字段)。甚至,针对指针的,智能指针的……的compare,这件工作很繁琐,很容易使人厌倦。
举个例子,同一个数据结构有M个字段,这些字段有P种类型,还有有N种不同的访问方式(直接提取、通过指针、通过智能指针、甚至通过反序列化等等),要实现所有这些情况的查找/排序,就需要 M×N 个 compare functor 的定义!
从 boost::multi_index 中学到一点,将 KeyExtractor 和 Comparator 分离,这样,只需要写 P 个Comparator,M+N个KeyExtractor,一般情况下,甚至不需要写Comparator,因为字段类型大多是内建类型,Comparator是默认的。举个例子吧:
using namespace std;
using boost::shared_ptr;
//using boost::intrusive_ptr;
struct mydata
{
int d1, d2, d3, d4, d5;
string s1, s2, s3;
};
struct mydata_get_int
{
int offset;
mydata_get_int(int offset) : offset(offset) {}
int operator()(const mydata& x) const
{
return *(int*)(offset + (unsigned char*)&x);
}
// 假定T 就是mydata* 或者智能指针
template<class T>
int operator()(const T& x) const
{
return *(int*)(offset + (unsigned char*)&(*x));
}
};
struct mydata_get_str
{
int offset;
mydata_get_str(int offset) : offset(offset) {}
const string& operator()(const mydata& x) const
{
return *(string*)(offset + (unsigned char*)&x);
}
// 假定T 就是mydata* 或者智能指针
template<class T>
const string& operator()(const T& x) const
{
return *(string*)(offset + (unsigned char*)&(*x));
}
};
class ExtractCompare
{
KeyExtractor m_extractor;
KeyCompare m_comp;
public:
ExtractCompare(const KeyExtractor& ext = KeyExtractor(),
const KeyCompare& comp = KeyCompare())
: m_extractor(ext), m_comp(comp) {}
template<class T1, class T2>
bool operator()(const T1& t1, const T2& t2) const
{
return m_comp(m_extractor(t1), m_extractor(t2));
}
};
int main(int argc, char* argv[])
{
vector<mydata> v1;
vector<mydata*> v2;
vector<shared_ptr<mydata> > v3;
//vector<intrusive_ptr<mydata> > v4;
// …. fill some data to v1, v2, v3
sort(v1.begin(), v1.end(), make_ec(mydata_get_int(FIELD_OFFSET(mydata, d1)), less<int>()));
sort(v2.begin(), v2.end(), make_ec(mydata_get_int(FIELD_OFFSET(mydata, d2)), less<int>()));
sort(v3.begin(), v3.end(), make_ec(mydata_get_str(FIELD_OFFSET(mydata, s3)), less<string>()));
return 0;
}
其中的ExtractCompare和make_ec可以作为公用代码,在多个程序中使用。
使用FIELD_OFFSET,在不降低效率的前提下,避免了代码膨胀。当然,这个例子中因为vector元素类型不同,会生成sort的3个不同版本,但是,如果不使用FIELD_OFFSET,而是直接再写一个extractor,这里会生成sort的4个版本。当然,一般情况下,不会像这样同时使用三个不同类型的vector。
C++0X 问世以后,其中的closure功能或许会使得这种方法显得过时,但是在没有closure的当前编译器上,这种方法还是很实用的。
原以为足够现代的编译器的优化能力很强,看来我是高估了。GCC 没测过,VC 2008 刚刚被证实了。
pair<int,int> x(rand(), rand()), y(rand(), rand());
for (int i = 0; i < 1000; ++i)
{
if (x < y)
x.first += y.second, y.second -= x.second;
}
// pair.< 的定义
template<class _Ty1, class _Ty2> inline
bool operator<(const pair<_Ty1, _Ty2>& _Left,
const pair<_Ty1, _Ty2>& _Right)
{ // test if _Left < _Right for pairs
return (_Left.first < _Right.first ||
!(_Right.first < _Left.first) && _Left.second < _Right.second);
}
; 94 : if (x < y)
003d5 3b 45 ec cmp eax, DWORD PTR _y$[ebp]
003d8 7c 1c jl SHORT $LN71@TestBoolOp
003da 8b 4d ec mov ecx, DWORD PTR _y$[ebp]
003dd 3b 4d e4 cmp ecx, DWORD PTR _x$[ebp]
003e0 7c 08 jl SHORT $LN70@TestBoolOp
003e2 8b 55 e8 mov edx, DWORD PTR _x$[ebp+4]
003e5 3b 55 f0 cmp edx, DWORD PTR _y$[ebp+4]
003e8 7c 0c jl SHORT $LN71@TestBoolOp
$LN70@TestBoolOp:
003ea c7 85 40 ff ff
ff 00 00 00 00 mov DWORD PTR tv84[ebp], 0
003f4 eb 0a jmp SHORT $LN68@TestBoolOp
$LN71@TestBoolOp:
003f6 c7 85 40 ff ff
ff 01 00 00 00 mov DWORD PTR tv84[ebp], 1
$LN68@TestBoolOp:
00400 0f b6 85 40 ff
ff ff movzx eax, BYTE PTR tv84[ebp]
00407 85 c0 test eax, eax
00409 74 12 je SHORT $LN1@TestBoolOp
0040e 03 4d f0 add ecx, DWORD PTR _y$[ebp+4]
00411 89 4d e4 mov DWORD PTR _x$[ebp], ecx
00414 8b 55 f0 mov edx, DWORD PTR _y$[ebp+4]
00417 2b 55 e8 sub edx, DWORD PTR _x$[ebp+4]
0041a 89 55 f0 mov DWORD PTR _y$[ebp+4], edx
$LN1@TestBoolOp:
$LN2@TestBoolOp:
我原乐观地以为编译器的代码不会出现紫色背景的代码,而红色背景的跳转会直接到黄色处,绿色背景的跳转会跳过黄色。是我对编译器要求太高了吗?可是,这样functor在排序、查找等应用中是随处可见的,要浪费多少CPU 啊!真的是硬件水平高了,软件就可以随意挥霍了吗?
与其这样,类似 std::sort 这样的函数还不如不要 inline Compare Functor,直接使用 qsort 算了,还不用那么多的代码膨胀。感觉这样的代码膨胀带来的效益太少了,好像某权威测试对 std::sort 和 qsort 的对简单数据的排序对比说明它仅比 qsort 快 20%~30%。可是在一个典型应用中它带来的代码膨胀恐怕 2000%~3000%都有吧。
现在用习惯了 C++ 的 Template,不用它就感觉不舒服。但是经常一个小规模的 C++ 程序编译出的执行程序就有2~3M,这还是优化过,去掉所有调试信息的。真不知道如何办好!