如果一个任务的执行分多个步骤,有些步骤慢,有些步骤快,如果在处理时间长的步骤上使用更多线程,那么因为队列的缓冲作用,在平均处理时间上,这些步骤就可以大致持平了,从而导致更大的吞吐量。
以前的 Pipeline 完全胜任这样的需求,但是,如果有一个这样的需求,考虑如下例子:
有若干篇文章(百万以上),需要对这些文章进行分析并索引,使用Pipeline,分成以下步骤:
步骤
|
平均耗费时间(每篇文章)
|
顺序
|
注释
|
一,读取文章
|
很快, 5毫秒
|
按文章ID顺序读取
|
|
二,分析文章
|
较慢, 30毫秒
|
处理顺序无所谓
|
|
三,索引文章
|
较快,8毫秒
|
按文章ID顺序建立索引
|
基于效率上的考虑,顺序建索引的速度快得多,实现上也更简单
|
为了平衡速度,在第二步可以使用多个线程,从而减少平均延迟时间,大略计算一下,需要4个线程,理论上整个流水线的平均延迟时间是8毫秒。
但是有个问题,在第二步,因为是多线程执行,会打乱第一步处理交给它的文档顺序,从而,第三步接收到的文档就不是按ID顺序的了。这是否就没办法了,就只能让第二步也使用单线程,而忍受30毫秒的平均延迟时间?
仔细分析一下,第三步收到的文章并不是完全乱序的,这种乱序只是局部的,可以在一个窗口内,对文章进行排序,然后按排过的顺序处理,这其实跟网络通讯中对数据包的排序是一个道理。只是,在这里,无法事先确定窗口大小。
另外还有一个问题,对过短的文章,垃圾文章,在第二步就丢弃了,不进入第三步的处理,从而导致第三步收到的文章ID不是连续的,这样,就不能使用文章ID作为排序依据,因为无法判断文章是被丢弃了,还是被阻塞在第二步。
所以,必须使用其它排序字段,处理起来也比较简单,只需要在第一步产生一个连续ID就可以了。在第三步的排序中,使用堆可以达到最好的性能。代码段如下:
// 省略....................
//
class FEBIRD_DLL_EXPORT PipelineQueueItem
{
public:
unsigned long plserial;
PipelineTask* task;
PipelineQueueItem(unsigned long plserial, PipelineTask* task)
: plserial(plserial)
, task(task)
{}
PipelineQueueItem()
: plserial(0)
, task(0)
{}
};
// 省略....................
//
namespace {
SAME_NAME_MEMBER_COMPARATOR_EX(Compare_plserial_greater, unsigned long,unsigned long, .plserial, std::greater<unsigned long>)
SAME_NAME_MEMBER_COMPARATOR_EX(Compare_plserial_less , unsigned long,unsigned long, .plserial, std::less <unsigned long>)
}
void PipelineStep::serial_step_do_mid(PipelineQueueItem& item)
{
m_out_queue->push(item);
}
void PipelineStep::serial_step_do_last(PipelineQueueItem& item)
{
if (item.task)
m_owner->destroyTask(item.task);
}
void PipelineStep::run_serial_step(int threadno,
void (PipelineStep::*fdo)(PipelineQueueItem&)
)
{
assert(ple_keep == m_pl_enum);
assert(m_threads.size() == 1);
std::vector<PipelineQueueItem> cache;
// unsigned nCached = 0;
m_plserial = 1;
while (isPrevRunning())
{
PipelineQueueItem item;
if (!m_prev->m_out_queue->pop(item, m_owner->m_queue_timeout))
continue;
#ifdef _DEBUG
assert(item.plserial >= m_plserial);
#else
if (item.plserial < m_plserial)
{
std::ostringstream oss;
oss << "fatal at: " << __FILE__ << ":" << __LINE__
<< ", function=" << BOOST_CURRENT_FUNCTION
<< ", item.plserial=" << item.plserial
<< ", m_plserial=" << m_plserial
;
throw std::runtime_error(oss.str());
}
#endif
if (item.plserial == m_plserial)
{Loop:
if (item.task)
process(0, &item);
(this->*fdo)(item);
++m_plserial;
if (!cache.empty() && (item = cache[0]).plserial == m_plserial)
{
std::pop_heap(cache.begin(), cache.end(), Compare_plserial_greater());
cache.pop_back();
goto Loop;
}
}
else // plserial out of order
{
cache.push_back(item);
std::push_heap(cache.begin(), cache.end(), Compare_plserial_greater());
项目地址:http://code.google.com/p/febird
分享到:
相关推荐
Java的多线程和并发特性使得实现这种管道架构变得更加便捷,可以有效地管理和调度多个过滤器的执行。 开源软件是PARC项目的重要属性,意味着其源代码对公众开放,允许任何人查看、使用、修改和分发。开源软件的这种...
1. **性能优化**:Unity2020.3引入了多项性能提升,例如C# Job System的改进,使得多线程处理更高效;IL2CPP的优化,提高了编译后的代码运行速度;以及图形渲染管线的改进,如Scriptable Render Pipeline(SRP)的...
9. **并发与并行测试**:Python的`concurrent.futures`模块或第三方库如`multiprocessing`可以实现多线程或多进程并发测试,以提高整体测试速度。 10. **测试报告**:生成详细的测试报告对于理解和改进测试流程非常...
- **更好的多线程支持**:增强了对多线程环境的支持,包括对上下文的管理,使得多线程渲染更加高效。 - **增强的调试工具**:提供了更强大的调试工具和选项,帮助开发者更快地定位和解决问题。 #### 三、版权与许可...
- **解决方案**:可以采用多线程或多进程编程技术来加速处理过程;还可以考虑将部分计算密集型任务外包给云服务提供商进行处理。 3. **错误检测与修复**:在处理过程中可能会遇到各种错误,比如文件损坏、格式不...
7. **多线程模型**:Tomcat 7改进了线程模型,减少了线程上下文切换,从而提高了整体性能。 8. **扩展性增强**:7.0版本增加了对WebSocket协议的支持,这是HTML5的一部分,允许双向通信,为实时Web应用提供了可能性...
- **Entity Component System (ECS)**:引入了 ECS 架构,这是一种基于组件的设计模式,能够显著提高游戏性能,特别是在大规模多线程处理场景下。 - **High Definition Render Pipeline (HDRP)**:为了满足高端游戏...
在`pipeline-locks-master`这个压缩包中,我们可以期待找到关于如何实现或使用管道锁的代码示例,可能包括C++、Java或其他支持多线程编程的语言。这些示例可能会演示如何在实际的管道或数据处理流水线中应用管道锁,...
- **Job System**:引入的Job System允许开发者将计算任务分解到多个线程,提高CPU利用率。 7. **XR(虚拟现实和增强现实)支持**: - **XR插件架构**:Unity 2020.3改进了对VR和AR设备的支持,提供了新的插件...
通过阅读 Netty 4.1 的源代码,我们可以学习到如何设计高效的网络框架,理解异步事件驱动模型的工作原理,以及如何优化多线程和内存管理。同时,还能了解到各种网络协议的实现细节,这对于提升网络编程技能和构建高...
1. **连接管理**:Jedis提供了一套完善的连接池机制,通过PooledJedisConnectionFactory来创建和管理连接,确保了多线程环境下的高效和安全。连接池可以设置最大连接数、最大空闲连接数、超时时间等参数,优化资源...
而Netty采用的EventLoop(事件循环)模型,通过一个或多个线程处理多个连接的IO事件,极大地减少了线程切换的开销,提高了效率。 其次,Netty的Pipeline(管道)机制也是其高性能的关键。Pipeline允许自定义处理链...
6. **线程模型**:介绍Netty的多线程模型,包括BossGroup、WorkerGroup以及EventLoop的工作原理。 7. **流式API**:讲解如何利用Netty的流式API简化编程,提高代码可读性和可维护性。 8. **心跳与空闲检测**:讨论...
3. **多线程支持**:为了提高游戏性能,Unity3D支持多线程编程。这在处理复杂计算任务时尤为重要,比如路径寻址和大规模单位控制。 #### 四、素材概述 根据描述中的信息,这些素材是在Unity3D 2019-2020版本下开发...
- **多线程支持**:能够处理复杂的多线程应用程序,确保每个线程的正确性和效率。 - **兼容性**:支持多种不同的MIPS架构版本,包括但不限于MIPS I、II、III、IV等。 ### 4. 使用SDE进行开发和调试的步骤 #### 4.1...
- **Ruby 2.0** 引入了垃圾收集(GC)的改进,增加了一对多线程支持,以及对 Fiber 的改进,用于实现轻量级并发。 - **Ruby 2.1** 开始,性能持续提升,引入了更高效的内存管理,并添加了方法引用和惰性枚举。 - **...
7. **线程局部变量**: 引入了线程局部变量,使得在多线程环境下存储局部状态变得更加安全。 Ruby不仅仅是一个编程语言,它还有丰富的生态系统,包括各种框架、库和工具。其中,Redis是一个非常流行的开源、高性能的...
- **脚本改进**:引入了 C# Job System 和 Entity Component System (ECS),这两大特性使得多线程编程变得更加简单高效。 - **编辑器体验**:增强了编辑器的可用性,比如改进了 Prefab 工作流程和场景视图,提升了...
这种设计允许 Netty 在多线程环境下高效地处理并发连接。 7. **Future 和 Promise**: Netty 提供了 Future 和 Promise 对象,用于异步操作的结果处理。它们使得编写非阻塞代码变得简单。 8. **HTTP/2 和 WebSocket...