先从EventExecutor开始,因为它是一个很基础的工具类,是对I/O线程的包装。先了解下它的源码会对后面的分析有更好的理解。
先看下EventExecutor的类关系图,这里只是简单的画出了类和接口的继承和实现关系,还有其他的聚合关系没有画出来,为的是便于分析思路的清晰。
说到Executor,很容易联想到jdk中 java.util.concurrent.Executor 接口,这个接口非常简单,就一个方法
void execute(Runnable command);
从方法签名上就能看出这个是为了支持异步模式的。command表示一个命令。当前线程就是命令者角色,Executor内部的去运行Runnable的线程就是执行者。这里没有提供明确的地方供命令者去取得命令的执行结果。
ExecutorService 继承了Executor 接口,增加了对自身生命周期管理的方法,同时提供了一个Future给命令者去获取命令的执行结果。
ScheduledExecutorService 继承了ExecutorService接口,增加了对定时任务的支持。
EventExecutorGroup 继承了ScheduledExecutorService接口,对原来的ExecutorService的关闭接口提供了增强,提供了优雅的关闭接口。从接口名称上可以看出它是对多个EventExecutor的集合,提供了对多个EventExecutor的迭代访问接口。
EventExecutor 继承EventExecutorGroup 看着这个关系真心有些纠结啊。不过细想下还是能理解的。A是B中的一员,但是A也能迭代访问B中的其他成员。这个继承关系支持了迭代访问这个行为。自然的他提供了一个parent接口,来获取所属的EventExecutorGroup 。另外提供了inEventLoop 方法支持查询某个线程是否在EventExecutor所管理的线程中。还有其他一些创建Promise和Future的方法。
AbstractEventExecutor 只是对EventExecutor中某些方法的简单实现
下面重点分析下非常有意思SingleThreadEventExecutor,它也是个抽象类,但是提供了很多重要方法的实现。弄清楚了这个对整个EventExecutor体系都非常有帮助。从类名上可知里面只有一个线程,先搞清楚一个线程的处理过程再理解多线程的就轻松些了。
先从execute方法入口分析
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
先不看这个源码,我们分析下,作为一个Executor,A让你执行命令A,B让你执行命令B。。。命令不是说执行就能执行的吧。总得有个地方保存还没来得及执行的命令吧,总得有个先来后到吧。而这个地方又会被多线程访问,得保证多线程访问可见性,操作的原子性。jdk中提供的BlockingQueue就是为此而生的。BlockingQueue是一个接口,jdk中有很多他的实现。SingleThreadEventExcutor中提供的这个地方叫tasksQueue,类型是jdk中的LinkedBlockingQueue。上面代码中的addTask就是往tasksQueue中添加。
另外SingleThreadEventExcutor实现了ScheduledExecutorService 接口,支持执行定时任务。得有个地方存放定时任务信息。类中的实现是delayedTaskQueue,它是一个PriorityQueue ,也是一个BlockingQueue。不过它里面的元素不是按照先来后到的顺序存取的,而是按照各个元素的优先级判断的。
因为这个execute方法,是可以在外部线程调用,也可以在内部线程调用。也就是说外部成员可以给你下命令,内部成员也可以给你下命令。所以在上面的代码中先调用inEventLoop判断当前下命令的是外部的还是内部的。
如果是外部的,先确定内部线程是否启动,没启动就先启动内部线程同时给自己加一个定时清理的定时任务。这个从下面的代码中可以看出
private void startThread() {
synchronized (stateLock) {
if (state == ST_NOT_STARTED) {
state = ST_STARTED;
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
thread.start();
}
}
}
private final class PurgeTask implements Runnable {
@Override
public void run() {
Iterator<ScheduledFutureTask<?>> i = delayedTaskQueue.iterator();
while (i.hasNext()) {
ScheduledFutureTask<?> task = i.next();
if (task.isCancelled()) {
i.remove();
}
}
}
}
添加命令完成了,下面就看如何去执行命令了,这个就需要分析下内部线程的执行逻辑了。SingleThreadEventExecutor类中有一个实力变量Thread,它引用的就是当前Executor所拥有的那个thread对象。
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 更改状态
if (state < ST_SHUTTING_DOWN) {
state = ST_SHUTTING_DOWN;
}
// Check if confirmShutdown() was called at the end of the loop.
// 这里说明在try块中调用的SingleThreadEventExecutor.this.run();中在方法结束之前必须调用confirmShutdown方法,这个在其之类实现的run方法中得到验证
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
// 确保tasksQueue和shutDownHooks中的runable都处理完成了,这里的处理完成有可能是超时了
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
synchronized (stateLock) {
state = ST_TERMINATED;
}
//释放信号量,使用Semaphore(0)来让另一个线程一直等待,知道内部线程调用了release()
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
上面Thread内部run方法执行的是SingleThreadEventExecutor.this.run(),而这个run方法是一个抽象方法,留给了子类去实现了。不过可以肯定的是子类的run方法是不断的去tasksQueue中取出task去执行。现在重点分析下finally块中的代码。
1、首先更改状态为正在关闭状态。
2、如果子类中的run方法中的loop执行成功了,就得先调用confirmShutdown,确认任务队列中的任务是否都已经被执行了。
3、然后还得再次确认下任务队列中是否已被执行完毕,因为在关闭的过程中外部也是能添加任务的。
4、最终执行清理工作,更改状态为已关闭,释放信号量。
5、如果这个时候还是有任务没执行完,那也只能是无奈了,记个log吧
6、更新整个关闭过程为success
再分析下confirmShutdown,看看是如何保证所有的task执行完成的呢
protected boolean confirmShutdown() {
// 如果state状态 state < ST_SHUTTING_DOWN则直接return false
if (!isShuttingDown()) {
return false;
}
// 这个方法必须从内部调用,从修饰符 protected也可以看出
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
// 取消所有的定时任务
cancelDelayedTasks();
if (gracefulShutdownStartTime == 0) {
// 标记shutdown处理的开始时间
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
// 运行tasksQueue或者shutdownHooks中的所有Runnable都处理完成
if (runAllTasks() || runShutdownHooks()) {
//分析了下源码,isShutdown()这个只能是在外部线程调用了shutdown()接口的时候才会有可能成为true
//但是现在这个方法已经@Deprecated,所以这个if块是不会进入的
if (isShutdown()) {
// shutdown 成功,没有更多的runnable需要执行
return true;
}
// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
wakeup(true);
return false;
}
final long nanoTime = ScheduledFutureTask.nanoTime();
// runAllTasks() 或者runAllTasks() + runShutdownHooks()方法执行时间操作了最大限制
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
// 现在时间与上个任务执行完成的时间差小于quietPeriod时间,继续检测
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
wakeup(true);
try {
//内部线程sleep 100ms
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
// No tasks were added for last quiet period - hopefully safe to shut down.
// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
return true;
}
后面再看看这个类中的一些内部变量
private static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
这个WAKEUP_TASK什么也不做,为啥取名wakeup呢?我看源码也没太明白。有人理解的给解释下吧
private final Semaphore threadLock = new Semaphore(0);
threadLock的内部permits设置为0,也就是说acquire()永远获取不到permit,会一直被阻塞着。那有什么用呢?另一种实现wait()/notify()吧
值得注意的是类中是如何来控制定时任务的呢?秘密在这个方法中
private void fetchFromDelayedQueue() {
long nanoTime = 0L;
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
break;
}
if (nanoTime == 0L) {
nanoTime = ScheduledFutureTask.nanoTime();
}
if (delayedTask.deadlineNanos() <= nanoTime) {
delayedTaskQueue.remove();
taskQueue.add(delayedTask);
} else {
break;
}
}
}
ScheduledFutureTask类中有个变量记录这个类被加载进内存中的时间
private static final long START_TIME = System.nanoTime();
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
// 返回到期时间,到期时间在构造函数中指定了
public long deadlineNanos() {
return deadlineNanos;
}
// 这个方法决定了排序的优先级
@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
所以fetchFromDelayedQueue()方法的逻辑就是先取出即将到期的task,判断是否已经到期,若已经到期就加入到tasksQueue中,等到被执行。
先分析到这里,后面在补上。
有什么不正确的,也请大家指正。
- 大小: 23.7 KB
分享到:
相关推荐
一个netty的入门教程以及源码分析视频,适合刚学习的人
读书笔记:《Netty最佳实践》——《Netty实战》补遗
《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...
在深入分析 Netty 4.x 源码之前,我们首先需要了解其核心概念和架构。 Netty 的核心组件包括: 1. **ByteBuf**: 作为传统 ByteBuffer 的替代品,ByteBuf 提供了更高效且易用的内存管理机制,支持读写分离,避免了...
《Netty最佳实践》是基于Java的高性能网络应用框架Netty的一本补充读物,它旨在为开发者提供更深入、更实用的Netty使用技巧和最佳实践。这本书的内容可能涵盖Netty的基础概念、核心组件、性能优化、异常处理、安全...
netty4所有的jar包和源码,资源丰富,值得下载。
在本篇关于“Netty框架学习——第一个Netty应用”的文章中,我们将深入理解如何使用Netty构建一个简单的Echo服务器和客户端。Netty是一个高性能、异步事件驱动的网络应用程序框架,广泛应用于Java领域的服务器开发。...
Netty4.1的源码,欢迎大家下载。.............................................................................................................................................................................
这个“netty4 sources 源码”指的是Netty 4.x 版本的源代码,其中4.0.33.Final是特定的版本号。源码分析对于理解Netty的工作原理、优化性能以及定制化开发非常有帮助。 Netty 的核心特性包括: 1. **异步事件驱动*...
通过事件循环(EventLoop)和事件处理器(ChannelHandler),Netty能够处理大量并发连接,显著提高系统的吞吐量。 2. **Channel**:在Netty中,Channel是连接的抽象,它代表了到另一个实体(例如,另一个网络节点)...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入探讨 Netty 源码之前...通过分析源码,不仅可以提升自己的技术能力,还能为解决实际问题提供灵感和参考。
《Netty源码剖析视频》课程是一份深度探讨Netty框架源码及其实战应用的资源集合。课程分为两个主要部分,旨在帮助开发者深入理解Netty的内部机制,并通过实战项目提升其在实际开发中的应用能力。 第一部分,深入浅...
在本文中,我们将深入分析 Netty 4.1 源码中的 EchoServer 示例,以理解其核心组件和工作原理。 首先,我们关注 EchoServer 服务端的初始化,这涉及到两个关键组件:`bossGroup` 和 `workerGroup`。它们都是 `...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的...在分析源码的过程中,我们通常会关注类的设计模式、线程模型、内存管理以及性能优化等方面,这对于提升网络编程和系统架构能力大有裨益。
通过阅读和分析这个“netty_learn_netty_源码.zip”中的源代码,你可以深入了解Netty如何实现这些功能,以及它是如何优化网络通信效率的。此外,你还可以学习到Netty如何处理异常、优雅地关闭连接、线程安全等问题,...
接下来,我们谈谈 Netty 的源码分析。通过阅读 Netty 源码,我们可以深入了解其设计模式和优化策略: 1. **EventLoop(事件循环)**:Netty 使用单线程的 EventLoop 实现了事件的高效分发,减少了线程切换的开销。 ...
深入源码分析,我们可以看到Netty如何优雅地处理了线程安全、内存池管理、心跳机制、解码编码、零拷贝等高级特性。例如,Netty通过内部的DirectBufferPool和HeapBufferPool实现了内存池,减少了内存分配和释放的开销...
4. **线程模型**:Netty使用EventLoop和EventLoopGroup实现了一种单线程处理多个连接的模型,减少了线程创建和销毁的开销,提高了系统效率。 5. **强大的编码解码器**:Netty提供了一系列预定义的编码解码器,如...