`
bachmozart
  • 浏览: 111684 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Leader Follower线程模型简单实现

    博客分类:
  • Java
阅读更多
在我们编写网络服务程序时,比较简单的方式是per client per thread模型,这种模型当客户端连接数快速增长是就会出现性能瓶颈,我们不能不断的开启新的线程,当然我们肯定是会使用线程池,但是线程的管理和频繁的线程调度也会影响性能.

java 1.4给我们带来了NIO编程模型,由于它的读写操作都是无阻塞的,这样使我们能够只用一个线程处理所有的IO事件,当然我们不会真的只用一个线程来处理,比较常见的编写NIO网络服务程序的模型是半同步-半异步模式,其实现原理大体上是单线程同步处理网络IO请求,当有请求到达时,将该请求放入一个工作队列中,由另外的线程处理,主线程继续等待新的网络IO请求,这种编程模型的缺点主要是:
1.使用工作队列带来的内存的动态分配问题
2.每次网络IO请求,总是分配给另外一个线程处理,这样频繁的线程的context switching也会 影响性能

解决的方案是使用Leader-Follower线程模型,它的基本思想是所有的线程被分配成两种角色:
Leader和Follower,一般同时只有一个Leader线程,所有的Follower线程排队等待成为Leader线程,线程池启动时自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程,并将其提拔为新的Leader,然后自己去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待重新成为Leader.

这个线程模型主要解决了内存的动态分配问题,我们不需要不断的将到来的网络IO事件放入队列,并且等待网络IO事件的线程在等到网络事件的发生后,是自己处理的这个事件,也就是说没有context switching的过程.

下面是简单的代码演示

首先定义我们的事件模型Event
public interface Event {
	SelectableChannel getChannel();
}

它的实现:
public class DefaultEventImpl implements Event {
	private SelectableChannel channel;

	public DefaultEventImpl(SelectableChannel channel) {
		this.channel = channel;
	}

	@Override
	public SelectableChannel getChannel() {
		return this.channel;
	}
}


然后定义我们的EventHandler
public interface EventHandler {
	/**
	 * this can be blocked
	 * 
	 * @return Event
	 */
	Event pollEvent();

	void handleEvent(Event e);
}


其中的pollEvent()方法会阻塞,当有网络请求时返回一个我们封装的Event对象,其实里面大体上就是select()方法

接下来是我们的核心部分,Leader-Follower线程池的定义
public interface LFThreadPool {
	void start();

	void promoteLeader();

	void waitToBeLeader();
}

通过start方法启动线程池,然后调用一次promoteLeader产生了一个Leader线程等待网络事件的到达,其余线程则是waitToBeLeader的状态.

我们还定义了一个Worker线程模型

public interface WorkerThread extends Runnable {
	void start();
}


start方法主要是启动所有的workers,其实觉得可以省去这个接口的


然后看一下我们的worker thread实现
public class DefaultWorkerThread implements WorkerThread {
	private LFThreadPool pool;
	private EventHandler handler;
	private boolean isActive;

	public DefaultWorkerThread(DefaultLFThreadPoolImpl pool,
			EventHandler handler) {
		this.pool = pool;
		this.handler = handler;
	}

	@Override
	public void run() {
		while (isActive) {
			this.pool.waitToBeLeader();
           //阻塞等待pollEvent返回待处理的事件,此处等待的是Leader线程
			Event event = handler.pollEvent();
           //notify 唤醒一个正在等待的线程成为leader,由它来等待新的
           //网络事件到达,自己则去处理当前的IO事件
			this.pool.promoteLeader();

			handler.handleEvent(event);
           //处理完毕后you重新waitToBeLeader等待成为Leader
		}
	}

	@Override
	public synchronized void start() {
		if (!isActive) {
			isActive = true;
		}
		new Thread(this).start();
	}

}


最后是我们的线程池的实现部分,看一下LFThreadPool的实现
public class DefaultLFThreadPoolImpl implements LFThreadPool {
	private WorkerThread[] workers;
	private boolean isActive;
	private Object semaphore = new Object();

	public DefaultLFThreadPoolImpl(int poolSize, EventHandler handler) {
		workers = new DefaultWorkerThread[poolSize];
		for (int i = 0; i < poolSize; i++) {
			workers[i] = new DefaultWorkerThread(this, handler);
		}
	}

	@Override
	public synchronized void start() {

		if (!isActive) {
           //启动所有的workers
			for (int i = 0; i < workers.length; i++) {
				workers[i].start();
			}
            //保证所有的workers已经启动
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
            //产生一个Leader线程
			promoteLeader();

			isActive = true;
		}
	}

	@Override
	public void promoteLeader() {
		synchronized (semaphore) {
			semaphore.notify();
		}
	}

	@Override
	public void waitToBeLeader() {
		synchronized (semaphore) {
			try {
				semaphore.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}





整个过程中没有动态内存分配,也没有频繁的context switching,应该可以提高部分效率,后续会发出完整的程序,欢迎大家指正.


分享到:
评论
2 楼 anranran 2011-08-10  
Event event = handler.pollEvent();  
这地方就有线程切换
1 楼 anranran 2011-08-10  
好文,代码不全,可否补全

相关推荐

    多线程池之领导者和跟随者模式/leader fllowers模式

    1. **复杂度增加**:相较于简单的单线程模型,引入多线程池和任务调度机制无疑增加了系统的复杂度。 2. **同步问题**:在多线程环境中,可能会出现数据同步的问题,需要额外的机制来保障数据的一致性。 #### 结论 ...

    threadpool

    标题中提到的"threadpool"直译为“线程池”,而描述中提到了两种特定的线程池模型:半同步半异步(Half-Synchronous Half-Asynchronous,简称HSHA)模型和Leader-Follower模型。这两种模型都是为了优化线程池的性能...

    SOFAJRaft是一个基于RAFT一致性算法的生产级高性能Java实现

    6. **线程模型**:采用高效的线程模型,如Reactor模式,来处理并发事件,提高系统吞吐量。 7. **日志压缩**:为了节省存储空间,SOFAJRaft还提供了日志压缩功能,减少无效数据的存储。 8. **模块化设计**:...

    IOCP完成端口原理

    通常与Leader-Follower模式结合使用,但仍存在可扩展性问题。 3. IOCP:是实现高并发服务器的最佳选择,能够提供高吞吐量和良好的可扩展性。 三、IOCP的关键概念 1. 服务吞吐量:IOCP的异步特性使得服务器能够在...

    beansdb设计与实现

    BeansDB采用了N个worker线程的模型,其中Leader/follower模式保证了线程之间的高效协同工作,同时通过写缓存的方式实现了后台定时或定量地将数据写入磁盘,减少了对数据文件的频繁访问,提高了系统的稳定性和性能。...

    zookeeper知识点总结.rar

    4. 分布式锁:通过创建临时节点,可以实现线程级别的互斥访问控制。 5. 领导选举:在分布式环境中,Zookeeper可以帮助选举出集群的领导者。 五、Zookeeper的应用场景 1. HBase:HBase依赖Zookeeper进行元数据管理,...

    Kafka总结.docx

    1. **主题与分区**:Kafka 的数据组织基于主题,主题可以被划分为多个分区,每个分区有一个 Leader 节点和零个或多个 Follower 节点。生产者将消息发送到指定主题的分区,消费者从这些分区消费数据。 2. **副本与 ...

    笔记_zookeeper_源码.zip

    服务器集群中有一个领导者(Leader),其余为跟随者(Follower)。客户端的所有请求首先发送到Leader,由Leader负责协调一致性并转发给Followers。 2. **数据模型** Zookeeper的数据模型是一个层次化的命名空间,...

    分布式锁演进过程.doc

    ZooKeeper的核心特性之一是其强大的一致性模型,通过ZAB协议(ZooKeeper Atomic Broadcast Protocol)实现了数据的一致性。 在使用ZooKeeper实现分布式锁时,客户端首先向服务器发起创建节点的请求。ZooKeeper...

    kafka笔记1

    leader 负责接收生产者发送的消息和响应消费者请求,而 follower 实时同步 leader 的数据。如果 leader 故障,一个 follower 将被选为新的 leader,从而确保服务的连续性。 5. **Kafka配置参数**:Kafka 的配置参数...

    01-JAVA岗位笔试题(A卷)附答案

    `ThreadLocal`的实现依赖于`Thread`类中的一个**ThreadLocalMap**,该映射用于存储每个线程与其对应ThreadLocal变量实例之间的映射关系。 #### 四、ConcurrentHashMap的数据结构 `ConcurrentHashMap`是Java提供的一...

    zookeeper-3.4.10.tar.gz

    Zookeeper基于 zab 协议实现强一致性,并采用类文件系统的数据模型,使得操作简单直观。 二、Zookeeper 3.4.10 版本特性 1. 性能优化:3.4.10 版本对性能进行了大量优化,包括客户端连接、数据同步、请求处理等...

    ZooKeeper-分布式过程协同技术详解.rar

    4. **分布式锁**:ZooKeeper可以实现分布式锁,确保在多线程或分布式环境下,对共享资源的访问有序且互斥。 5. **领导者选举**:ZooKeeper的ZAB协议可以用于分布式环境中的领导者选举,确保集群的高可用性。 **三...

    【DT-BigData】Zookeeper-3.4.5.gz

    Zookeeper是一个分布式的、开放源码的分布式应用程序协调服务,它提供了一种简单有效的机制来管理分布式系统中的数据和配置信息,实现服务发现、命名服务、分布式同步等任务。Zookeeper的设计目标是简化分布式系统的...

    apache-zookeeper-3.5.6-bin.tar

    ZooKeeper的设计目标是简单且高效,它基于一个简单的数据模型,使得它易于理解和使用。在这个模型中,数据存储在称为znodes的节点中,znodes可以有子znodes,类似于文件系统的结构。 **Zookeeper的核心概念:** 1. ...

    zookeeper-3.4.5

    4. 分布式锁:Zookeeper支持实现分布式锁,通过创建临时节点来实现锁的获取和释放,从而解决多线程或多进程间的并发控制问题。 三、Zookeeper的应用场景 1. 分布式协调:Zookeeper可以作为分布式系统中的协调器,...

    spprocpool:UnixLinux 预分叉服务器库

    第一个使用描述符传递,第二个使用Leader/Follower 进程池,第三个使用多处理和多线程模型的组合。 包括一个通用的非服务器进程池,它可以在多线程环境中使用。 变更日志: 0.5 版的变化 (01.05.2007) 添加了一个...

    最新版--Java+最常见的+200++面试题汇总+答案总结汇总.pdf

    * Zookeeper 的架构:Leader 节点、Follower 节点等 * Zookeeper 的应用场景:分布式锁、配置中心等 MySQL 知识点: * MySQL 的存储引擎:InnoDB、MyISAM 等 * MySQL 的索引类型:B-Tree 索引、Hash 索引等 Redis...

    0822分布式协调服务-zookeeper1

    3. **分布式队列**: 实现多个节点间的同步操作,如发布/订阅模型。 在实际部署和使用Zookeeper时,需要满足一定的环境条件,如JDK 1.7以上,Ant构建工具,以及IDEA等开发环境。源代码可以在Apache官方GitHub仓库...

    ZooKeeper笔记.pdf

    - **zoo.cfg**:Zookeeper的配置文件,其中`tickTime`是基本时间单位,`initLimit`定义了follower与leader同步的时限,`syncLimit`定义了follower与leader通信超时时间。`dataDir`和`dataLogDir`分别用于存储数据和...

Global site tag (gtag.js) - Google Analytics