`
kavy
  • 浏览: 891336 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Leader/Follower多线程网络模型介绍(转)

 
阅读更多

淘宝目前公开了其网络服务器源代码Tengine。根据官方介绍,Tengine是由淘宝网发起的Web服务器项目。它在Nginx的基础上,针对大访问量网站的需求,添加了很多高级功能和特性。Tengine的性能和稳定性已经在大型的网站如淘宝网,天猫商城等得到了很好的检验。它的最终目标是打造一个高效、稳定、安全、易用的Web平台。它们都采用了多线程非阻塞模式,并使用了LF模型。我最近整理了一下LF的相关资料,和大家分享一下。对淘宝开源的Tengine有兴趣的同学可以到这里checkout代码研究:http://code.taobao.org/svn/tengine/trunk

1、 引言

大家知道,多线程网络服务最简单的方式就是一个连接一个线程,这种模型当客户端连接数快速增长是就会出现性能瓶颈。当然,这时候,我们理所当然会考虑使用线程池,而任何池的使用,都会带来一个管理和切换的问题。 在java 1.4中引入了NIO编程模型,它采用了Reactor模式,或者说观察者模式,由于它的读写操作都是无阻塞的,使得我们能够只用一个线程处理所有的IO事件,这种处理方式是同步的。为了提高性能,当一个线程收到事件后,会考虑启动一个新的线程去处理,而自己继续等待下一个请求。这里可能会有性能问题,就是把工作交给别一个线程的时候的上下文切换,包括数据拷贝。今天向大家介绍一种Leader-Follower模型。

2、 基本思想

所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。

3、 原理分析

显然地,通过预先分配一个线程池,Leader/Follower设计避免了动态线程创建和销毁的额外开销。将线程放在一个自组织的池中,而且无需交换数据,这种方式将上下文切换、同步、数据移动和动态内存管理的开销都降到了最低。

 

不过,这种模式在处理短暂的、原子的、反复的和基于事件的动作上可以取得明显的性能提升,比如接收和分发网络事件或者向数据库存储大量数据记录。事件处理程序所提供的服务越多,其体积也就越大,而处理一个请求所需的时间越长,池中的线程占用的资源也就越多,同时也需要更多的线程。相应的,应用程序中其它功能可用的资源也就越少,从而影响到应用程序的总体性能、吞吐量、可扩展性和可用性。

在大多数LEADER/FOLLOWERS设计中共享的事件源封装在一个分配器组件中。如果在一个设计中联合使用了LEADER/FOLLOWERS和REACTOR事件处理基础设施,由reactor组件进行分发。封装事件源将事件分离和分派机制与事件处理程序隔离开来。每个线程有两个方法:一个是join方法,使用这个方法可以把新初始化的线程加入到池中。新加入的线程将自己的执行挂起到线程池监听者条件(monitor condition)上,并开始等待被提升为新的Leader。在它变成一个Leader之后,它便可以访问共享的事件源,等待执行下一个到来的事件。另一个是promote_new_leader方法,当前的Leader线程使用这个方法可以提升新的Leader,其做法是通过线程池监听者条件通知休眠的Follower。收到通知的Follower继续执行(resume)线程池的join方法,访问共享事件源,并等待下一个事件的到来。

4、 代码演示

首先用一段简单的代码演示一下整个角色转换的过程。由于同一时刻只有一个leader,用一个互斥量就可以解决了。每个线程一直在做如下4个步骤循环:

 

    public class WorkThread

    {

        public static Mutex mutex = new Mutex();

        public void start()

        {

            while (true)

            {

                // 等待成为leader

                waitToLeader();

                // 用select或epoll等方式等待消息处理

                simulateReactor();

                // 产生下一个leader

                promoteNewLeader();

                // 处理消息

                simulateDojob();

            }

        }

        private void simulateDojob()

        {

        }

        private void promoteNewLeader()

        {

            Console.WriteLine(Thread.CurrentThread.Name + ": Release leadership to others..");

            mutex.ReleaseMutex();

        }

        private void simulateReactor()

        {

            

        }

        private void waitToLeader()

        {

            Console.WriteLine(Thread.CurrentThread.Name + ": Waiting to be Leader..");

            mutex.WaitOne();

        }

    }

 

详细的代码可以参见附件。

5、 代码分析

接下来我们来看一下一个典型的开源代码实现:spserver。抄段官网的话,spserver 是一个实现了半同步/半异步(Half-Sync/Half-Async)和领导者/追随者(Leader/Follower) 模式的服务器框架,能够简化 TCP server 的开发工作。spserver 使用 c++ 实现,目前实现了以下功能:

Ø  封装了 TCP server 中接受连接的功能

Ø  使用非阻塞型I/O和事件驱动模型,由主线程负责处理所有 TCP 连接上的数据读取和发送,因此连接数不受线程数的限制

Ø  主线程读取到的数据放入队列,由一个线程池处理实际的业务

Ø  一个 http 服务器框架,即嵌入式 web 服务器

Spserver的每个版本都有一定的修改。早先版本V0.5还没有引入Leader/Follower模式,在V0.8版本中已经有了sp_lfserver。在V0.9版本中将其改为了sp_iocplfserver,引入了iocp完成端口的名字,但事实上之前版本已经使用了完成端口的技术。简单地说,iocp就是事件io操作由操作系统完成,完成后才由线程接收处理事件。先看一下代码,server启动以后开始监听,并将线程池启动起来。线程入口函数lfHandler一直在循环执行handleOneEvent:

 

int SP_LFServer :: run()

{

     int ret = 0;

     int listenFD = -1;

     ret = SP_IOUtils::tcpListen( mBindIP, mPort, &listenFD, 0 );

     if( 0 == ret ) {

         mThreadPool = new SP_ThreadPool( mMaxThreads );

         forint i = 0; i < mMaxThreads; i++ ) {

              mThreadPool->dispatch( lfHandler, this );

         }

     }

     return ret;

}

void SP_LFServer :: lfHandler( void * arg )

{

     SP_LFServer * server = (SP_LFServer*)arg;

     for( ; 0 == server->mIsShutdown; ) {

         server->handleOneEvent();

     }

}

 

接下来看一下handleOneEvent的处理,和上面的演示程序一样,先mutexlock争取leader权,然后去等待读、写事件,最后释放leadership给其它人,自己执行读完成事件处理函数task->run()或写事件的完成端口事件completionMessage,这个completionMessage会做一些清理工作,例如delete msg:

 

void SP_LFServer :: handleOneEvent()

{

     SP_Task * task = NULL;

     SP_Message * msg = NULL;

     pthread_mutex_lock( &mMutex );

     for( ; 0 == mIsShutdown && NULL == task && NULL == msg; ) {

         if( mEventArg->getInputResultQueue()->getLength() > 0 ) {

              task = (SP_Task*)mEventArg->getInputResultQueue()->pop();

         } else if( mEventArg->getOutputResultQueue()->getLength() > 0 ) {

              msg = (SP_Message*)mEventArg->getOutputResultQueue()->pop();

         }

         if( NULL == task && NULL == msg ) {

              event_base_loop( mEventArg->getEventBase(), EVLOOP_ONCE );

         }

     }

     pthread_mutex_unlock( &mMutex );

     if( NULL != task ) task->run();

     if( NULL != msg ) mCompletionHandler->completionMessage( msg );

}

 

6、 框架使用

和之前介绍的框架一样,采用spserver构建server非常快捷,如下,只要把SP_TestHandler里的几个处理事件实现即可。

 

class SP_TestHandler : public SP_Handler {

public:

     SP_ TestHandler (){}

     virtual ~SP_ TestHandler (){}

     virtual int start( SP_Request * request, SP_Response * response ) {}

     virtual int handle( SP_Request * request, SP_Response * response ) {}

     virtual void error( SP_Response * response ) {}

     virtual void timeout( SP_Response * response ) {}

     virtual void close() {}

};

 

class SP_TestHandlerFactory : public SP_HandlerFactory {

public:

     SP_ TestHandlerFactory () {}

     virtual ~SP_ TestHandlerFactory () {}

     virtual SP_Handler * create() const {

         return new SP_TestHandler();

     }

};

 

int main( int argc, char * argv[] )

{

     int port = 3333, maxThreads = 4, maxConnections = 20000;

     int timeout = 120, reqQueueSize = 10000;

     const char * serverType = "lf";

     SP_IocpLFServer server( "", port, new SP_TestHandlerFactory() );

     server.setTimeout( timeout );

     server.setMaxThreads( maxThreads );

     server.setMaxConnections( maxConnections );

     server.runForever();

     return 0;

}

 

    Spserver的代码可以在这里看到:http://spserver.googlecode.com/svn/trunk/spserver/。spserver同时实现了一个与leader/follower齐名的网络编程模型:HAHS,翻译为半异步半同步模型。本文暂不作介绍。

 

goldlevi

http://blog.csdn.net/goldlevi/article/details/7705180

分享到:
评论

相关推荐

    多线程池之领导者和跟随者模式/leader fllowers模式

    1. **复杂度增加**:相较于简单的单线程模型,引入多线程池和任务调度机制无疑增加了系统的复杂度。 2. **同步问题**:在多线程环境中,可能会出现数据同步的问题,需要额外的机制来保障数据的一致性。 #### 结论 ...

    spprocpool:UnixLinux 预分叉服务器库

    第一个使用描述符传递,第二个使用Leader/Follower 进程池,第三个使用多处理和多线程模型的组合。 包括一个通用的非服务器进程池,它可以在多线程环境中使用。 变更日志: 0.5 版的变化 (01.05.2007) 添加了一个...

    threadpool

    总的来说,理解并掌握线程池模型对于优化多线程程序的性能至关重要,特别是在处理高并发场景时。HSHA模型和Leader-Follower模型提供了两种不同的策略,可以根据具体需求选择合适的模型来提升系统效率。通过深入学习...

    最新版--Java+最常见的+200++面试题汇总+答案总结汇总.pdf

    在这篇文章中,我们将总结了 Java 面试中的 200 多个问题,涵盖了 Java 基础、容器、多线程、反射、对象拷贝、Java Web 、异常、网络、设计模式、Spring/Spring MVC、Spring Boot/Spring Cloud、Hibernate、MyBatis...

    kafka笔记1

    5. **Kafka配置参数**:Kafka 的配置参数如 `num.io.threads` 控制磁盘 I/O 线程数量,`socket.send.buffer.bytes` 和 `socket.receive.buffer.bytes` 设置套接字缓冲区大小,以优化网络传输效率。`log.retention....

    beansdb设计与实现

    #### 十二、线程模型 BeansDB采用了N个worker线程的模型,其中Leader/follower模式保证了线程之间的高效协同工作,同时通过写缓存的方式实现了后台定时或定量地将数据写入磁盘,减少了对数据文件的频繁访问,提高了...

    apache-zookeeper-3.5.6-bin.tar

    4. **分布式锁**: ZooKeeper可以用于实现分布式锁,避免多个进程或线程同时访问同一资源。 5. **领导选举**: 在分布式系统中,通过ZooKeeper可以进行领导者选举,确保只有一个节点成为领导者。 **Zookeeper的架构:...

    Kafka总结.docx

    1. **主题与分区**:Kafka 的数据组织基于主题,主题可以被划分为多个分区,每个分区有一个 Leader 节点和零个或多个 Follower 节点。生产者将消息发送到指定主题的分区,消费者从这些分区消费数据。 2. **副本与 ...

    0822分布式协调服务-zookeeper1

    2. **分布式锁**:提供读锁和写锁,保证多线程并发访问时的数据一致性。 3. **分布式队列**: 实现多个节点间的同步操作,如发布/订阅模型。 在实际部署和使用Zookeeper时,需要满足一定的环境条件,如JDK 1.7以上,...

    zookeeper-3.4.10.tar.gz

    Zookeeper集群由多个Server组成,分为follower、leader和observer三种角色。follower和observer负责接收客户端请求,而leader负责处理写操作和ZAB协议的协调。observer的角色是在不参与选举的情况下提供读服务,增加...

    zookeeper知识点总结.rar

    这些节点通过一个领导者(Leader)和多个跟随者(Follower)的模式运行,确保了数据的一致性。当有更新操作时,由客户端发送请求给Leader,Leader再向Follower广播,待大多数Follower确认后,更新才会生效。 三、...

    zookeeper-3.4.5

    4. 分布式锁:Zookeeper支持实现分布式锁,通过创建临时节点来实现锁的获取和释放,从而解决多线程或多进程间的并发控制问题。 三、Zookeeper的应用场景 1. 分布式协调:Zookeeper可以作为分布式系统中的协调器,...

    【DT-BigData】Zookeeper-3.4.5.gz

    4. 分布式锁:实现共享锁,确保多线程或分布式环境下的数据一致性。 5. 分区仲裁:在分布式环境中,Zookeeper可以用来决定哪个节点是活跃的,实现主节点选举。 五、Zookeeper 3.4.5新特性 Zookeeper 3.4.5版本相较...

    ZooKeeper-分布式过程协同技术详解.rar

    4. **分布式锁**:ZooKeeper可以实现分布式锁,确保在多线程或分布式环境下,对共享资源的访问有序且互斥。 5. **领导者选举**:ZooKeeper的ZAB协议可以用于分布式环境中的领导者选举,确保集群的高可用性。 **三...

    分布式锁演进过程.doc

    这是因为即使Leader节点宕机,由于大多数Follower节点已经收到了创建节点的消息,新的Leader节点在选举出来后,会继续完成节点的创建过程,从而保证了锁的一致性。 ### 结论 分布式锁是分布式系统中一个非常重要的...

    Java技术栈有空复习复习

    面试中可能会讨论其角色(如Leader、Follower、Observer)、数据模型(ZNode)和常用命令。 4. **Dubbo**:Dubbo是阿里巴巴开源的RPC框架,用于构建高性能、透明化的远程服务调用方案。面试中可能涉及服务注册与...

    SOFAJRaft是一个基于RAFT一致性算法的生产级高性能Java实现

    6. **线程模型**:采用高效的线程模型,如Reactor模式,来处理并发事件,提高系统吞吐量。 7. **日志压缩**:为了节省存储空间,SOFAJRaft还提供了日志压缩功能,减少无效数据的存储。 8. **模块化设计**:...

    笔记_zookeeper_源码.zip

    服务器集群中有一个领导者(Leader),其余为跟随者(Follower)。客户端的所有请求首先发送到Leader,由Leader负责协调一致性并转发给Followers。 2. **数据模型** Zookeeper的数据模型是一个层次化的命名空间,...

    大数据开发面试学习指南

    - **并发编程**:理解线程的基本概念,包括创建线程的方式、线程生命周期、同步机制(synchronized 关键字、ReentrantLock 等)、volatile 变量等。 #### MySQL MySQL 是关系型数据库管理系统中最常用的一种,其...

Global site tag (gtag.js) - Google Analytics