`
febird
  • 浏览: 254234 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

多线程的 pipeline 设计模式

    博客分类:
  • C++
阅读更多

 

一个简单例子:

有很多个html网页,网页的id、title、url、path等信息存在一个数据库表中,网页内容存储在一个磁盘阵列上。现在要把所有网页都读出来,统计其中的html标签、正文等信息,并写入另一个数据库表,怎样的设计最好呢?

一般的想法是使用多个平行的线程,每个线程处理某个ID范围的网页。但是仔细分析就可以发现,对每个网页的处理可以分为以下处理步骤:

  1. 读取数据库行
  2. 读取文件内容
  3. 解析html,生成统计数据
  4. 将统计结果写入数据库

这几个处理步骤有各自的特征,读取数据库的时间一般主要消耗在数据库服务器响应,读取文件内容一般主要消耗在磁盘IO上,解析、统计消耗在计算上,写统计结果也消耗在数据库服务器响应上。如果我们为这几个过程建立各自的线程,每个任务通过消息队列来传递。就得到如下设计:

 

在这个设计中,每个处理过程可以根据需要设置不同的线程数,这个例子中,数据库不会是瓶颈,只剩下读文件和计算,如果文件IO够快(如果网页存在不同的阵列上),那么可以增加计算线程(服务器一般都是多CPU的)来达到平衡。

一些例子或许还会有更多的处理步骤。

可以从中得出一个设计模式,甚至可以直接写出实现框架的类:

pipeline.h

 

/* vim: set tabstop=4 : */

#ifndef __febird_pipeline_h__

#define __febird_pipeline_h__

 

#if defined(_MSC_VER) && (_MSC_VER >= 1020)

# pragma once

# pragma warning(push)

# pragma warning(disable: 4018)

# pragma warning(disable: 4267)

#endif

 

#include <vector>

#include <queue>

#include <string>

#include <boost/thread.hpp>

#include "../thread/ConcurrentQueue.h"

//#include "../thread/LockSentry.h"

//#include "../thread/thread.h"

 

namespace febird { namespace thread {

 

class PipelineTask

{

public:

   virtual ~PipelineTask();

};

 

class PipelineMultiTask : public PipelineTask

{

public:

// PipelineMultiTask(size_t size = 10);

   std::vector<PipelineTask*> m_tasks;

   virtual ~PipelineMultiTask();

};

 

class PipelineStep;

class PipelineThread;

class PipelineProcessor;

 

class PipelineStep

{

   friend class PipelineThread;

   friend class PipelineProcessor;

 

public:

   typedef ConcurrentQueue<std::queue<PipelineTask*> > queue_t;

 

protected:

   queue_t* m_out_queue;

 

   PipelineStep *m_prev, *m_next;

   PipelineProcessor* m_owner;

 

   std::vector<PipelineThread*> m_threads;

   bool m_batchProcess;

 

   void process_wrapper(int threadno, PipelineTask*& task);

 

   void run_wrapper(PipelineThread* pthread);

 

   void run_step_first(PipelineThread* pthread);

   void run_step_last(PipelineThread* pthread);

   void run_step_mid(PipelineThread* pthread);

 

   bool isPrevRunning();

   bool isRunning();

   void start(int queue_size);

   void join();

 

protected:

   virtual void process(int threadno, PipelineTask*& task) = 0;

 

   virtual void setup(int threadno);

   virtual void clean(int threadno);

 

   virtual void run(PipelineThread* pthread);

   virtual void onException(int threadno, const std::exception& exp);

 

public:

   std::string m_step_name;

 

   PipelineStep();

   PipelineStep(int thread_count, bool batchProcess = false);

 

   virtual ~PipelineStep();

 

   int step_ordinal() const;

   const std::string& err(int threadno) const;

 

   // helper functions:

   std::string msg_leading(int threadno) const;

   boost::mutex* getMutex() const;

   queue_t* getInQueue()  const { return m_prev->m_out_queue; }

   queue_t* getOutQueue() const { return m_out_queue; }

 

   void stop();

};

 

class FunPipelineStep : public PipelineStep

{

   boost::function3<void, PipelineStep*, int, PipelineTask*&> m_process; // take(this, threadno, task)

   boost::function2<void, PipelineStep*, int> m_setup; // take(this, threadno)

   boost::function2<void, PipelineStep*, int> m_clean; // take(this, threadno)

 

   void process(int threadno, PipelineTask*& task);

   void setup(int threadno);

   void clean(int threadno);

 

   void default_setup(int threadno);

   void default_clean(int threadno);

   static void static_default_setup(PipelineStep* self, int threadno);

   static void static_default_clean(PipelineStep* self, int threadno);

 

public:

   FunPipelineStep(int thread_count,

                const boost::function3<void, PipelineStep*, int, PipelineTask*&>& fprocess,

                const boost::function2<void, PipelineStep*, int>& fsetup,

                const boost::function2<void, PipelineStep*, int>& fclean);

   FunPipelineStep(int thread_count,

                const boost::function3<void, PipelineStep*, int, PipelineTask*&>& fprocess,

                const std::string& step_name = ""<span style=

分享到:
评论

相关推荐

    多线程设计模式——Pipeline(流水线)模式

    Pipeline(流水线)模式有时一些线程的步奏比较冗长,而且由于每个阶段的结果与下阶段的执行有关系,又不能分开。可以将任务的处理分解为若干个处理阶段,上一个阶段任务的结果交给下一个阶段来处理,这样每个线程的...

    java多线程设计模式详解(PDF及源码)

    《Java多线程设计模式详解》是一本深入探讨Java并发编程和设计模式的专业书籍,它涵盖了多线程环境下的各种核心概念、最佳实践以及常见问题的解决方案。这本书旨在帮助开发者在实际开发过程中更好地理解和应用多线程...

    java多线程设计模式详解+源码

    Java多线程设计模式是Java开发中不可或缺的一部分,它涉及到并发编程的核心理论和技术。在Java中,多线程用于提高程序的执行效率,通过同时执行多个任务来利用系统资源。本资料包包含“java多线程设计模式详解”文档...

    Netty+设计模式

    《Netty+设计模式》这一主题涵盖了两个关键领域:Netty框架和设计模式。Netty是Java领域中的一款高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络服务。设计模式则是软件工程中解决常见问题...

    pipeline:goast 的管道并发模式

    - 可能包含多个文件,如 `pipeline.go`,展示了一个基本的管道实现,包括创建 channel、启动 goroutine 以及数据的发送和接收。 - 可能有其他文件如 `fanout.go` 和 `fanin.go`,分别演示了扇出和扇入的实现。 - ...

    CalculatricePipeAndFilter_java_pipeline_

    在IT行业中,"CalculatricePipeAndFilter_java_pipeline_"这一标题暗示了一个使用Java实现的计算器应用,它采用了管道和过滤器(Pipe and Filter)设计模式。这个设计模式是一种常见于处理流水线系统的架构,尤其...

    pipeline_example:pipeline_example

    3. **多线程与并发**:为了提高处理性能,pipeline可能涉及到多线程或并发处理。Java提供了ExecutorService、Thread、synchronized关键字等工具来处理并发问题。 4. **设计模式**:如工厂模式、观察者模式等,可以...

    SOPHON算能盒子SE-16中c++版本pipeline的环境配置,中已经编译完成的算能安装sdk环境

    2. 使用多线程:充分利用SOPHON SE-16的多核能力,考虑使用线程池或并发编程模型来并行处理pipeline的不同阶段。 3. 错误处理和调试:在pipeline中加入适当的错误检测和处理机制,同时,考虑到交叉编译环境下调试的...

    python使用pipeline批量读写redis的方法

    如文中提到,批量取3500条数据大约只需900毫秒,如果结合多线程或协程,每秒处理1万条数据是完全可行的,能够满足大多数业务场景的需求。 总的来说,理解并善用Python中的Redis pipeline是优化大规模数据操作的关键...

    基于Qt的多线程流水线异步服务器稳定版

    数据库被作为资源管理,支持在多线程的条件下,使用数据库资源。 5、 框架界面。尽管常见的服务运行时表现为一个后台进程,但为了更好的演示服务器的功能,避免繁琐的配置,还是需要一个图形界面来显示状态、设置...

    基于eventloop的java非阻塞网络库,实现了事件驱动,无锁的基于最小堆的定时器,便于扩展的pipeline机.zip

    Pipeline是网络库中的重要设计模式,它允许我们将数据的处理过程分解为一系列有序的阶段(Handler)。每个阶段负责特定的处理任务,如解码、业务逻辑处理、编码等。数据在Pipeline中从一端传递到另一端,各个阶段...

    redis通过pipeline提升吞吐量的方法

    通过设置`usePipeline`为`true`,启用Pipeline模式。线程池配置(`JedisPoolConfig`)用于管理连接,确保有足够的资源处理并发任务。`DemoTask`类负责执行具体的键写入操作,通过Pipeline与否来对比性能差异。 5. **...

    Pipeline Architecture-开源

    管道架构(Pipeline Architecture),也称为管道过滤器模型,是一种设计模式,常用于构建高效的数据处理系统。在PARC(Pipeline Architecture Project)项目中,这种架构被用于实现一个基于Java的高性能批处理框架。...

    分布式网络爬虫设计毕业设计.pdf

    在本设计中,使用NN_PIPELINE模式,保证数据按顺序单向流动,从一个线程到另一个线程,确保爬取和解析的顺序性。 8. **线程池**:使用线程池来管理分析线程,通过`create_threadpool`初始化线程池,`dispatch`函数...

    03-04-06-大名鼎鼎的EventLoop1

    Reactor模型分为单线程模型、多线程模型以及主从多线程模型。单线程模型中,Acceptor和Handler都在同一线程中处理,但这种模型容易受阻塞操作影响,导致整体性能下降。多线程模型则将Acceptor和Handler的处理分离,...

    pipeline_project

    在"pipeline_project"中,开发者可能使用了Java的并发和多线程功能来并行处理任务,提高效率。此外,Java的标准库如`java.util.concurrent`包提供了许多工具类和接口,如ExecutorService和Future,帮助管理线程池和...

    搜狗实验室技术交流文档C10K问题

    C10K问题是指在设计网络服务时,当单个服务器需要同时处理成千上万(10K表示10,000)个客户端连接时,传统的多线程模型和select模型面临性能瓶颈和效率低下的挑战。这个问题的关键在于,随着并发连接数的增加,...

    Pattern-Oriented Software Architecture 全集(vol1到vol5)

    它介绍了一系列适用于多线程、分布式计算和网络通信的模式,如Observer模式用于事件通知,Proxy模式用于远程对象访问,以及Pipeline/Filter模式用于数据处理流水线。 第二卷《Patterns for Enterprise Application ...

    JAVA核心面试知识整理【书签完整】【高清可复制】

    2. **Java多线程**:Java提供了丰富的多线程支持,包括线程的创建与启动、同步机制(synchronized、Lock等)、并发工具类(ExecutorService、Semaphore、CyclicBarrier等)以及线程安全问题的处理。理解和熟练运用...

Global site tag (gtag.js) - Google Analytics