多线程 Pipeline 的改进
如果一个任务的执行分多个步骤,有些步骤慢,有些步骤快,如果在处理时间长的步骤上使用更多线程,那么因为队列的缓冲作用,在平均处理时间上,这些步骤就可以大致持平了,从而导致更大的吞吐量。
以前的 Pipeline 完全胜任这样的需求,但是,如果有一个这样的需求,考虑如下例子:
有若干篇文章(百万以上),需要对这些文章进行分析并索引,使用Pipeline,分成以下步骤:
步骤 |
平均耗费时间(每篇文章) |
顺序 |
注释 |
一,读取文章 |
很快, 5毫秒 |
按文章ID顺序读取 |
|
二,分析文章 |
较慢, 30毫秒 |
处理顺序无所谓 |
|
三,索引文章 |
较快,8毫秒 |
按文章ID顺序建立索引 |
基于效率上的考虑,顺序建索引的速度快得多,实现上也更简单 |
为了平衡速度,在第二步可以使用多个线程,从而减少平均延迟时间,大略计算一下,需要4个线程,理论上整个流水线的平均延迟时间是8毫秒。
但是有个问题,在第二步,因为是多线程执行,会打乱第一步处理交给它的文档顺序,从而,第三步接收到的文档就不是按ID顺序的了。这是否就没办法了,就只能让第二步也使用单线程,而忍受30毫秒的平均延迟时间?
仔细分析一下,第三步收到的文章并不是完全乱序的,这种乱序只是局部的,可以在一个窗口内,对文章进行排序,然后按排过的顺序处理,这其实跟网络通讯中对数据包的排序是一个道理。只是,在这里,无法事先确定窗口大小。
另外还有一个问题,对过短的文章,垃圾文章,在第二步就丢弃了,不进入第三步的处理,从而导致第三步收到的文章ID不是连续的,这样,就不能使用文章ID作为排序依据,因为无法判断文章是被丢弃了,还是被阻塞在第二步。
所以,必须使用其它排序字段,处理起来也比较简单,只需要在第一步产生一个连续ID就可以了。在第三步的排序中,使用堆可以达到最好的性能。代码段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
// 省略.................... // class FEBIRD_DLL_EXPORT PipelineQueueItem { public: unsigned long plserial; PipelineTask* task; PipelineQueueItem(unsigned long plserial, PipelineTask* task) : plserial(plserial) , task(task) {} PipelineQueueItem() : plserial(0) , task(0) {} }; // 省略.................... // namespace { SAME_NAME_MEMBER_COMPARATOR_EX(Compare_plserial_greater, unsigned long,unsigned long, .plserial, std::greater<unsigned long>) SAME_NAME_MEMBER_COMPARATOR_EX(Compare_plserial_less , unsigned long,unsigned long, .plserial, std::less <unsigned long>) } void PipelineStep::serial_step_do_mid(PipelineQueueItem& item) { m_out_queue->push(item); } void PipelineStep::serial_step_do_last(PipelineQueueItem& item) { if (item.task) m_owner->destroyTask(item.task); } voidPipelineStep::run_serial_step(intthreadno, void(PipelineStep::*fdo)(PipelineQueueItem&) ) { assert(ple_keep==m_pl_enum); assert(m_threads.size()==1); std::vector<PipelineQueueItem>cache; //unsigned nCached = 0; m_plserial=1; while(isPrevRunning()) { PipelineQueueItemitem; if(!m_prev->m_out_queue->pop(item,m_owner->m_queue_timeout)) continue; #ifdef_DEBUG assert(item.plserial>=m_plserial); #else if (item.plserial < m_plserial) { std::ostringstream oss; oss << "fatal at: " << __FILE__ << ":" << __LINE__ << ", function=" << BOOST_CURRENT_FUNCTION << ", item.plserial=" << item.plserial << ", m_plserial=" << m_plserial ; throw std::runtime_error(oss.str()); } #endif if(item.plserial==m_plserial) {Loop: if(item.task) process(0,&item); (this->*fdo)(item); ++m_plserial; if(!cache.empty()&&(item=cache[0]).plserial==m_plserial) { std::pop_heap(cache.begin(),cache.end(),Compare_plserial_greater()); cache.pop_back(); gotoLoop; } } else// plserial out of order { cache.push_back(item); std::push_heap(cache.begin(),cache.end(),Compare_plserial_greater()); } } std::sort(cache.begin(),cache.end(),Compare_plserial_less()); for(std::vector<PipelineQueueItem>::iteratori=cache.begin();i!=cache.end();++i) { if(i->task) process(0,&*i); (this->*fdo)(*i); } // // boost::mutex::scoped_lock lock(*m_owner->getMutex()); // printf("run_serial: nCached=%u/n", nCached); } // 省略.................... |