`
berdy
  • 浏览: 512412 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Netty 4 源码分析——EventExecutor

阅读更多
先从EventExecutor开始,因为它是一个很基础的工具类,是对I/O线程的包装。先了解下它的源码会对后面的分析有更好的理解。



先看下EventExecutor的类关系图,这里只是简单的画出了类和接口的继承和实现关系,还有其他的聚合关系没有画出来,为的是便于分析思路的清晰。

说到Executor,很容易联想到jdk中 java.util.concurrent.Executor 接口,这个接口非常简单,就一个方法
void execute(Runnable command);

从方法签名上就能看出这个是为了支持异步模式的。command表示一个命令。当前线程就是命令者角色,Executor内部的去运行Runnable的线程就是执行者。这里没有提供明确的地方供命令者去取得命令的执行结果。

ExecutorService 继承了Executor 接口,增加了对自身生命周期管理的方法,同时提供了一个Future给命令者去获取命令的执行结果。

ScheduledExecutorService 继承了ExecutorService接口,增加了对定时任务的支持。

EventExecutorGroup 继承了ScheduledExecutorService接口,对原来的ExecutorService的关闭接口提供了增强,提供了优雅的关闭接口。从接口名称上可以看出它是对多个EventExecutor的集合,提供了对多个EventExecutor的迭代访问接口。

EventExecutor 继承EventExecutorGroup 看着这个关系真心有些纠结啊。不过细想下还是能理解的。A是B中的一员,但是A也能迭代访问B中的其他成员。这个继承关系支持了迭代访问这个行为。自然的他提供了一个parent接口,来获取所属的EventExecutorGroup 。另外提供了inEventLoop 方法支持查询某个线程是否在EventExecutor所管理的线程中。还有其他一些创建Promise和Future的方法。

AbstractEventExecutor 只是对EventExecutor中某些方法的简单实现

下面重点分析下非常有意思SingleThreadEventExecutor,它也是个抽象类,但是提供了很多重要方法的实现。弄清楚了这个对整个EventExecutor体系都非常有帮助。从类名上可知里面只有一个线程,先搞清楚一个线程的处理过程再理解多线程的就轻松些了。

先从execute方法入口分析
@Override
public void execute(Runnable task) {
	if (task == null) {
		throw new NullPointerException("task");
	}

	boolean inEventLoop = inEventLoop();
	if (inEventLoop) {
		addTask(task);
	} else {
		startThread();
		addTask(task);
		if (isShutdown() && removeTask(task)) {
			reject();
		}
	}

	if (!addTaskWakesUp) {
		wakeup(inEventLoop);
	}
}

先不看这个源码,我们分析下,作为一个Executor,A让你执行命令A,B让你执行命令B。。。命令不是说执行就能执行的吧。总得有个地方保存还没来得及执行的命令吧,总得有个先来后到吧。而这个地方又会被多线程访问,得保证多线程访问可见性,操作的原子性。jdk中提供的BlockingQueue就是为此而生的。BlockingQueue是一个接口,jdk中有很多他的实现。SingleThreadEventExcutor中提供的这个地方叫tasksQueue,类型是jdk中的LinkedBlockingQueue。上面代码中的addTask就是往tasksQueue中添加。

另外SingleThreadEventExcutor实现了ScheduledExecutorService 接口,支持执行定时任务。得有个地方存放定时任务信息。类中的实现是delayedTaskQueue,它是一个PriorityQueue ,也是一个BlockingQueue。不过它里面的元素不是按照先来后到的顺序存取的,而是按照各个元素的优先级判断的。

因为这个execute方法,是可以在外部线程调用,也可以在内部线程调用。也就是说外部成员可以给你下命令,内部成员也可以给你下命令。所以在上面的代码中先调用inEventLoop判断当前下命令的是外部的还是内部的。
如果是外部的,先确定内部线程是否启动,没启动就先启动内部线程同时给自己加一个定时清理的定时任务。这个从下面的代码中可以看出
private void startThread() {
	synchronized (stateLock) {
		if (state == ST_NOT_STARTED) {
			state = ST_STARTED;
			delayedTaskQueue.add(new ScheduledFutureTask<Void>(
					this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
					ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
			thread.start();
		}
	}
}

private final class PurgeTask implements Runnable {
	@Override
	public void run() {
		Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator();
		while (i.hasNext()) {
			ScheduledFutureTask<?> task = i.next();
			if (task.isCancelled()) {
				i.remove();
			}
		}
	}
}

添加命令完成了,下面就看如何去执行命令了,这个就需要分析下内部线程的执行逻辑了。SingleThreadEventExecutor类中有一个实力变量Thread,它引用的就是当前Executor所拥有的那个thread对象。
thread = threadFactory.newThread(new Runnable() {
	@Override
	public void run() {
		boolean success = false;
		updateLastExecutionTime();
		try {
			SingleThreadEventExecutor.this.run();
			success = true;
		} catch (Throwable t) {
			logger.warn("Unexpected exception from an event executor: ", t);
		} finally {
			// 更改状态
			if (state < ST_SHUTTING_DOWN) {
				state = ST_SHUTTING_DOWN;
			}

			// Check if confirmShutdown() was called at the end of the loop.
			// 这里说明在try块中调用的SingleThreadEventExecutor.this.run();中在方法结束之前必须调用confirmShutdown方法,这个在其之类实现的run方法中得到验证
			if (success && gracefulShutdownStartTime == 0) {
				logger.error(
						"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
						SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
						"before run() implementation terminates.");
			}

			try {
				// Run all remaining tasks and shutdown hooks.
				// 确保tasksQueue和shutDownHooks中的runable都处理完成了,这里的处理完成有可能是超时了
				for (;;) {
					if (confirmShutdown()) {
						break;
					}
				}
			} finally {
				try {
					cleanup();
				} finally {
					synchronized (stateLock) {
						state = ST_TERMINATED;
					}
					//释放信号量,使用Semaphore(0)来让另一个线程一直等待,知道内部线程调用了release()
					threadLock.release();
					if (!taskQueue.isEmpty()) {
							logger.warn(
									"An event executor terminated with " +
									"non-empty task queue (" + taskQueue.size() + ')');
						}

						terminationFuture.setSuccess(null);
					}
				}
			}
		}
	});

上面Thread内部run方法执行的是SingleThreadEventExecutor.this.run(),而这个run方法是一个抽象方法,留给了子类去实现了。不过可以肯定的是子类的run方法是不断的去tasksQueue中取出task去执行。现在重点分析下finally块中的代码。
1、首先更改状态为正在关闭状态。
2、如果子类中的run方法中的loop执行成功了,就得先调用confirmShutdown,确认任务队列中的任务是否都已经被执行了。
3、然后还得再次确认下任务队列中是否已被执行完毕,因为在关闭的过程中外部也是能添加任务的。
4、最终执行清理工作,更改状态为已关闭,释放信号量。
5、如果这个时候还是有任务没执行完,那也只能是无奈了,记个log吧
6、更新整个关闭过程为success

再分析下confirmShutdown,看看是如何保证所有的task执行完成的呢
protected boolean confirmShutdown() {
	// 如果state状态 state < ST_SHUTTING_DOWN则直接return false
	if (!isShuttingDown()) {
		return false;
	}
	// 这个方法必须从内部调用,从修饰符 protected也可以看出
	if (!inEventLoop()) {
		throw new IllegalStateException("must be invoked from an event loop");
	}
	// 取消所有的定时任务
	cancelDelayedTasks();
	
	if (gracefulShutdownStartTime == 0) {
		// 标记shutdown处理的开始时间
		gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
	}
	// 运行tasksQueue或者shutdownHooks中的所有Runnable都处理完成
	if (runAllTasks() || runShutdownHooks()) {
		//分析了下源码,isShutdown()这个只能是在外部线程调用了shutdown()接口的时候才会有可能成为true
		//但是现在这个方法已经@Deprecated,所以这个if块是不会进入的
		if (isShutdown()) {
			// shutdown 成功,没有更多的runnable需要执行
			return true;
		}

		// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
		wakeup(true);
		return false;
	}

	final long nanoTime = ScheduledFutureTask.nanoTime();
	// runAllTasks() 或者runAllTasks() + runShutdownHooks()方法执行时间操作了最大限制
	if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
		return true;
	}
	// 现在时间与上个任务执行完成的时间差小于quietPeriod时间,继续检测
	if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
		// Check if any tasks were added to the queue every 100ms.
		// TODO: Change the behavior of takeTask() so that it returns on timeout.
		wakeup(true);
		try {
			//内部线程sleep 100ms
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// Ignore
		}

		return false;
	}

	// No tasks were added for last quiet period - hopefully safe to shut down.
	// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
	return true;
}

后面再看看这个类中的一些内部变量
private static final Runnable WAKEUP_TASK = new Runnable() {
	@Override
	public void run() {
		// Do nothing.
	}
};

这个WAKEUP_TASK什么也不做,为啥取名wakeup呢?我看源码也没太明白。有人理解的给解释下吧
private final Semaphore threadLock = new Semaphore(0);

threadLock的内部permits设置为0,也就是说acquire()永远获取不到permit,会一直被阻塞着。那有什么用呢?另一种实现wait()/notify()吧

值得注意的是类中是如何来控制定时任务的呢?秘密在这个方法中
private void fetchFromDelayedQueue() {
	long nanoTime = 0L;
	for (;;) {
		ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
		if (delayedTask == null) {
			break;
		}

		if (nanoTime == 0L) {
			nanoTime = ScheduledFutureTask.nanoTime();
		}

		if (delayedTask.deadlineNanos() <= nanoTime) {
			delayedTaskQueue.remove();
			taskQueue.add(delayedTask);
		} else {
			break;
		}
	}
}

ScheduledFutureTask类中有个变量记录这个类被加载进内存中的时间
private static final long START_TIME = System.nanoTime();

static long nanoTime() {
	return System.nanoTime() - START_TIME;
}
// 返回到期时间,到期时间在构造函数中指定了
public long deadlineNanos() {
	return deadlineNanos;
}
// 这个方法决定了排序的优先级
@Override
public int compareTo(Delayed o) {
	if (this == o) {
		return 0;
	}

	ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
	long d = deadlineNanos() - that.deadlineNanos();
	if (d < 0) {
		return -1;
	} else if (d > 0) {
		return 1;
	} else if (id < that.id) {
		return -1;
	} else if (id == that.id) {
		throw new Error();
	} else {
		return 1;
	}
}


所以fetchFromDelayedQueue()方法的逻辑就是先取出即将到期的task,判断是否已经到期,若已经到期就加入到tasksQueue中,等到被执行。

先分析到这里,后面在补上。
有什么不正确的,也请大家指正。
  • 大小: 23.7 KB
1
0
分享到:
评论
1 楼 fatherican 2016-08-22  
写的好。

相关推荐

    netty源码分析教程视频

    一个netty的入门教程以及源码分析视频,适合刚学习的人

    读书笔记:《Netty最佳实践》——《Netty实战》补遗.zip

    读书笔记:《Netty最佳实践》——《Netty实战》补遗

    netty源码深入分析

    《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...

    Netty4.x源码分析详解

    在深入分析 Netty 4.x 源码之前,我们首先需要了解其核心概念和架构。 Netty 的核心组件包括: 1. **ByteBuf**: 作为传统 ByteBuffer 的替代品,ByteBuf 提供了更高效且易用的内存管理机制,支持读写分离,避免了...

    《Netty最佳实践》——《Netty实战》补遗-Netty-Best-Practices.zip

    《Netty最佳实践》是基于Java的高性能网络应用框架Netty的一本补充读物,它旨在为开发者提供更深入、更实用的Netty使用技巧和最佳实践。这本书的内容可能涵盖Netty的基础概念、核心组件、性能优化、异常处理、安全...

    netty4源码

    netty4所有的jar包和源码,资源丰富,值得下载。

    Netty 框架学习 —— 第一个 Netty 应用(csdn)————程序.pdf

    在本篇关于“Netty框架学习——第一个Netty应用”的文章中,我们将深入理解如何使用Netty构建一个简单的Echo服务器和客户端。Netty是一个高性能、异步事件驱动的网络应用程序框架,广泛应用于Java领域的服务器开发。...

    Netty 4.1源码包

    Netty4.1的源码,欢迎大家下载。.............................................................................................................................................................................

    netty4 sources 源码

    这个“netty4 sources 源码”指的是Netty 4.x 版本的源代码,其中4.0.33.Final是特定的版本号。源码分析对于理解Netty的工作原理、优化性能以及定制化开发非常有帮助。 Netty 的核心特性包括: 1. **异步事件驱动*...

    netty4.0源码,netty例子,netty api文档

    通过事件循环(EventLoop)和事件处理器(ChannelHandler),Netty能够处理大量并发连接,显著提高系统的吞吐量。 2. **Channel**:在Netty中,Channel是连接的抽象,它代表了到另一个实体(例如,另一个网络节点)...

    netty源码 4.*版本

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入探讨 Netty 源码之前...通过分析源码,不仅可以提升自己的技术能力,还能为解决实际问题提供灵感和参考。

    netty源码剖析视频.zip

    《Netty源码剖析视频》课程是一份深度探讨Netty框架源码及其实战应用的资源集合。课程分为两个主要部分,旨在帮助开发者深入理解Netty的内部机制,并通过实战项目提升其在实际开发中的应用能力。 第一部分,深入浅...

    以netty4.1源码中的EchoServer为例对netty的源码进行分析.docx

    在本文中,我们将深入分析 Netty 4.1 源码中的 EchoServer 示例,以理解其核心组件和工作原理。 首先,我们关注 EchoServer 服务端的初始化,这涉及到两个关键组件:`bossGroup` 和 `workerGroup`。它们都是 `...

    Netty源码分析总结.rar

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的...在分析源码的过程中,我们通常会关注类的设计模式、线程模型、内存管理以及性能优化等方面,这对于提升网络编程和系统架构能力大有裨益。

    netty_learn_netty_源码.zip

    通过阅读和分析这个“netty_learn_netty_源码.zip”中的源代码,你可以深入了解Netty如何实现这些功能,以及它是如何优化网络通信效率的。此外,你还可以学习到Netty如何处理异常、优雅地关闭连接、线程安全等问题,...

    netty源码和相关中文文档

    接下来,我们谈谈 Netty 的源码分析。通过阅读 Netty 源码,我们可以深入了解其设计模式和优化策略: 1. **EventLoop(事件循环)**:Netty 使用单线程的 EventLoop 实现了事件的高效分发,减少了线程切换的开销。 ...

    Netty架构源码剖析_netty_

    深入源码分析,我们可以看到Netty如何优雅地处理了线程安全、内存池管理、心跳机制、解码编码、零拷贝等高级特性。例如,Netty通过内部的DirectBufferPool和HeapBufferPool实现了内存池,减少了内存分配和释放的开销...

    netty4.1源码

    4. **线程模型**:Netty使用EventLoop和EventLoopGroup实现了一种单线程处理多个连接的模型,减少了线程创建和销毁的开销,提高了系统效率。 5. **强大的编码解码器**:Netty提供了一系列预定义的编码解码器,如...

Global site tag (gtag.js) - Google Analytics