- 浏览: 254234 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
satan_1st:
据说可以用(*this)取得当前的lambda。
Recursive Lambda in C++ -
febird:
微软不死, 天理难容
再抱怨一下Windows7 -
sunzixun:
很有见解ace也可以精简的,我们就用阉割版的
asio/ACE/apr/libevent -
febird:
hamo2008 写道用win7以来基本上没有遇到这种情况了我 ...
造 windows 的微软很脑残 -
hamo2008:
用win7以来基本上没有遇到这种情况了
造 windows 的微软很脑残
一个简单例子:
有很多个html网页,网页的id、title、url、path等信息存在一个数据库表中,网页内容存储在一个磁盘阵列上。现在要把所有网页都读出来,统计其中的html标签、正文等信息,并写入另一个数据库表,怎样的设计最好呢?
一般的想法是使用多个平行的线程,每个线程处理某个ID范围的网页。但是仔细分析就可以发现,对每个网页的处理可以分为以下处理步骤:
- 读取数据库行
- 读取文件内容
- 解析html,生成统计数据
- 将统计结果写入数据库
这几个处理步骤有各自的特征,读取数据库的时间一般主要消耗在数据库服务器响应,读取文件内容一般主要消耗在磁盘IO上,解析、统计消耗在计算上,写统计结果也消耗在数据库服务器响应上。如果我们为这几个过程建立各自的线程,每个任务通过消息队列来传递。就得到如下设计:
在这个设计中,每个处理过程可以根据需要设置不同的线程数,这个例子中,数据库不会是瓶颈,只剩下读文件和计算,如果文件IO够快(如果网页存在不同的阵列上),那么可以增加计算线程(服务器一般都是多CPU的)来达到平衡。
一些例子或许还会有更多的处理步骤。
可以从中得出一个设计模式,甚至可以直接写出实现框架的类:
pipeline.h
/* vim: set tabstop=4 : */
#ifndef __febird_pipeline_h__
#define __febird_pipeline_h__
#if defined(_MSC_VER) && (_MSC_VER >= 1020)
# pragma once
# pragma warning(push)
# pragma warning(disable: 4018)
# pragma warning(disable: 4267)
#endif
#include <vector>
#include <queue>
#include <string>
#include <boost/thread.hpp>
#include "../thread/ConcurrentQueue.h"
//#include "../thread/LockSentry.h"
//#include "../thread/thread.h"
namespace febird { namespace thread {
class PipelineTask
{
public:
virtual ~PipelineTask();
};
class PipelineMultiTask : public PipelineTask
{
public:
// PipelineMultiTask(size_t size = 10);
std::vector<PipelineTask*> m_tasks;
virtual ~PipelineMultiTask();
};
class PipelineStep;
class PipelineThread;
class PipelineProcessor;
class PipelineStep
{
friend class PipelineThread;
friend class PipelineProcessor;
public:
typedef ConcurrentQueue<std::queue<PipelineTask*> > queue_t;
protected:
queue_t* m_out_queue;
PipelineStep *m_prev, *m_next;
PipelineProcessor* m_owner;
std::vector<PipelineThread*> m_threads;
bool m_batchProcess;
void process_wrapper(int threadno, PipelineTask*& task);
void run_wrapper(PipelineThread* pthread);
void run_step_first(PipelineThread* pthread);
void run_step_last(PipelineThread* pthread);
void run_step_mid(PipelineThread* pthread);
bool isPrevRunning();
bool isRunning();
void start(int queue_size);
void join();
protected:
virtual void process(int threadno, PipelineTask*& task) = 0;
virtual void setup(int threadno);
virtual void clean(int threadno);
virtual void run(PipelineThread* pthread);
virtual void onException(int threadno, const std::exception& exp);
public:
std::string m_step_name;
PipelineStep();
PipelineStep(int thread_count, bool batchProcess = false);
virtual ~PipelineStep();
int step_ordinal() const;
const std::string& err(int threadno) const;
// helper functions:
std::string msg_leading(int threadno) const;
boost::mutex* getMutex() const;
queue_t* getInQueue() const { return m_prev->m_out_queue; }
queue_t* getOutQueue() const { return m_out_queue; }
void stop();
};
class FunPipelineStep : public PipelineStep
{
boost::function3<void, PipelineStep*, int, PipelineTask*&> m_process; // take(this, threadno, task)
boost::function2<void, PipelineStep*, int> m_setup; // take(this, threadno)
boost::function2<void, PipelineStep*, int> m_clean; // take(this, threadno)
void process(int threadno, PipelineTask*& task);
void setup(int threadno);
void clean(int threadno);
void default_setup(int threadno);
void default_clean(int threadno);
static void static_default_setup(PipelineStep* self, int threadno);
static void static_default_clean(PipelineStep* self, int threadno);
public:
FunPipelineStep(int thread_count,
const boost::function3<void, PipelineStep*, int, PipelineTask*&>& fprocess,
const boost::function2<void, PipelineStep*, int>& fsetup,
const boost::function2<void, PipelineStep*, int>& fclean);
FunPipelineStep(int thread_count,
const boost::function3<void, PipelineStep*, int, PipelineTask*&>& fprocess,
const std::string& step_name = ""<span style=
发表评论
-
对数复杂度的聚集算法
2010-08-05 18:24 716SQL 有5个标准聚集函数:SUM, AVG, MIN, MA ... -
使用 std::map 查找 IP 范围
2010-08-05 17:40 1995给定这样一个问题: 有一组从IP范围到地理位置信息的数据 ... -
tcmalloc 要点
2010-02-22 11:33 1484google.tcmalloc 线程缓存(th ... -
my PipelineProcessor
2010-02-01 15:43 712刚看到,intel tbb::pipeline 实现的功能,和 ... -
简单的代码生成器创建领域语言
2010-02-01 14:30 1229有一类问题,代码模板相同,但有少部分地方不同,一般可以写一个复 ... -
理想的 lazy load
2010-01-12 20:21 757在linux中碰到一个问题,可以描述如下 编译 liba.s ... -
字符串基数排序
2009-12-09 22:00 1810对字符串使用基数排序,以前,我一直觉得:因为字符串的长度不 ... -
内嵌变长数据结构范例——trbstrmap<mapped>
2009-11-03 17:43 800以前的这篇文章介绍了嵌入的变长数据结构(embeded ... -
memory pool 的高效实现
2009-10-23 02:40 2776memory pool malloc可以分配任意大小的内存, ... -
memory pool 的高效实现(代码)
2009-10-23 03:01 6748mpool.h #ifndef __febird_c_mpo ... -
可变长度数据结构
2009-09-27 17:12 1066固定长度的数据结构很简单,大家每天都在用。 可变长度数据结构 ... -
malloc/free 的开销,如何去掉这种开销?
2009-10-18 15:58 1846一般的malloc实现,对一块已分配的内存,都有两个机器字的簿 ... -
世界上应用最广泛的虚拟机是啥?
2009-10-18 16:19 859别说是JavaVM! 正确答案:x86vm x86本身是一 ... -
memory FILE in C
2009-06-30 17:18 976<script>function StorePag ... -
很基本也很诡异的fread, feof
2009-07-03 18:12 2993先看一段非常简单的程序: #include & ... -
Threaded Red-Black Tree 线索红黑树
2009-05-26 19:19 866项目地址:http://code.google.com/p/f ... -
C 语言实现的 stl-like 算法
2009-04-08 10:46 1031使用类似BOOST.PP技巧,自动生成代码,效率上小胜stl, ... -
使用" 参数化基类" 和" 成员函数指针" 模拟实现虚函数--在实际中的应用
2005-12-02 23:01 1149// 使用" 参数化基类" 和" ... -
千万注意,不要 hack std::string
2007-02-25 14:49 1059前段时间被一个bug折磨了两个星期,最后发现竟然是如此一个陷阱 ... -
一个很强大的Comparator生成器
2008-04-22 15:02 689项目地址:http://code.goog ...
相关推荐
Pipeline(流水线)模式有时一些线程的步奏比较冗长,而且由于每个阶段的结果与下阶段的执行有关系,又不能分开。可以将任务的处理分解为若干个处理阶段,上一个阶段任务的结果交给下一个阶段来处理,这样每个线程的...
《Java多线程设计模式详解》是一本深入探讨Java并发编程和设计模式的专业书籍,它涵盖了多线程环境下的各种核心概念、最佳实践以及常见问题的解决方案。这本书旨在帮助开发者在实际开发过程中更好地理解和应用多线程...
Java多线程设计模式是Java开发中不可或缺的一部分,它涉及到并发编程的核心理论和技术。在Java中,多线程用于提高程序的执行效率,通过同时执行多个任务来利用系统资源。本资料包包含“java多线程设计模式详解”文档...
《Netty+设计模式》这一主题涵盖了两个关键领域:Netty框架和设计模式。Netty是Java领域中的一款高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络服务。设计模式则是软件工程中解决常见问题...
- 可能包含多个文件,如 `pipeline.go`,展示了一个基本的管道实现,包括创建 channel、启动 goroutine 以及数据的发送和接收。 - 可能有其他文件如 `fanout.go` 和 `fanin.go`,分别演示了扇出和扇入的实现。 - ...
在IT行业中,"CalculatricePipeAndFilter_java_pipeline_"这一标题暗示了一个使用Java实现的计算器应用,它采用了管道和过滤器(Pipe and Filter)设计模式。这个设计模式是一种常见于处理流水线系统的架构,尤其...
3. **多线程与并发**:为了提高处理性能,pipeline可能涉及到多线程或并发处理。Java提供了ExecutorService、Thread、synchronized关键字等工具来处理并发问题。 4. **设计模式**:如工厂模式、观察者模式等,可以...
2. 使用多线程:充分利用SOPHON SE-16的多核能力,考虑使用线程池或并发编程模型来并行处理pipeline的不同阶段。 3. 错误处理和调试:在pipeline中加入适当的错误检测和处理机制,同时,考虑到交叉编译环境下调试的...
如文中提到,批量取3500条数据大约只需900毫秒,如果结合多线程或协程,每秒处理1万条数据是完全可行的,能够满足大多数业务场景的需求。 总的来说,理解并善用Python中的Redis pipeline是优化大规模数据操作的关键...
数据库被作为资源管理,支持在多线程的条件下,使用数据库资源。 5、 框架界面。尽管常见的服务运行时表现为一个后台进程,但为了更好的演示服务器的功能,避免繁琐的配置,还是需要一个图形界面来显示状态、设置...
Pipeline是网络库中的重要设计模式,它允许我们将数据的处理过程分解为一系列有序的阶段(Handler)。每个阶段负责特定的处理任务,如解码、业务逻辑处理、编码等。数据在Pipeline中从一端传递到另一端,各个阶段...
通过设置`usePipeline`为`true`,启用Pipeline模式。线程池配置(`JedisPoolConfig`)用于管理连接,确保有足够的资源处理并发任务。`DemoTask`类负责执行具体的键写入操作,通过Pipeline与否来对比性能差异。 5. **...
管道架构(Pipeline Architecture),也称为管道过滤器模型,是一种设计模式,常用于构建高效的数据处理系统。在PARC(Pipeline Architecture Project)项目中,这种架构被用于实现一个基于Java的高性能批处理框架。...
在本设计中,使用NN_PIPELINE模式,保证数据按顺序单向流动,从一个线程到另一个线程,确保爬取和解析的顺序性。 8. **线程池**:使用线程池来管理分析线程,通过`create_threadpool`初始化线程池,`dispatch`函数...
Reactor模型分为单线程模型、多线程模型以及主从多线程模型。单线程模型中,Acceptor和Handler都在同一线程中处理,但这种模型容易受阻塞操作影响,导致整体性能下降。多线程模型则将Acceptor和Handler的处理分离,...
在"pipeline_project"中,开发者可能使用了Java的并发和多线程功能来并行处理任务,提高效率。此外,Java的标准库如`java.util.concurrent`包提供了许多工具类和接口,如ExecutorService和Future,帮助管理线程池和...
C10K问题是指在设计网络服务时,当单个服务器需要同时处理成千上万(10K表示10,000)个客户端连接时,传统的多线程模型和select模型面临性能瓶颈和效率低下的挑战。这个问题的关键在于,随着并发连接数的增加,...
它介绍了一系列适用于多线程、分布式计算和网络通信的模式,如Observer模式用于事件通知,Proxy模式用于远程对象访问,以及Pipeline/Filter模式用于数据处理流水线。 第二卷《Patterns for Enterprise Application ...
2. **Java多线程**:Java提供了丰富的多线程支持,包括线程的创建与启动、同步机制(synchronized、Lock等)、并发工具类(ExecutorService、Semaphore、CyclicBarrier等)以及线程安全问题的处理。理解和熟练运用...