多音字 搜索词纠错: 匹配简拼(汉字数≥5时才有效)  单字的拼音来自:这里(最全的多音字)...     

当搜索词中有错别字时,搜索引擎会尝试纠错

通过相似拼音纠错

搜索引擎把这些字还原成拼音,用一个拼音相同的已知的搜索词代替。

这是一种众所周知的纠错策略,但是,当输错的字是多音字,特别是有多个这样的错误输入时,所有的搜索引擎都尽量绕开这个问题,或者仅使用最常用的那些音去纠错。 因为要考虑所有可能的拼音组合,在极端情况下会导致指数爆炸! 例如某互联网大厂的实现(枚举多音字全排列)

基于自动机的算法可以完美解决这个指数爆炸问题

  • 这是自动机应用的又一个绝佳范例,作为演示,这个页面只收录了 800万搜索词+词频,数据也不太干净
  • 该算法全部在内存中运行,使用了 293M 内存,这个数据量,如果用传统方法暴力实现,并且达到这个性能,需要 几十G 的内存
  • 暴力方法是 Query 越长越可怕,该算法则是 Query 越长,优势越大
  • 纠错耗时仅供参考(单核虚拟云主机: Xeon E5-2430 2.20GHz + RAM:1G),如果你看到搜索耗时过长,很可能是 mmap 数据被 swap 到了硬盘上,再搜索一次会得到客观的搜索耗时

这个算法也可以用来解决用户输入预测(智能提示)功能

用户只输入Query开头部分,就自动提示出整个Query,例如用户输入举头望,就提示出举头望明月。就像现在各种搜索引擎做的那样。

基于编辑距离的纠错

在已知的搜索词中寻找编辑距离与用户 Query 最小的词,使用我的算法也可以高效解决(还没做演示页面)

通用的 LoserTree

阅读更多关于《通用的 LoserTree》

 – 共有 n 个内部结点,n 个外部结点
 – winner 只用于初始化时计算败者树,算完后即丢弃
 – winner/loser 的第 0 个单元都不是内部结点,不属于树中的一员
 – winner 的第 0 个单元未用
 – m_tree 的第 0 个单元用于保存最终的赢者, 其它单元保存败者

 – 该初始化需要的 n-1 次比较,总的时间复杂度是 O(n)

 – 严蔚敏&吴伟民 的 LoserTree 初始化复杂度是 O(n*log(n)),并且还需要一个 min_key,
   但是他们的初始化不需要额外的 winner 数组

 – 并且,这个实现比 严蔚敏&吴伟民 的 LoserTree 初始化更强壮

其中引用的一些代码比较长,故未贴出

#define LT_iiter_traits typename std::iterator_traits<typename std::iterator_traits<RandIterOfInput>::value_type>

template
< class RandIterOfInput

        , 
class KeyType = LT_iiter_traits::value_type

        , 
bool StableSort = false //!< same Key in various way will output by way order

        , 
class Compare = std::less<KeyType>

        , 
class KeyExtractor = typename boost::mpl::if_c<
                boost::is_same
<KeyType,
                               LT_iiter_traits::value_type
                              
>::value,
                boost::multi_index::identity
<KeyType>,
                MustProvideKeyExtractor
            
>::type

        , CacheLevel HowCache 
= cache_default
        
>
class LoserTree :
    
public CacheStrategy< typename std::iterator_traits<RandIterOfInput>::value_type
                        , KeyType
                        , KeyExtractor
                        , HowCache
                        
>,
    
public MultiWay_SetOP< LT_iiter_traits::value_type
                         , KeyType
                         , LoserTree
<RandIterOfInput, KeyType, StableSort, Compare, KeyExtractor, HowCache>
                         
>
{
    DECLARE_NONE_COPYABLE_CLASS(LoserTree)

    typedef CacheStrategy
< typename std::iterator_traits<RandIterOfInput>::value_type
                        , KeyType
                        , KeyExtractor
                        , HowCache
                        
>
    super;

    friend 
class MultiWay_SetOP< LT_iiter_traits::value_type
                         , KeyType
                         , LoserTree
<RandIterOfInput, KeyType, StableSort, Compare, KeyExtractor, HowCache>
                         
>;
public:
    typedef typename std::iterator_traits
<RandIterOfInput>::value_type way_iter_t;
    typedef typename std::iterator_traits
<way_iter_t     >::value_type value_type;
    typedef KeyType  key_type;
    typedef KeyExtractor key_extractor;
    typedef boost::integral_constant
<bool, StableSort> is_stable_sort;

    typedef typename super::cache_category  cache_category;
    typedef typename super::cache_item_type cache_item_type;

public:
    
/**
     @brief construct

     @par 图示如下:
     @code

RandIterOfInput                                          this is guard value
      ||                                                          ||     
      ||                                                          ||     
      /                                                          /      
     first–> 0 way_iter_t [min_value…………………….max_value] 
       /      1 way_iter_t [min_value…………………….max_value] <— 每个序列均已
       |      2 way_iter_t [min_value…………………….max_value]      按 comp 排序
       |      3 way_iter_t [min_value…………………….max_value]
      <       4 way_iter_t [min_value…………………….max_value]
       |      5 way_iter_t [min_value…………………….max_value]
       |      7 way_iter_t [min_value…………………….max_value]
             8 way_iter_t [min_value…………………….max_value]
     last—> end

     @endcode

     @param comp    value 的比较器

     @note 每个序列最后必须要有一个 max_value 作为序列结束标志,否则会导致未定义行为
     
*/

    LoserTree(RandIterOfInput first, RandIterOfInput last,
              
const KeyType& max_key,
              
const Compare& comp = Compare(),
              
const KeyExtractor& keyExtractor = KeyExtractor())
    
{
        init(first, last, max_key, comp, keyExtractor);
    }

    LoserTree(RandIterOfInput first, 
int length,
              
const KeyType& max_key,
              
const Compare& comp = Compare(),
              
const KeyExtractor& keyExtractor = KeyExtractor())
    
{
        init(first, first 
+ length, max_key, comp, keyExtractor);
    }


//     LoserTree(RandIterOfInput first, RandIterOfInput last,
//               const cache_item_type& min_item,
//               const cache_item_type& max_item,
//               const KeyType& max_key,
//               const Compare& comp = Compare(),
//               const KeyExtractor& keyExtractor = KeyExtractor())
//     {
//         init_yan_wu(first, last, min_item, max_item, comp, keyExtractor);
//     }
//     LoserTree(RandIterOfInput first, int length,
//               const cache_item_type& min_item, // yan_wu init need
//               const cache_item_type& max_item,
//               const KeyType& max_key,
//               const Compare& comp = Compare(),
//               const KeyExtractor& keyExtractor = KeyExtractor())
//     {
//         init_yan_wu(first, first + length, min_item, max_item, comp, keyExtractor);
//     }

    LoserTree()
    
{
    }


    
/**
     @brief 初始化
      
    – 共有 n 个内部结点,n 个外部结点
    – winner 只用于初始化时计算败者树,算完后即丢弃
    – winner/loser 的第 0 个单元都不是内部结点,不属于树中的一员
    – winner 的第 0 个单元未用
    – m_tree 的第 0 个单元用于保存最终的赢者, 其它单元保存败者

    – 该初始化需要的 n-1 次比较,总的时间复杂度是 O(n)

    – 严蔚敏&吴伟民 的 LoserTree 初始化复杂度是 O(n*log(n)),并且还需要一个 min_key,
      但是他们的初始化不需要额外的 winner 数组

    – 并且,这个实现比 严蔚敏&吴伟民 的 LoserTree 初始化更强壮
     
*/

    
void init(RandIterOfInput first, RandIterOfInput last,
              
const KeyType& max_key,
              
const Compare& comp = Compare(),
              
const KeyExtractor& keyExtractor = KeyExtractor())
    
{
        m_comp 
= comp;
        m_key_extractor 
= keyExtractor;

        m_beg 
= first;
        m_end 
= last;

        m_max_key 
= max_key;

        
int len = int(last  first);
        
if (0 == len)
        
{
            
throw std::logic_error("LoserTree: way sequence must not be empty");
        }


        m_tree.resize(len);

        
this->resize_cache(len);

        
int i;
        
for (i = 0; i != len; ++i)
        
{
            
// read first value from every sequence
            this->input_cache_item(i, *(first+i));
        }

        
if (1 == len)
        
{
            m_tree[
0= 0;
            
return;
        }


        
int minInnerToEx = len / 2;

        std::vector
<int> winner(len);

        
for (i = len  1; i > minInnerToEx; i)
        
{
            exter_loser_winner(m_tree[i], winner[i], i, len);
        }

        
int left, right;
        
if (len & 1// odd
        // left child is last inner node, right child is first external node
            left = winner[len1];
            right 
= 0;
        }

        
else
        
{
            left 
= 0;
            right 
= 1;
        }

        get_loser_winner(m_tree[minInnerToEx], winner[minInnerToEx], left, right);

        
for (i = minInnerToEx; i > 0; i /= 2)
        
{
            
for (int j = i1; j >= i/2j)
            
{
                inner_loser_winner(m_tree[j], winner[j], j, winner);
            }

        }

        m_tree[
0= winner[1];
    }


    
//! 严蔚敏&吴伟民 的 LoserTree 初始化
    
//! 复杂度是 O(n*log(n)),并且还需要一个 min_key
    void init_yan_wu(RandIterOfInput first, RandIterOfInput last,
                     
const cache_item_type& min_item,
                     
const cache_item_type& max_item,
                     
const Compare& comp = Compare(),
                     
const KeyExtractor& keyExtractor = KeyExtractor())
    
{
        
//! this function do not support cache_none
        BOOST_STATIC_ASSERT(HowCache != cache_none);

        assert(first 
< last); // ensure that will not construct empty loser tree

        m_comp 
= comp;
        m_key_extractor 
= keyExtractor;

        m_beg 
= first;
        m_end 
= last;

        m_max_key 
= this->key_from_cache_item(max_item);

        
int len = int(last  first);
        m_tree.resize(len);

        
this->resize_cache(len+1);
        
this->set_cache_item(len, min_item);

        
int i;
        
for (i = 0; i != len; ++i)
        
{
            m_tree[i] 
= len;

            
// read first value from every sequence
            this->input_cache_item(i, *(first+i));
        }

        
for (i = len1; i >= 0i)
            ajust(i);

        
// 防止 cache 的最后一个成员上升到 top ??…..
        
//
        this->set_cache_item(len, max_item);

    
//    assert(!m_tree.empty());
    
//    if (m_tree[0] == len)
    
//        ajust(len); // 会导致在 ajust 中 m_tree[parent] 越界
    }


    
const value_type& current_value() const
    
{
        assert(
!m_tree.empty());
    
//    assert(!is_end()); // 允许访问末尾的 guardValue, 便于简化 app
        return current_value_aux(cache_category());
    }


    
/**
     @brief return current way NO.
     
*/

    
int current_way() const
    
{
        assert(
!m_tree.empty());
        assert(
!is_end());
        
return m_tree[0];
    }


    size_t total_ways() 
const
    
{
        
return m_tree.size();
    }


    
bool is_any_way_end() const
    
{
        
return is_end();
    }


    
bool is_end() const
    
{
        assert(
!m_tree.empty());
        
const KeyType& cur_key = get_cache_key(m_tree[0], cache_category());
        
return !m_comp(cur_key, m_max_key); // cur_key >= max_value
    }


    
void increment()
    
{
        assert(
!m_tree.empty());
        assert(
!is_end());
        
int top = m_tree[0];
        input_cache_item(top, 
++*(m_beg + top));
        ajust(top);
    }


    
void ajust_for_update_top()
    
{
        assert(
!m_tree.empty());
        
int top = m_tree[0];
        input_cache_item(top, 
*(m_beg + top));
        ajust(top);
    }


    way_iter_t
& top()
    
{
        assert(
!m_tree.empty());
        
return *(m_beg + m_tree[0]);
    }


    
void reserve(int maxTreeSize)
    
{
        m_tree.reserve(maxTreeSize);
        resize_cache(maxTreeSize);
    }


protected:
    
void ajust(int s)
    
{
        
int parent = int(s + m_tree.size()) >> 1;
        
while (parent > 0)
        
{
            
if (comp_cache_item(m_tree[parent], s, cache_category(), is_stable_sort()))
            
{
                std::swap(s, m_tree[parent]);
            }

            parent 
>>= 1;
        }

        m_tree[
0= s;
    }


    
void exter_loser_winner(int& loser, int& winner, int parent, int len) const
    
{
        
int left  = 2 * parent  len;
        
int right = left + 1;
        get_loser_winner(loser, winner, left, right);
    }

    
void inner_loser_winner(int& loser, int& winner, int parent, const std::vector<int>& winner_vec) const
    
{
        
int left  = 2 * parent;
        
int right = 2 * parent + 1;
        left 
= winner_vec[left];
        right 
= winner_vec[right];
        get_loser_winner(loser, winner, left, right);
    }

    
void get_loser_winner(int& loser, int& winner, int left, int right) const
    
{
        
if (comp_cache_item(left, right, cache_category(), is_stable_sort()))
        
{
            loser  
= right;
            winner 
= left;
        }

        
else
        
{
            loser 
= left;
            winner 
= right;
        }

    }


    
const value_type& current_value_aux(tag_cache_none) const
    
{
        assert(m_tree[
0< int(m_tree.size()));
        
return **(m_beg + m_tree[0]);
    }

    
const value_type& current_value_aux(tag_cache_key) const
    
{
        assert(m_tree[
0< int(m_tree.size()));
        
return **(m_beg + m_tree[0]);
    }

    
const value_type& current_value_aux(tag_cache_value) const
    
{
        assert(m_tree[
0< int(m_tree.size()));
        
return this->m_cache[m_tree[0]];
    }


    
using super::get_cache_key;
    inline 
const KeyType get_cache_key(int nth, tag_cache_none) const
    
{
        
return this->m_key_extractor(**(m_beg + nth));
    }


    template
<class CacheCategory>
    inline 
bool comp_cache_item(int x, int y,
                                CacheCategory cache_tag,
                                boost::true_type isStableSort) 
const
    
{
        
return comp_key_stable(x, y,
                get_cache_key(x, cache_tag),
                get_cache_key(y, cache_tag),
                typename HasTriCompare
<Compare>::type());
    }


    
bool comp_key_stable(int x, int y, const KeyType& kx, const KeyType& ky,
                         boost::true_type hasTriCompare) 
const
    
{
        
int ret = m_comp.compare(kx, ky);
        
if (ret < 0)
            
return true;
        
if (ret > 0)
            
return false;
        ret 
= m_comp.compare(kx, m_max_key);
        assert(ret 
<= 0);
        
if (0 == ret)
            
return false;
        
else
            
return x < y;
    }

    
bool comp_key_stable(int x, int y, const KeyType& kx, const KeyType& ky,
                         boost::false_type hasTriCompare) 
const
    
{
        
if (m_comp(kx, ky))
            
return true;
        
if (m_comp(ky, kx))
            
return false;
        
if (!m_comp(kx, m_max_key)) // kx >= max_key –> kx == max_key
        // max_key is the max, so must assert this:
            assert(!m_comp(m_max_key, kx));
            
return false;
        }

        
else return x < y;
    }


    template
<class CacheCategory>
    inline 
bool comp_cache_item(int x, int y,
                                CacheCategory cache_tag,
                                boost::false_type isStableSort) 
const
    
{
        
return m_comp(get_cache_key(x, cache_tag), get_cache_key(y, cache_tag));
    }


protected:
    KeyType  m_max_key;
    std::vector
<int> m_tree;
    RandIterOfInput  m_beg;
    RandIterOfInput  m_end;
    Compare          m_comp;
}
;

使用示例:

 

// test_multi_way.cpp : Defines the entry point for the console application.
//

#include 
"stdafx.h"

using namespace std;
using namespace febird;
//using namespace febird::prefix_zip;
using namespace febird::multi_way;

template
<class _Cont>
void printResult(const char* title, const _Cont& result)
{
    cout 
<< title << ": ";
    
for (typename _Cont::const_iterator i = result.begin(); i != result.end(); ++i)
        cout 
<< *<< ",";
    cout 
<< endl;
}


template
<class _Cont>
void printKeyValue(const char* title, const _Cont& result)
{
    cout 
<< title << ": ";
    
for (typename _Cont::const_iterator i = result.begin(); i != result.end(); ++i)
        cout 
<< "(" << result.key(i) << "," << *<< "),";
    cout 
<< endl;
}


template
<class _Cont>
void printPairCont(const char* title, const _Cont& result)
{
    cout 
<< title << ": ";
    
for (typename _Cont::const_iterator i = result.begin(); i != result.end(); ++i)
        cout 
<< "(" << i->first << "," << i->second << "),";
    cout 
<< endl;
}


int main(int argc, char* argv[])
{
//    cout << setw(5) << setiosflags(ios::right) << 100 << setw(20) << setiosflags(ios::left) << "abcd" << endl;
//    cout << setw(5) << setiosflags(ios::right) << 100 << setw(20) << left << "abcd" << endl;

    
int ivals[][11=
    
{
        
{182031475475829399, INT_MAX},
        
{172030485376819598, INT_MAX},
        
{361720354249739091, INT_MAX},
        
{241920465173889697, INT_MAX},
        
{241520465173889697, INT_MAX},
    }
;
    vector
<int> intersect;
    vector
<int> unionvec;
    vector
<int*> ilower, iupper;
    
for (int i = 0; i < 5++i)    ilower.push_back(ivals[i]);
    
for (int i = 0; i < 5++i)    iupper.push_back(ivals[i] + 10);
    LoserTree
<vector<int*>::iterator> loserTree(ilower.begin(), ilower.end(), INT_MAX);
    loserTree.intersection(back_inserter(intersect));

    
for (int i = 0; i < 5++i)    ilower.push_back(ivals[i]);
    loserTree.init(ilower.begin(), ilower.end(), INT_MAX);
    loserTree.union_set(back_inserter(unionvec));

    vector
<int> v1;
    copy(
&ivals[0][0], &ivals[5][0], back_inserter(v1));
    printResult(
"all_values", v1);
    printResult(
"intersection_result", intersect);
    printResult(
"union_result", unionvec);

    vector
<pair<int*int*> > range, range2;
    
for (int i = 0; i < 5++i)
    
{
        range.push_back(make_pair(ivals[i], ivals[i] 
+ 10));
    }

    intersect.clear();
    range2 
= range;
    HeapMultiWay
<vector<pair<int*int*> >::iterator> heap(range.begin(), range.end());

    heap.intersection(back_inserter(intersect));
    printResult(
"intersection_result2", intersect);

    range 
= range2;
    
{
        vector
<int> copyset;
        heap.init(range.begin(), range.end());
        heap.copy_if2(back_inserter(copyset), MultiWay_CopyAtLeastDup(
3));
        printResult(
"MultiWay_CopyAtLeastDup(3)", copyset);
    }


    range 
= range2;
    
{
        map
<intint> counting;
        heap.init(range.begin(), range.end());
        heap.copy_if2((
int*)(0), MultiWay_GetCountTable(counting, 2));
        printPairCont(
"MultiWay_GetCountMap", counting);
    }


    range 
= range2;
    
{
        vector
<pair<intint> > counting;
        heap.init(range.begin(), range.end());
        heap.copy_if2((
int*)(0), MultiWay_GetCountSequence(counting, 2));
        printPairCont(
"MultiWay_GetCountSequence", counting);
    }


    range 
= range2;
    
{
    
//    MultiWayTable<int, int> counting(16);
        map<intint> counting;
        heap.init(range.begin(), range.end());
        heap.copy_if2((
int*)(0), MultiWay_GetCountTable(counting, 2));
    
//    printKeyValue("MultiWay_GetCountTable", counting);
        printPairCont("MultiWay_GetCountTable", counting);
    }


    range 
= range2;
    
{
    
//    PackedTable<int, int> counting;
        map<intint> counting;
        heap.init(range.begin(), range.end());
        heap.copy_if2((
int*)(0), MultiWay_GetCountTable(counting, 2));
    
//    printKeyValue("MultiWay_GetCountTable", counting);
        printPairCont("MultiWay_GetCountTable", counting);
    }


    
return 0;
}


 

 

一个很强大的Comparator生成器

阅读更多关于《一个很强大的Comparator生成器》

 

Comparator 将 M×N 转化成 M+N

阅读更多关于《Comparator 将 M×N 转化成 M+N》

用C++写程序经常需要写一些很小的functor,最常见的例子就是 compare functor,排序的,查找的,自己每定义一个数据结构,就要定义一个 compare functor,甚至多个(对不同字段)。甚至,针对指针的,智能指针的……的compare,这件工作很繁琐,很容易使人厌倦。

举个例子,同一个数据结构有M个字段,这些字段有P种类型,还有有N种不同的访问方式(直接提取、通过指针、通过智能指针、甚至通过反序列化等等),要实现所有这些情况的查找/排序,就需要 M×N 个 compare functor 的定义!

从 boost::multi_index 中学到一点,将 KeyExtractor 和 Comparator 分离,这样,只需要写 P 个Comparator,M+N个KeyExtractor,一般情况下,甚至不需要写Comparator,因为字段类型大多是内建类型,Comparator是默认的。举个例子吧:


using namespace std;

using boost::shared_ptr;

//using boost::intrusive_ptr;

 

struct mydata

{

   int d1, d2, d3, d4, d5;

   string s1, s2, s3;

};

 


struct mydata_get_int

{

   int offset;

 

   mydata_get_int(int offset) : offset(offset) {}

 

   int operator()(const mydata& x) const

   {

      return *(int*)(offset + (unsigned char*)&x);

   }

   // 假定T 就是mydata* 或者智能指针

   template<class T>

   int operator()(const T& x) const

   {

      return *(int*)(offset + (unsigned char*)&(*x));

   }

};

struct mydata_get_str

{

   int offset;

 

   mydata_get_str(int offset) : offset(offset) {}

 

   const string& operator()(const mydata& x) const

   {

      return *(string*)(offset + (unsigned char*)&x);

   }

   // 假定T 就是mydata* 或者智能指针

   template<class T>

   const string& operator()(const T& x) const

   {

      return *(string*)(offset + (unsigned char*)&(*x));

   }

};

 

 

 

class ExtractCompare

{

   KeyExtractor m_extractor;

   KeyCompare   m_comp;

 

public:

   ExtractCompare(const KeyExtractor& ext = KeyExtractor(),

                const KeyCompare& comp = KeyCompare())

      : m_extractor(ext), m_comp(comp) {}

 

   template<class T1, class T2>

   bool operator()(const T1& t1, const T2& t2) const

   {

      return m_comp(m_extractor(t1), m_extractor(t2));

   }

};

template<class KeyExtractor, class KeyCompare>
ExtractCompare<KeyExtractor, KeyCompare>
make_ec(const KeyExtractor& kex, const KeyCompare& comp)
{
   return ExtractCompare<KeyExtractor, KeyCompare>(kex, comp);
}
 

 


int main(int argc, char* argv[])

{

   vector<mydata> v1;

   vector<mydata*> v2;

   vector<shared_ptr<mydata> > v3;

   //vector<intrusive_ptr<mydata> > v4;

 

   // …. fill some data to v1, v2, v3

 

   sort(v1.begin(), v1.end(), make_ec(mydata_get_int(FIELD_OFFSET(mydata, d1)), less<int>()));


   sort(v2.begin(), v2.end(), make_ec(mydata_get_int(FIELD_OFFSET(mydata, d2)), less<int>()));

   sort(v1.begin(), v1.end(), make_ec(mydata_get_int(FIELD_OFFSET(mydata, d3)), less<int>()));

  

   sort(v3.begin(), v3.end(), make_ec(mydata_get_str(FIELD_OFFSET(mydata, s3)), less<string>()));

 

   return 0;

}

 

其中的ExtractCompare和make_ec可以作为公用代码,在多个程序中使用。

使用FIELD_OFFSET,在不降低效率的前提下,避免了代码膨胀。当然,这个例子中因为vector元素类型不同,会生成sort的3个不同版本,但是,如果不使用FIELD_OFFSET,而是直接再写一个extractor,这里会生成sort的4个版本。当然,一般情况下,不会像这样同时使用三个不同类型的vector。

C++0X 问世以后,其中的closure功能或许会使得这种方法显得过时,但是在没有closure的当前编译器上,这种方法还是很实用的。

C++ 中 Bool functor 的优化

阅读更多关于《C++ 中 Bool functor 的优化》

原以为足够现代的编译器的优化能力很强,看来我是高估了。GCC 没测过,VC 2008 刚刚被证实了。

    pair<int,int> x(rand(), rand()), y(rand(), rand());

   for (int i = 0; i < 1000; ++i)

   {

      if (x < y)

         x.first += y.second, y.second -= x.second;

   }

 

// pair.< 的定义

template<class _Ty1, class _Ty2> inline

bool operator<(const pair<_Ty1, _Ty2>& _Left,

               const pair<_Ty1, _Ty2>& _Right)

{  // test if _Left < _Right for pairs

   return (_Left.first < _Right.first ||

      !(_Right.first < _Left.first) && _Left.second < _Right.second);

}

主要是看其中的 if (x < y) 会被优化成什么样子。我原估计的是会在 inline 展开之后,将函数的返回值直接映射到true/false分支,没想到让我大失所望:

 

; 93   :  {
; 94   :   if (x < y)

  003d2 8b 45 e4  mov  eax, DWORD PTR _x$[ebp]
  003d5 3b 45 ec  cmp  eax, DWORD PTR _y$[ebp]
  003d8 7c 1c   jl  SHORT $LN71@TestBoolOp
  003da 8b 4d ec  mov  ecx, DWORD PTR _y$[ebp]
  003dd 3b 4d e4  cmp  ecx, DWORD PTR _x$[ebp]
  003e0 7c 08   jl  SHORT $LN70@TestBoolOp
  003e2 8b 55 e8  mov  edx, DWORD PTR _x$[ebp+4]
  003e5 3b 55 f0  cmp  edx, DWORD PTR _y$[ebp+4]
  003e8 7c 0c   jl  SHORT $LN71@TestBoolOp
$LN70@TestBoolOp:
  003ea c7 85 40 ff ff
 ff 00 00 00 00  mov  DWORD PTR tv84[ebp], 0
  003f4 eb 0a   jmp  SHORT
$LN68@TestBoolOp
$LN71@TestBoolOp:
  003f6 c7 85 40 ff ff
 ff 01 00 00 00  mov  DWORD PTR tv84[ebp], 1
$LN68@TestBoolOp:
  00400 0f b6 85 40 ff
 ff ff   movzx  eax, BYTE PTR tv84[ebp]
  00407 85 c0   test  eax, eax
  00409 74 12   je  SHORT
$LN1@TestBoolOp

; 95   :    x.first += y.second, y.second -= x.second;

  0040b 8b 4d e4  mov  ecx, DWORD PTR _x$[ebp]
  0040e 03 4d f0  add  ecx, DWORD PTR _y$[ebp+4]
  00411 89 4d e4  mov  DWORD PTR _x$[ebp], ecx
  00414 8b 55 f0  mov  edx, DWORD PTR _y$[ebp+4]
  00417 2b 55 e8  sub  edx, DWORD PTR _x$[ebp+4]
  0041a 89 55 f0  mov  DWORD PTR _y$[ebp+4], edx
$LN1@TestBoolOp:

; 96   :  }

  0041d eb a1   jmp  SHORT $LN3@TestBoolOp
$LN2@TestBoolOp:

我原乐观地以为编译器的代码不会出现紫色背景的代码,而红色背景的跳转会直接到黄色处,绿色背景的跳转会跳过黄色。是我对编译器要求太高了吗?可是,这样functor在排序、查找等应用中是随处可见的,要浪费多少CPU 啊!真的是硬件水平高了,软件就可以随意挥霍了吗?

与其这样,类似 std::sort 这样的函数还不如不要 inline Compare Functor,直接使用 qsort 算了,还不用那么多的代码膨胀。感觉这样的代码膨胀带来的效益太少了,好像某权威测试对 std::sort 和 qsort  的对简单数据的排序对比说明它仅比 qsort 快 20%~30%。可是在一个典型应用中它带来的代码膨胀恐怕 2000%~3000%都有吧。

现在用习惯了 C++ 的 Template,不用它就感觉不舒服。但是经常一个小规模的 C++ 程序编译出的执行程序就有2~3M,这还是优化过,去掉所有调试信息的。真不知道如何办好!

数据库集合查询的优化

阅读更多关于《数据库集合查询的优化》

有一个应用,需要经常做类似这样的查询 select * from SomeTable where key in (KeySet) ,其中 KeySet 可能很大,比如包含几百甚至几千个元素。理想中的情况,数据库应该先在 BTree 中查找到 KeySet 中的 Key 所在的物理页面地址,然后再对这物理地址排序,最后按顺序读入这些页面内容并填充结果。如果这样做,那么在最坏情况下,KeySet 中元素的逻辑排序完全不等于其物理顺序,并且,每个Key所在的页面还不在相同的磁盘柱面上,这样,查询集合中所有 key 所花的磁盘时间就等于 Key 的数目乘以磁盘的“平均潜伏时间”(Average Latency)再加上“柱面切换时间”(Cylinder Switch Time) 和 传输时间(Transfer Time)

 T = KeySet.size * (al + cst + tt)

其中 al = Average Latency, cst = Cylinder Switch Time,实际上我这里说的 cst 比Cylinder Switch Time要大一些,因为柱面不一定相邻,中间可能像个几十个甚至上百的柱面,但这个时间跟cst应该比较接近。

这里假定内部处理所花的时间基本上可以忽略,在一般情况下也的确如此。一般情况下一个物理页面很小,如8K到16K。

对于高速的每分钟15000转的服务器硬盘,Average Seek Time 是8ms,连续传输速度是 200M,al=2ms,cst=1ms,使用16K的页面尺寸,则 tt = 0.025ms,因此可以先忽略tt。

如果KeySet中有4000个元素,并且BTree的所有内部节点(索引结点)都已经缓存,那么一次这样的查询需要 4000*(2ms + 1ms) = 12 秒!

而如果每查找一个key都启动一次在整个 BTree 上的查找,也假定BTree的所有内部节点(索引结点)都已经缓存,那么这个时间就是:4000*(al+ast) = 4000 * 10ms = 40秒

虽然经过优化,这个优化版的时间仍然很长,但是,这是查询4000个Key的最坏情况,很可能这些Key有一些局部性,比如他们只位于200个不同的柱面上(平均每个柱面20个key),这是非常可能的。这样,这个时间就缩短到了0.6秒,如果我们把数据库进行集群,比如10个服务器结点,那么这个数据可以缩短到0.06秒,这个时间就基本上可以接受了。对于传统方法,在这种情况下是0.2秒。

我在一个项目中碰到这样的瓶颈(数据量大约2T,使用磁盘阵列),找不到支持这种优化的系统。使用BDB(BerkeleyDB),最多也只能减少数据拷贝和网络传输的时间,这样的一个查询经常需要40秒以上的时间。让我难以忍受,因为事先根据我的分析和计算,Key是有一定局部性的,这个时间应该在2秒以内,再不行也应该在5秒内。最后我自己写了一个只读的BTree索引系统(完整的BTree处理太复杂),使用了这种优化,查询时间一下子缩短到了1秒以内,最后使用了10个分开的阵列,时间缩短到了0.1秒,基本可以满足需求。这个效果比我预先估计的要快很多,因为基于BerkeleyDB的效果,我猜想可能也就比BerkeleyDB快4倍的时间。

不过这个结果也证实了我之前的猜想:数据有一定的局部性,虽然KeySet中有4000个Key,但是这些Key中有很多是相邻的。并且,BerkeleyDB的实现可能也没有我想象的好,也有可能是其它原因,比如可能每次查询的间隔时间较大,使得即使数据相邻,也需要重新调度磁头等等。

如果随着技术的发展,磁盘最终被其它介质(如Flash)代替,这种方法是不是就没了用武之地?

munmap 注意事项

阅读更多关于《munmap 注意事项》

linux/unix 下,或者说posix 的munmap,很简单,只有两个参数:

int munmap(void *start, size_t length);

其中 length 必须是 mmap 时的 length,如果小于当初 mmap 时的那个 length,并且正好少的部分跨越了一个page,那就麻烦了,我就犯了这个错误,非常严重的后果!内存泄漏,不是泄露了刚好少 unmap 的那个 page,而是整个 [start, length) 区域都不会成功被 unmap,也许内存中的更改已经写入文件,但是虚拟内存空间[start,length)未被释放!如此多次,会造成 ENOMEM!

感谢上帝,搞了半天,这个问题终于被发现了。

感觉吧,Windows 的很简单 UnmapViewOfFile 就一个参数,就是 MapViewOfFile 返回的那个地址,UnmapViewOfFile 时整个 map 区域都被释放,而 posix 的 munmap,从理论上讲,可以一次 mmap 一大块区域,然后多次 unmap 这个区域中的不同部分,这的确提高了一些灵活性,但是……。

 

腾讯 QQ 真的不是一般的烂!

阅读更多关于《腾讯 QQ 真的不是一般的烂!》

qq 目录下有几个微软的动态库(见图):

如果删掉,按照常规,会加载系统最新的库,但是,当我把这几个文件中的任意一个删除的时候,qq 都无法正常运行。

做软件做到这个份上,不如去死了算了!

 

gcc 一个恶心的 bug

阅读更多关于《gcc 一个恶心的 bug》

我的测试项目下面有 4 个 .cpp 文件, 测试我写的模版(在另一些 .h 中)。

测试的编译选项主要是 -g3 -O0 ,无优化

当我改变了模版头文件,因为用到的测试代码在 main.cpp 中(包含了模版头文件),我把 main.o 删掉,重新编译,结果模版头文件的修改就是不能生效,跟没改一样,搞了很多次,都是不对,我一直以为是自己的代码有问题。在我快要疯掉时,我 make clean 所有 .o 删掉,再编译,竟然好了!

我操!我又没有使用预编译头,所有测试代码都在 main.cpp 中,它竟然把 main.cpp 代码编译到了其它 .o 中!

千万注意,不要 hack std::string

阅读更多关于《千万注意,不要 hack std::string》

前段时间被一个bug折磨了两个星期,最后发现竟然是如此一个陷阱——我为了减少内存用量并且减少一次内存拷贝,直接通过string.data()修改了string的内部表示。这与其说是一个陷阱,不如说是我自己给自己造了一个陷阱然后把自己给掉进去了。发病机制可以用如下代码简单的勾画出来:

using namespace std;

 

int main(int argc, char* argv[])

{

    string str1 = "abcde";

    string str2 = str1;

 

    strcpy(const_cast<char*>(str2.data()), "1234");

 

    cout << "str1=" << str1 << endl

         << "str2=" << str2 << endl;

 

    return 0;

}

 

在windows+msvc 中的输出是:

str1=abcde
str2=1234

在linux+gcc中的输出是:

str1=1234
str2=1234

在boost::serialization中,对string的load也是采用这样的hack方式,目的也是为了减少内存用量并且减少一次内存拷贝。使用boost::serialization的同志们需要注意,不要掉进这个陷阱!

我们可以看出,在msvc中,string拷贝时是真拷贝,而在gcc中,必定是用了引用计数+copy on write。str1和str2内部引用的是同一块内存。因为string.data()和string.c_str()都是const成员,所以不会有copy,只会增加引用计数。所以导致修改str2实际上也修改了str1。

c++标准甚至允许把const string的成员放入带写保护的内存区域中,或者把string的成员实际上存储在不相邻的内存块中,而仅在调用 string.data() 或 string.c_str() 时将数据拷贝到一块临时内存中然后返回,这块临时内存将在下一次调用string的一个非const成员函数时释放,如果目标平台真这样实现,往 string.data()中写数据就会导致更加微妙的错误。

 

前缀压缩词典

阅读更多关于《前缀压缩词典》

包含多个固定索引,一个可变索引,

固定索引使用一个内存池和一个数组保存项目在内存中的偏移,并且使用前缀压缩,使用空间最小(每个词条4个字节的索引空间)  

可变索引不压缩,并且可以动态插入词条,占用空间较大(每个词条20个字节的索引空间)

存储 1000 万个词,占用内存 100M 左右,平均每个词10个字节(包括了字符串空间和索引空间)。

接口采用 stl 容器的风格