- 浏览: 981115 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
netty 单线程事件执行器初始化:http://donald-draper.iteye.com/blog/2391895
引言:
上一篇文章我们看一单线程任务的内部变量和初始化,先来回顾一下:
单线程事件执行器SingleThreadEventExecutor,内部主要有一个状态变量STATE_UPDATER(AtomicIntegerFieldUpdater),执行器状态以供有4中就绪,开始,正在关闭,已关闭,终止;一个任务队列taskQueue存放待执行的任务线程;一个执行器执行任务taskQueue(LinkedBlockingQueue);一个事件执行器关闭信号量threadLock控制事件执行器的关闭;一个是高可见线程thread,指定当前事件执行器线程,用于判断IO操作线程是否在当前事件循环中;
单线程事件执行器构造,主要是初始化父事件执行器,最大任务数,事件执行器,任务队列和任务拒绝策略,默认拒绝策略为直接抛出拒绝执行器异常。由于单线程事件执行器为顺序执行器OrderedEventExecutor,其主要通过taskQueue为LinkedBlockQueue保证任务的顺序执行。
今天我们来看单线程事件执行器,执行任务等方法:
先把单线程事件执行器内部变量贴出来以便理解相关方法,
下面先来看执行任务方法
上面方法有一下几点要看:
1.
2.
3.
4.
下面分别来看这几点
1.
2.
从这一点来看,当执行器关闭时,直接直接抛出拒绝执行任务异常,如果没关闭,
则将拒绝执行的任务委托给拒绝执行任务Handler处理。
3.
在实际完成事件执行器启动工作方法中,事件完成工作通过一个线程完成,并有内部执行器执行;线程事件工作在一个try,finally语句块中,在finally语句块中自旋等待执行器关闭,并完成关闭任务。
来看上述方法需要关注的几点:
3.1
3.2
3.3
来看取消调度任务
在来看运行关闭Hook线程:
3.4
4.
从上面可看出,单线程事件执行器,执行任务,首先判断任务是否为null,为空抛出空指针异常,否则,判断线程是否在当前事件循环中,在则添加任务到任务队列,否则开启当前单线程事件执行器,并添加任务到任务队列,如果此时事件执行器已关闭,并可以移除任务,则抛出拒绝执行器任务异常;如果需要启动事件执行器唤醒线程,则添加唤醒线程到任务队列。
再来看其他方法:
从上面可以看出,添加,移除,poll任务操作,实际委托给任务队列,
添加,移除hook线程操作委托给关闭hooks线程集合。
来看从任务队列take任务
从上面可以看出,单线程事件执行器take任务,首先从调度任务队列peek头部调度任务,
如果任务不为空,则获取调度任务延时时间,如果延时时间大于0,则从任务队列超时poll任务,否则从调度任务队列抓取调度任务,添加到任务队列,并从任务队列poll任务;如果调度任务为空,则从任务队列take一个任务,如果是唤醒任务,则忽略。
再来看超时运行所有任务:
从上面可以看出,超时运行所有任务,即从调度任务队列,抓取任务,方法任务队列,
从任务队列拉取任务,并执行,如果任务执行时间超过timeoutNanos,则停止执行任务。
来看关闭单线程执行器
从上面可以看出,关闭单线程执行器,首先检查间隔、超时时间,时间单元参数,并且间隔时间要小于超时时间,
如果已经关闭,则返回异步关闭任务结果,否则检查线程是否在当前事务循环中,如果是则更新状态为正在关闭,
并计算计算关闭间隔和超时时间。
来看已经丢弃的关闭方法
shutdown与shutdownGracefully方法的不同在于,关闭执行器时,shutdownGracefully有一个缓冲的时间和任务执行的超时时间,以便将任务队列中的任务尽量在超时时间内执行完。
再来看超时等待终止执行器
再来看获取执行器线程属性:
再来看其他方法:
下面几个方法的思想都是首先检查线程是否在当前事件循环中,如果不在,则抛出拒绝执行器异常,否则将相应的操作委托给父类。
总结:
单线程事件执行器,执行任务,首先判断任务是否为null,为空抛出空指针异常,否则,判断线程是否在当前事件循环中,在则添加任务到任务队列,否则开启当前单线程事件执行器,并添加任务到任务队列,如果此时事件执行器已关闭,并可以移除任务,则抛出拒绝执行器任务异常;如果需要启动事件执行器唤醒线程,则添加唤醒线程到任务队列。
添加,移除,poll任务操作,实际委托给任务队列,添加,移除hook线程操作委托给关闭hooks线程集合。
单线程事件执行器take任务,首先从调度任务队列peek头部调度任务,如果任务不为空,则获取调度任务延时时间,如果延时时间大于0,则从任务队列超时poll任务,否则从调度任务队列抓取调度任务,添加到任务队列,并从任务队列poll任务;如果调度任务为空,则从任务队列take一个任务,如果是唤醒任务,则忽略。
关闭单线程执行器,首先检查间隔、超时时间,时间单元参数,并且间隔时间要小于超时时间,如果已经关闭,则返回异步关闭任务结果,否则检查线程是否在当前事务循环中,如果是则更新状态为正在关闭,并计算计算关闭间隔和超时时间。
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
netty 单线程事件执行器初始化:http://donald-draper.iteye.com/blog/2391895
引言:
上一篇文章我们看一单线程任务的内部变量和初始化,先来回顾一下:
单线程事件执行器SingleThreadEventExecutor,内部主要有一个状态变量STATE_UPDATER(AtomicIntegerFieldUpdater),执行器状态以供有4中就绪,开始,正在关闭,已关闭,终止;一个任务队列taskQueue存放待执行的任务线程;一个执行器执行任务taskQueue(LinkedBlockingQueue);一个事件执行器关闭信号量threadLock控制事件执行器的关闭;一个是高可见线程thread,指定当前事件执行器线程,用于判断IO操作线程是否在当前事件循环中;
单线程事件执行器构造,主要是初始化父事件执行器,最大任务数,事件执行器,任务队列和任务拒绝策略,默认拒绝策略为直接抛出拒绝执行器异常。由于单线程事件执行器为顺序执行器OrderedEventExecutor,其主要通过taskQueue为LinkedBlockQueue保证任务的顺序执行。
今天我们来看单线程事件执行器,执行任务等方法:
先把单线程事件执行器内部变量贴出来以便理解相关方法,
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //最大执行任数,最小为16 static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE)); private static final InternalLogger logger = InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); //事件执行器状态 private static final int ST_NOT_STARTED = 1;//就绪 private static final int ST_STARTED = 2;//开始 private static final int ST_SHUTTING_DOWN = 3;//正在关闭 private static final int ST_SHUTDOWN = 4;//已关闭 private static final int ST_TERMINATED = 5;//终止 //唤醒任务 private static final Runnable WAKEUP_TASK = new Runnable() { @Override public void run() { // Do nothing. } }; //空任务 private static final Runnable NOOP_TASK = new Runnable() { @Override public void run() { // Do nothing. } }; //事件执行器状态 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); //线程属性 private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER = AtomicReferenceFieldUpdater.newUpdater( SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties"); //任务队列 private final Queue<Runnable> taskQueue; private volatile Thread thread;//当前事件执行器线程 @SuppressWarnings("unused")//线程属性 private volatile ThreadProperties threadProperties; private final Executor executor;//内部执行器 private volatile boolean interrupted;//是否中断 private final Semaphore threadLock = new Semaphore(0);//事件执行器终止信号量 private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();//关闭Hooks任务 private final boolean addTaskWakesUp; private final int maxPendingTasks;//最大执行器任务 private final RejectedExecutionHandler rejectedExecutionHandler;//任务拒绝策略 private long lastExecutionTime;//上次执行器时间 @SuppressWarnings({ "FieldMayBeFinal", "unused" }) private volatile int state = ST_NOT_STARTED;//执行器初始状态 private volatile long gracefulShutdownQuietPeriod;//关闭间隔QuietPeriod private volatile long gracefulShutdownTimeout;//关闭超时时间 private long gracefulShutdownStartTime;//关闭开始时间 //终止异步任务结果 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE); }
下面先来看执行任务方法
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //判断线程是否在当前事务循环中 boolean inEventLoop = inEventLoop(); if (inEventLoop) { //如果在事务循环中,则添加任务到任务队列 addTask(task); } else { //否则,当前事件执行器线程 startThread(); //添加任务到任务队列 addTask(task); //如果事件执行器,已关闭,则移除任务,抛出拒绝执行异常 if (isShutdown() && removeTask(task)) { reject(); } } //如果添加任务不唤醒事件循环且执行任务唤醒事件循环,则唤醒事件循环 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
上面方法有一下几点要看:
1.
//判断线程是否在当前事务循环中 boolean inEventLoop = inEventLoop();
2.
if (inEventLoop) { //如果在事务循环中,则添加任务到任务队列 addTask(task); }
3.
else { //否则,当前事件执行器线程 startThread(); //添加任务到任务队列 addTask(task); //如果事件执行器,已关闭,则移除任务,抛出拒绝执行异常 if (isShutdown() && removeTask(task)) { reject(); } }
4.
//如果添加任务不唤醒事件循环且执行任务唤醒事件循环,则唤醒事件循环 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); }
下面分别来看这几点
1.
//判断线程是否在当前事务循环中 boolean inEventLoop = inEventLoop();
//AbstractEventExecutor @Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
//SingleThreadEventExecutor @Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
2.
if (inEventLoop) { //如果在事务循环中,则添加任务到任务队列 addTask(task); }
/** * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown * before. 在单线程事件执行器关闭前,添加任务到任务队列,或者抛出拒绝执行任务异常 */ protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //如果添加任务到队列失败,则抛出抛出拒绝执行任务异常 if (!offerTask(task)) { reject(task); } } final boolean offerTask(Runnable task) { //如果关闭,则拒绝执行器任务,否则添加任务到任务队列 if (isShutdown()) { reject(); } return taskQueue.offer(task); } /** * Offers the task to the associated {@link RejectedExecutionHandler}. 将拒绝执行的任务委托给拒绝执行任务Handler处理 * @param task to reject. */ protected final void reject(Runnable task) { rejectedExecutionHandler.rejected(task, this); } //直接抛出拒绝执行任务异常 protected static void reject() { throw new RejectedExecutionException("event executor terminated"); }
从这一点来看,当执行器关闭时,直接直接抛出拒绝执行任务异常,如果没关闭,
则将拒绝执行的任务委托给拒绝执行任务Handler处理。
3.
else { //否则,当前事件执行器线程 startThread(); //添加任务到任务队列 addTask(task); //如果事件执行器,已关闭,则移除任务,抛出拒绝执行异常 if (isShutdown() && removeTask(task)) { reject(); } }
//启动事件执行器线程 private void startThread() { if (state == ST_NOT_STARTED) { //更新执行器状态为已启动,并完成实际启动工作 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { doStartThread(); } } } //完成实际启动工作 private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { //初始化事件执行器线程,用于判断线程是否在当前事件循环中 thread = Thread.currentThread(); if (interrupted) { //如果执行器线程中断,消除中断位 thread.interrupt(); } boolean success = false; //更新执行器上次执行器事件 updateLastExecutionTime(); try { //启动事件执行器线程 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { //自旋,等待执行器关闭,并更新执行器状态为已关闭 int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. //检查confirmShutdown方法是否在循环最后调用 if (success && gracefulShutdownStartTime == 0) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. for (;;) { /确定执行器关闭 if (confirmShutdown()) { break; } } } finally { try { //完成清理工作 cleanup(); } finally { //更新执行器状态为已终止 STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); //释放线程锁 threadLock.release(); //如果任务线程队列不为空,则警告 if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } //设置终止任务结果为成功 terminationFuture.setSuccess(null); } } } } }); }
在实际完成事件执行器启动工作方法中,事件完成工作通过一个线程完成,并有内部执行器执行;线程事件工作在一个try,finally语句块中,在finally语句块中自旋等待执行器关闭,并完成关闭任务。
来看上述方法需要关注的几点:
3.1
//更新执行器上次执行器事件 updateLastExecutionTime();
/** * Updates the internal timestamp that tells when a submitted task was executed most recently. * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's * usually no need to call this method. However, if you take the tasks manually using {@link #takeTask()} or * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period * checks. */ protected void updateLastExecutionTime() { lastExecutionTime = ScheduledFutureTask.nanoTime(); }
3.2
//启动事件执行器线程 SingleThreadEventExecutor.this.run();
/** *待子类实现 */ protected abstract void run();
3.3
/确定执行器关闭 if (confirmShutdown()) { break; }
/** * Confirm that the shutdown if the instance should be done now! 确认事件执行器关闭 */ protected boolean confirmShutdown() { //如果没关闭,返回false if (!isShuttingDown()) { return false; } //如果线程不在当前事件循环中,抛出非法状态异常 if (!inEventLoop()) { throw new IllegalStateException("must be invoked from an event loop"); } //取消调度任务 cancelScheduledTasks(); //更新关闭事件 if (gracefulShutdownStartTime == 0) { gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); } //执行所有任务或关闭Hooks线程成功 if (runAllTasks() || runShutdownHooks()) { if (isShutdown()) { //已关闭 // Executor shut down - no new tasks anymore. return true; } // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or // terminate if the quiet period is 0. // See https://github.com/netty/netty/issues/4241 if (gracefulShutdownQuietPeriod == 0) { return true; } wakeup(true); return false; } //获取调度任务当前事件 final long nanoTime = ScheduledFutureTask.nanoTime(); //如果已经关闭,或关闭超时,返回true if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; } //如果关闭间隔时间没过完 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. //则继续执行提交的任务 wakeup(true); try { Thread.sleep(100); } catch (InterruptedException e) { // Ignore } return false; } // No tasks were added for last quiet period - hopefully safe to shut down. // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.) return true; }
来看取消调度任务
//取消调度任务 cancelScheduledTasks(); 这个方法,在前面文章中已说,简单看下就行 /** * Cancel all scheduled tasks. * * This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected void cancelScheduledTasks() { assert inEventLoop(); Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; if (isNullOrEmpty(scheduledTaskQueue)) { return; } final ScheduledFutureTask<?>[] scheduledTasks = scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]); for (ScheduledFutureTask<?> task: scheduledTasks) { task.cancelWithoutRemove(false); } scheduledTaskQueue.clear(); }
再来看执行所有任务或关闭Hooks线程 //执行所有任务或关闭Hooks线程成功 if (runAllTasks() || runShutdownHooks()) { if (isShutdown()) { //已关闭 // Executor shut down - no new tasks anymore. return true; } // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or // terminate if the quiet period is 0. // See https://github.com/netty/netty/issues/4241 if (gracefulShutdownQuietPeriod == 0) { return true; } wakeup(true); return false; } /** * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. *从任务队列take所有任务,并执行器 * @return {@code true} if and only if at least one task was run */ protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { //从调度任务队列抓取任务,添加到任务队列中 fetchedAll = fetchFromScheduledTaskQueue(); //从任务队列take任务,并执行 if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { //更新最后执行时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); } //完成运行所有任务后的工作 afterRunningAllTasks(); return ranAtLeastOne; } //从调度任务队列抓取任务,添加到任务队列中 private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } scheduledTask = pollScheduledTask(nanoTime); } return true; } /** * Runs all tasks from the passed {@code taskQueue}. *从任务队列take任务,并执行 * @param taskQueue To poll and execute all tasks. * * @return {@code true} if at least one task was executed. */ protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { Runnable task = pollTaskFrom(taskQueue); if (task == null) { return false; } for (;;) { safeExecute(task); task = pollTaskFrom(taskQueue); if (task == null) { return true; } } } //从任务队列拉取任务 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) { for (;;) { Runnable task = taskQueue.poll(); if (task == WAKEUP_TASK) { //如果是唤醒任务,则跳过 continue; } return task; } } //AbstractEventExecutor /** * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}. */ protected static void safeExecute(Runnable task) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception. Task: {}", task, t); } } /** * Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}. 待子类实现 */ @UnstableApi protected void afterRunningAllTasks() { }
在来看运行关闭Hook线程:
private boolean runShutdownHooks() { boolean ran = false; // Note shutdown hooks can add / remove shutdown hooks. while (!shutdownHooks.isEmpty()) { //如果关闭hook线程集合不为空,执行hook线程。 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks); shutdownHooks.clear(); for (Runnable task: copy) { try { task.run(); } catch (Throwable t) { logger.warn("Shutdown hook raised an exception.", t); } finally { ran = true; } } } if (ran) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } return ran; }
3.4
//完成清理工作 cleanup();
/** * Do nothing, sub-classes may override 待子类实现 */ protected void cleanup() { // NOOP }
4.
//如果添加任务不唤醒事件循环且执行任务唤醒事件循环,则唤醒事件循环 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); }
@SuppressWarnings("unused") protected boolean wakesUpForTask(Runnable task) { return true; } //添加唤醒线程到任务队列 protected void wakeup(boolean inEventLoop) { if (!inEventLoop || state == ST_SHUTTING_DOWN) { // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there // is already something in the queue. taskQueue.offer(WAKEUP_TASK); } }
从上面可看出,单线程事件执行器,执行任务,首先判断任务是否为null,为空抛出空指针异常,否则,判断线程是否在当前事件循环中,在则添加任务到任务队列,否则开启当前单线程事件执行器,并添加任务到任务队列,如果此时事件执行器已关闭,并可以移除任务,则抛出拒绝执行器任务异常;如果需要启动事件执行器唤醒线程,则添加唤醒线程到任务队列。
再来看其他方法:
/** * Interrupt the current running {@link Thread}. 中断时间执行器线程 */ protected void interruptThread() { Thread currentThread = thread; if (currentThread == null) { interrupted = true; } else { currentThread.interrupt(); } } //从任务队列拉取任务 /** * @see Queue#poll() */ protected Runnable pollTask() { assert inEventLoop(); return pollTaskFrom(taskQueue); } protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) { for (;;) { Runnable task = taskQueue.poll(); if (task == WAKEUP_TASK) { continue; } return task; } } //移除任务 /** * @see Queue#remove(Object) */ protected boolean removeTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } return taskQueue.remove(task); } //检查队头任务 /** * @see Queue#peek() */ protected Runnable peekTask() { assert inEventLoop(); return taskQueue.peek(); } //判断任务队列是否还有任务 /** * @see Queue#isEmpty() */ protected boolean hasTasks() { assert inEventLoop(); return !taskQueue.isEmpty(); } /** * Return the number of tasks that are pending for processing. *获取任务队列当前任务数 * [b]Be aware that this operation may be expensive as it depends on the internal implementation of the * SingleThreadEventExecutor. So use it was care![/b] */ public int pendingTasks() { return taskQueue.size(); } //添加hook线程到,关闭hook线程集合 /** * Add a {@link Runnable} which will be executed on shutdown of this instance */ public void addShutdownHook(final Runnable task) { if (inEventLoop()) { shutdownHooks.add(task); } else { execute(new Runnable() { @Override public void run() { shutdownHooks.add(task); } }); } } //从关闭hook线程集合中,移除hook线程 /** * Remove a previous added {@link Runnable} as a shutdown hook */ public void removeShutdownHook(final Runnable task) { if (inEventLoop()) { shutdownHooks.remove(task); } else { execute(new Runnable() { @Override public void run() { shutdownHooks.remove(task); } }); } } //返回下一个调度任务的延时时间 /** * Returns the amount of time left until the scheduled task with the closest dead line is executed. */ protected long delayNanos(long currentTimeNanos) { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null) { return SCHEDULE_PURGE_INTERVAL; } return scheduledTask.delayNanos(currentTimeNanos); }
从上面可以看出,添加,移除,poll任务操作,实际委托给任务队列,
添加,移除hook线程操作委托给关闭hooks线程集合。
来看从任务队列take任务
/** * Take the next {@link Runnable} from the task queue and so will block if no task is currently present. * * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}. * * * @return {@code null} if the executor thread has been interrupted or waken up. */ protected Runnable takeTask() { assert inEventLoop(); if (!(taskQueue instanceof BlockingQueue)) { throw new UnsupportedOperationException(); } //获取当前任务队列 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue; for (;;) { //从调度任务队列peek头部调度任务 ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null) { Runnable task = null; try { //如果调度任务为空,则从任务队列take一个任务 task = taskQueue.take(); if (task == WAKEUP_TASK) { task = null; } } catch (InterruptedException e) { // Ignore } return task; } else { //否则,获取调度任务延时时间 long delayNanos = scheduledTask.delayNanos(); Runnable task = null; if (delayNanos > 0) { try { //如果延时时间大于0,则从任务队列超时poll任务 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { // Waken up. return null; } } if (task == null) { // We need to fetch the scheduled tasks now as otherwise there may be a chance that // scheduled tasks are never executed if there is always one task in the taskQueue. // This is for example true for the read task of OIO Transport // See https://github.com/netty/netty/issues/1614 //从调度任务队列抓取调度任务,添加到任务队列 fetchFromScheduledTaskQueue(); //从任务队列poll任务 task = taskQueue.poll(); } if (task != null) { return task; } } } }
从上面可以看出,单线程事件执行器take任务,首先从调度任务队列peek头部调度任务,
如果任务不为空,则获取调度任务延时时间,如果延时时间大于0,则从任务队列超时poll任务,否则从调度任务队列抓取调度任务,添加到任务队列,并从任务队列poll任务;如果调度任务为空,则从任务队列take一个任务,如果是唤醒任务,则忽略。
再来看超时运行所有任务:
/** * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}. 超时运行所有任务,即从任务队列拉取任务,并执行,如果任务执行时间超过timeoutNanos,则停止执行任务 */ protected boolean runAllTasks(long timeoutNanos) { //从调度任务抓取任务队列,并添加到任务队列 fetchFromScheduledTaskQueue(); //从任务队列poll任务 Runnable task = pollTask(); if (task == null) { //任务为空,则完成执行结束任务 afterRunningAllTasks(); return false; } //超时时间 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { //自旋安全执行任务 safeExecute(task); runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. //没64个任务检查一下任务执行时间有没有超时 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
从上面可以看出,超时运行所有任务,即从调度任务队列,抓取任务,方法任务队列,
从任务队列拉取任务,并执行,如果任务执行时间超过timeoutNanos,则停止执行任务。
来看关闭单线程执行器
@Override public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { //检查间隔、超时时间,时间单元参数,并且间隔时间要小于超时时间 if (quietPeriod < 0) { throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); } if (timeout < quietPeriod) { throw new IllegalArgumentException( "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); } if (unit == null) { throw new NullPointerException("unit"); } //如果已经关闭,则返回异步关闭任务结果 if (isShuttingDown()) { return terminationFuture(); } boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; for (;;) { //如果已经关闭,则返回异步关闭任务结果 if (isShuttingDown()) { return terminationFuture(); } int newState; wakeup = true; oldState = state; //如果线程在当前事务循环,则更新状态为正在关闭 if (inEventLoop) { newState = ST_SHUTTING_DOWN; } else { switch (oldState) { case ST_NOT_STARTED: case ST_STARTED: newState = ST_SHUTTING_DOWN; break; default: newState = oldState; wakeup = false; } } //更新状态 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { break; } } //计算关闭间隔和超时时间,这个会在执行任务方法中的finally语句块中,用到 gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); gracefulShutdownTimeout = unit.toNanos(timeout); if (oldState == ST_NOT_STARTED) { doStartThread(); } if (wakeup) { //执行唤醒任务线程 wakeup(inEventLoop); } return terminationFuture(); }
从上面可以看出,关闭单线程执行器,首先检查间隔、超时时间,时间单元参数,并且间隔时间要小于超时时间,
如果已经关闭,则返回异步关闭任务结果,否则检查线程是否在当前事务循环中,如果是则更新状态为正在关闭,
并计算计算关闭间隔和超时时间。
来看已经丢弃的关闭方法
@Override @Deprecated public void shutdown() { if (isShutdown()) { return; } boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; for (;;) { if (isShuttingDown()) { return; } int newState; wakeup = true; oldState = state; if (inEventLoop) { newState = ST_SHUTDOWN; } else { switch (oldState) { case ST_NOT_STARTED: case ST_STARTED: case ST_SHUTTING_DOWN: newState = ST_SHUTDOWN; break; default: newState = oldState; wakeup = false; } } if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { break; } } if (oldState == ST_NOT_STARTED) { doStartThread(); } if (wakeup) { wakeup(inEventLoop); } }
shutdown与shutdownGracefully方法的不同在于,关闭执行器时,shutdownGracefully有一个缓冲的时间和任务执行的超时时间,以便将任务队列中的任务尽量在超时时间内执行完。
@Override public boolean isShuttingDown() { return state >= ST_SHUTTING_DOWN; } @Override public boolean isShutdown() { return state >= ST_SHUTDOWN; } @Override public boolean isTerminated() { return state == ST_TERMINATED; } @Override public Future<?> terminationFuture() { return terminationFuture; }
再来看超时等待终止执行器
@Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { if (unit == null) { throw new NullPointerException("unit"); } if (inEventLoop()) { throw new IllegalStateException("cannot await termination of the current thread"); } //尝试获取线程锁,这个锁在执行器终止时,释放。 if (threadLock.tryAcquire(timeout, unit)) { threadLock.release(); } return isTerminated(); }
再来看获取执行器线程属性:
/** * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}. * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until the * it is fully started. 返回执行器线程的属性,如果执行器没启动,则阻塞到其启动 */ public final ThreadProperties threadProperties() { ThreadProperties threadProperties = this.threadProperties; if (threadProperties == null) { Thread thread = this.thread; if (thread == null) { assert !inEventLoop(); submit(NOOP_TASK).syncUninterruptibly(); thread = this.thread; assert thread != null; } //构造线程属性,并更新 threadProperties = new DefaultThreadProperties(thread); if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) { threadProperties = this.threadProperties; } } return threadProperties; }
再来看其他方法:
下面几个方法的思想都是首先检查线程是否在当前事件循环中,如果不在,则抛出拒绝执行器异常,否则将相应的操作委托给父类。
@Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { throwIfInEventLoop("invokeAny"); return super.invokeAny(tasks); } @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { throwIfInEventLoop("invokeAny"); return super.invokeAny(tasks, timeout, unit); } @Override public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { throwIfInEventLoop("invokeAll"); return super.invokeAll(tasks); } @Override public <T> List<java.util.concurrent.Future<T>> invokeAll( Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { throwIfInEventLoop("invokeAll"); return super.invokeAll(tasks, timeout, unit); } //如果不在当前事件循环中,则抛出拒绝执行器异常 private void throwIfInEventLoop(String method) { if (inEventLoop()) { throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed"); } }
总结:
单线程事件执行器,执行任务,首先判断任务是否为null,为空抛出空指针异常,否则,判断线程是否在当前事件循环中,在则添加任务到任务队列,否则开启当前单线程事件执行器,并添加任务到任务队列,如果此时事件执行器已关闭,并可以移除任务,则抛出拒绝执行器任务异常;如果需要启动事件执行器唤醒线程,则添加唤醒线程到任务队列。
添加,移除,poll任务操作,实际委托给任务队列,添加,移除hook线程操作委托给关闭hooks线程集合。
单线程事件执行器take任务,首先从调度任务队列peek头部调度任务,如果任务不为空,则获取调度任务延时时间,如果延时时间大于0,则从任务队列超时poll任务,否则从调度任务队列抓取调度任务,添加到任务队列,并从任务队列poll任务;如果调度任务为空,则从任务队列take一个任务,如果是唤醒任务,则忽略。
关闭单线程执行器,首先检查间隔、超时时间,时间单元参数,并且间隔时间要小于超时时间,如果已经关闭,则返回异步关闭任务结果,否则检查线程是否在当前事务循环中,如果是则更新状态为正在关闭,并计算计算关闭间隔和超时时间。
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1321netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2057netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2444netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1317netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1310netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1594netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1844netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1397netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2834netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2177netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2037netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1078netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1877netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1219netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1202netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 958netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1309netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2189netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1077netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1855netty 管道线定义-ChannelPipeline:htt ...
相关推荐
在分析Netty5多线程编程之前,首先需要了解Java内存模型与多线程编程的关系。硬件技术的进步,特别是多核处理器的发展和成本的降低,使得多任务处理成为操作系统的基本功能。多核处理器能够同时执行多个线程,使得...
每个 EventLoop 都是一个独立的线程,它们负责处理 I/O 事件和执行用户定义的任务。EventLoopGroup 是 EventLoop 的集合,通常会有多个 EventLoop 实例,形成一个线程池。 3. **ChannelHandler 和 ...
EventLoop是Netty的核心组件,它是一个单线程执行任务的循环,负责处理I/O事件并分发到对应的处理器。每个EventLoop都包含一个Selector,用于监听多个通道的事件。通过多路复用技术,EventLoop可以高效地处理大量...
1. **EventLoopGroup**:Netty 中的 EventLoopGroup 是一组事件循环(EventLoop)的集合,每个事件循环对应一个独立的工作线程。在服务器端,通常会创建两个 EventLoopGroup,一个用于接收客户端连接(BossGroup),...
Netty多线程并发编程知识点总结 Netty多线程并发编程是指在Netty框架中使用多线程技术来实现高性能、高并发的网络编程。下面是关于Netty多线程并发编程的知识点总结: 一、 JAVA 内存模型与多线程编程 在Java中,...
"Netty多线程案例集锦" Netty 多线程案例集锦是指在 Netty 框架中应用多线程技术来提高网络编程的性能和可扩展性。多线程技术可以让程序同时处理多个任务,从而提高程序的运行效率和响应速度。在 Netty 框架中,多...
5. **线程模型**:Netty采用了EventLoop(事件循环)和EventExecutorGroup(事件执行器组)来管理线程,有效地利用了多核处理器资源。 **Socket技术** Socket是网络通信的接口,允许两个进程通过网络进行通信。在...
Netty服务器线程模型概览_线程模型
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨“Netty 通过端口调用关闭”的主题,这通常涉及到网络服务器的生命周期管理和资源...
1. `NioEventLoop`:这是Netty实现的事件循环,每个`NioEventLoop`对应一个线程,负责执行I/O操作和用户任务。`NioEventLoop`内部维护了一个任务队列,用于存放待执行的任务。 2. `EventLoopGroup`:它是一个线程池...
Netty通过轮询的方式执行队列中的任务,保证了公平性和高效率。此外,EventLoop还负责调度任务,如定时任务和延迟任务。 4. ChannelHandler:在Netty中,业务逻辑是通过ChannelHandler链路来处理的。每个Handler...
这是一本详细介绍了netty线程模型的书籍,包括 nio核心
Boss 线程处理 accept 事件,Worker 线程则处理读写事件和任务队列中的任务,确保了 I/O 操作和业务逻辑的高效分离。 Netty 的 Pipeline(管道)机制是另一个核心概念。Pipeline 中包含多个 Handler,每个 Handler ...
- **EventLoop与EventLoopGroup**:EventLoop是执行I/O操作和处理回调事件的单线程实体,EventLoopGroup是一组EventLoop,负责分配EventLoop给不同的Channel。 2. **Netty的组件** - **ByteBuf**:Netty自定义的...
Netty的EventLoop是Reactor的核心,每个EventLoop都有一个与之关联的I/O线程,负责执行注册在其上的Channel的任务。EventLoop内部维护了一个待处理事件队列,当事件发生时,如新连接、数据到达或通道关闭等,这些...
Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在这个项目中,我们关注的是如何利用Netty内置的时间轮(TimeWheel)工具来处理大批量的定时或超时任务。 ...
《多线程并发编程在Netty中的应用分析》 ...在阅读《多线程并发编程在Netty中的应用分析》这份文档时,建议重点关注Netty的线程模型、事件驱动模型、ByteBuf内存管理以及异步编程机制,这些是理解和应用Netty的关键点。