- 浏览: 981062 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
引言:
前一篇文章我们看了多线程事件循环组,先来回顾一下:
事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,事件循环组主要所做的工作为通道注册。
事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。抽象事件循环AbstractEventLoop继承了抽象事件执行器AbstractEventExecutor,实现了事件循环接口。
多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。
今天我们要看的是Nio事件循环,先从NioEventLoopGroup来看,Nio事件循环如何创建:
//NioEventLoopGroup
从Nio事件循环组创建事件循环可以看出,事件循环为NioEventLoop。
我们先从抽象调度事件执行器开始看:
我们先来看调度任务ScheduledFutureTask
回到调度任务
在一个构造方法中
//延时调度非重复任务
有一点我们要关注为:
toCallable(runnable, result)//将任务线程与结果类型包装成Callable任务
由于调度任务继承了PromiseTask类,此方法在PromiseTask中定义,如下:
//PromiseTask
回到调度任务
再来看调度任务的其他方法:
这个使我们主要看的方法
从上面来看,调度任务ScheduledFutureTask,内部有一个任务调度延时变量deadlineNanos,用于记录下一次调度的延时时间;调度任务间隔时间periodNanos为0,调度任务非周期性任务,大于0,周期性调度,小于零,固定延时调度;对于创建Runnable形式的调度,要先包装成Callable任务;调度任务执行时,对于非周期性任务,则直接执行,而周期性与间歇性任务,计算任务下一次任务调度的延时时间,如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列。调度任务的比较先比较延时时间,如果延时时间相等,则比较任务id(延时时间和id越大,任务执行越靠后)。
看完调度任务,我们回到抽象调度事件执行器:
在调度周期性和间歇性任务,包装任务线程为调度任务时,有这么一句:
//Executors,JUC执行器工厂,这个JUC篇以说,这里简单列出来
从上面可以看出,抽象调度事件执行器AbstractScheduledEventExecutor,内部有一个调度任务队列scheduledTaskQueue(PriorityQueue),用于存储待调度的任务。抽象调度事件执行器无论是调度任务线程,周期性任务,还是间歇性任务,先将任务包装成调度任务ScheduledFutureTask,然后委托给#schedule(final ScheduledFutureTask<V> task)方法,#schedule方法首先判断线程是否在当前事务循环,如果在,则添加调度任务到调度任务队列,否则直接创建一个线程,完成添加调度任务到调度任务队列工作;
移除调度任务的思想与调度任务相同,只不过执行移除操作。
总结:
调度任务ScheduledFutureTask,内部有一个任务调度延时变量deadlineNanos,用于记录下一次调度的延时时间;调度任务间隔时间periodNanos为0,调度任务非周期性任务,大于0,周期性调度,小于零,固定延时调度;对于创建Runnable形式的调度,要先包装成Callable任务;调度任务执行时,对于非周期性任务,则直接执行,而周期性与间歇性任务,计算任务下一次任务调度的延时时间,如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列。
抽象调度事件执行器AbstractScheduledEventExecutor,内部有一个调度任务队列
scheduledTaskQueue(PriorityQueue),用于存储待调度的任务。抽象调度事件执行器无论是调度任务线程,周期性任务,还是间歇性任务,先将任务包装成调度任务ScheduledFutureTask,然后委托给#schedule(final ScheduledFutureTask<V> task)方法,#schedule方法首先判断线程是否在当前事务循环,如果在,则添加调度任务到调度任务队列,否则直接创建一个线程,完成添加调度任务到调度任务队列工作;移除调度任务的思想与调度任务相同,只不过执行移除操作。
附:
//PromiseTask,可以异步任务结果重要的部分我们在文中已说,其他的一看就明白,
这里只贴出PromiseTask源码,不在讲解。
来看设置任务不可取消方法
//DefaultPromise
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
引言:
前一篇文章我们看了多线程事件循环组,先来回顾一下:
事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,事件循环组主要所做的工作为通道注册。
事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。抽象事件循环AbstractEventLoop继承了抽象事件执行器AbstractEventExecutor,实现了事件循环接口。
多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。
今天我们要看的是Nio事件循环,先从NioEventLoopGroup来看,Nio事件循环如何创建:
//NioEventLoopGroup
/** * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s. */ public class NioEventLoopGroup extends MultithreadEventLoopGroup { ... @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } }
从Nio事件循环组创建事件循环可以看出,事件循环为NioEventLoop。
/** * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a * {@link Selector} and so does the multi-plexing of these in the event loop. * */ public final class NioEventLoop extends SingleThreadEventLoop {
/** * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread. * */ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
/** * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread. * */ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
我们先从抽象调度事件执行器开始看:
package io.netty.util.concurrent; import io.netty.util.internal.ObjectUtil; import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * Abstract base class for {@link EventExecutor}s that want to support scheduling. */ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//调度任务队列 protected AbstractScheduledEventExecutor() { } protected AbstractScheduledEventExecutor(EventExecutorGroup parent) { super(parent); } }
我们先来看调度任务ScheduledFutureTask
package io.netty.util.concurrent; import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
//ScheduledFuture package io.netty.util.concurrent; /** * The result of an scheduled asynchronous operation. 异步调度操作,继承了JUC的ScheduledFuture */ @SuppressWarnings("ClassNameSameAsAncestorName") public interface ScheduledFuture<V> extends Future<V>, java.util.concurrent.ScheduledFuture<V> { }
回到调度任务
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden") final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> { private static final AtomicLong nextTaskId = new AtomicLong();//调度任务id生成器 private static final long START_TIME = System.nanoTime();//调度开始时间 //当前已经开始的纳秒时间 static long nanoTime() { return System.nanoTime() - START_TIME; } //第一个调度任务延时时间 static long deadlineNanos(long delay) { return nanoTime() + delay; } private final long id = nextTaskId.getAndIncrement();//当前调度任务id private long deadlineNanos;//延时时间 /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ //调度任务间隔时间,0,不重复,大于0,周期性调度,小于零,固定延时调度,这个和JUC的调度任务一样 private final long periodNanos; //延时调度非重复任务 ScheduledFutureTask( AbstractScheduledEventExecutor executor, Runnable runnable, V result, long nanoTime) { this(executor, toCallable(runnable, result), nanoTime); } //延时调度非重复任务 ScheduledFutureTask( AbstractScheduledEventExecutor executor, Callable<V> callable, long nanoTime) { super(executor, callable); deadlineNanos = nanoTime; periodNanos = 0; } //重复调度任务 ScheduledFutureTask( AbstractScheduledEventExecutor executor, Callable<V> callable, long nanoTime, long period) { super(executor, callable); if (period == 0) { throw new IllegalArgumentException("period: 0 (expected: != 0)"); } deadlineNanos = nanoTime; periodNanos = period; } }
在一个构造方法中
//延时调度非重复任务
ScheduledFutureTask( AbstractScheduledEventExecutor executor, Runnable runnable, V result, long nanoTime) { this(executor, toCallable(runnable, result), nanoTime); }
有一点我们要关注为:
toCallable(runnable, result)//将任务线程与结果类型包装成Callable任务
由于调度任务继承了PromiseTask类,此方法在PromiseTask中定义,如下:
//PromiseTask
package io.netty.util.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.RunnableFuture; class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> { //任务线程与结果类型包装成Callable任务 static <T> Callable<T> toCallable(Runnable runnable, T result) { return new RunnableAdapter<T>(runnable, result); } //返回值线程包装类 private static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } @Override public T call() { task.run(); return result; } @Override public String toString() { return "Callable(task: " + task + ", result: " + result + ')'; } } protected final Callable<V> task;//内部任务线程 PromiseTask(EventExecutor executor, Runnable runnable, V result) { this(executor, toCallable(runnable, result)); } PromiseTask(EventExecutor executor, Callable<V> callable) { super(executor); task = callable; } ... }
回到调度任务
再来看调度任务的其他方法:
//获取调度任务时间执行器 @Override protected EventExecutor executor() { return super.executor(); } //调度任务延时时间 public long deadlineNanos() { return deadlineNanos; } //剩余延时时间 public long delayNanos() { return Math.max(0, deadlineNanos() - nanoTime()); } //根据当前时间,计算剩余延时时间 public long delayNanos(long currentTimeNanos) { return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME)); } //转换延时时间为纳秒 @Override public long getDelay(TimeUnit unit) { return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); } //比较延时任务,先比较延时时间,相等,则比较id @Override public int compareTo(Delayed o) { if (this == o) { return 0; } ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o; long d = deadlineNanos() - that.deadlineNanos(); if (d < 0) { return -1; } else if (d > 0) { return 1; } else if (id < that.id) { return -1; } else if (id == that.id) { throw new Error(); } else { return 1; } }
这个使我们主要看的方法
@Override public void run() { //如果执行器不在当前事件循环中,则断言失败 assert executor().inEventLoop(); try { if (periodNanos == 0) { //非重复调度任务,设置任务不可取消 if (setUncancellableInternal()) { //执行任务 V result = task.call(); //设置执行结果 setSuccessInternal(result); } } else { // check if is done as it may was cancelled //若是周期性任务,则检查调度任务是否取消 if (!isCancelled()) { //没有取消,直接执行调度任务 task.call(); if (!executor().isShutdown()) { //如果当前事件执行器没有关闭,则重新计算下一次任务调度的延时时间 long p = periodNanos; if (p > 0) { deadlineNanos += p; } else { deadlineNanos = nanoTime() - p; } if (!isCancelled()) { //如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列 // scheduledTaskQueue can never be null as we lazy init it before submit the task! Queue<ScheduledFutureTask<?>> scheduledTaskQueue = ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue; assert scheduledTaskQueue != null; scheduledTaskQueue.add(this); } } } } } catch (Throwable cause) { //设置失败异常 setFailureInternal(cause); } } @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean canceled = super.cancel(mayInterruptIfRunning); if (canceled) { //如果取消成功,从调度事件执行器,调度任务队列移除任务 ((AbstractScheduledEventExecutor) executor()).removeScheduled(this); } return canceled; } //取消调度任务,不从调度事件执行器,调度任务队列移除任务 boolean cancelWithoutRemove(boolean mayInterruptIfRunning) { return super.cancel(mayInterruptIfRunning); }
从上面来看,调度任务ScheduledFutureTask,内部有一个任务调度延时变量deadlineNanos,用于记录下一次调度的延时时间;调度任务间隔时间periodNanos为0,调度任务非周期性任务,大于0,周期性调度,小于零,固定延时调度;对于创建Runnable形式的调度,要先包装成Callable任务;调度任务执行时,对于非周期性任务,则直接执行,而周期性与间歇性任务,计算任务下一次任务调度的延时时间,如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列。调度任务的比较先比较延时时间,如果延时时间相等,则比较任务id(延时时间和id越大,任务执行越靠后)。
看完调度任务,我们回到抽象调度事件执行器:
package io.netty.util.concurrent; import io.netty.util.internal.ObjectUtil; import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * Abstract base class for {@link EventExecutor}s that want to support scheduling. */ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//调度任务队列,类型为PriorityQueue(平衡二叉树) protected AbstractScheduledEventExecutor() { } protected AbstractScheduledEventExecutor(EventExecutorGroup parent) { super(parent); } protected static long nanoTime() { return ScheduledFutureTask.nanoTime(); } //获取调度任务队列 Queue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null) { //调度任务队列,以调度任务的延时时间和任务ID为比较因子,两者越大,越靠后调度 //优先级队列是一个平衡二叉树,延时时间小的任务在树的左边,大的在右边 scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>(); } return scheduledTaskQueue; } //判断任务队列是否为空 private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) { return queue == null || queue.isEmpty(); } /** * Cancel all scheduled tasks. *取消所有调度任务,调用方法的线程必须在当前事件循环中 * This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected void cancelScheduledTasks() { assert inEventLoop(); Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; if (isNullOrEmpty(scheduledTaskQueue)) { return; } final ScheduledFutureTask<?>[] scheduledTasks = scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]); //遍历调度任务队列中的任务,取消任务 for (ScheduledFutureTask<?> task: scheduledTasks) { task.cancelWithoutRemove(false); } //清空调度任务队列 scheduledTaskQueue.clear(); } /** * @see #pollScheduledTask(long) 拉取调度任务 */ protected final Runnable pollScheduledTask() { return pollScheduledTask(nanoTime()); } /** * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. * You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}. 返回延时时间已到的调度任务,及准备就绪待执行的调度任务 */ protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); //从调度任务队列peek一个调度任务 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null;//空,则返回null } //否则如果队列头部的任务延时时间小于,延时任务已运行的时间,则返回调度任务,待执行 if (scheduledTask.deadlineNanos() <= nanoTime) { scheduledTaskQueue.remove(); return scheduledTask; } return null; } /** * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled. 返回下一个调度任务,调度需要等待的时间,没有任务返回-1 */ protected final long nextScheduledTaskNano() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return -1; } //0,表示延时时间已到,调度任务 return Math.max(0, scheduledTask.deadlineNanos() - nanoTime()); } //获取调度任务队列头部的任务 final ScheduledFutureTask<?> peekScheduledTask() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; if (scheduledTaskQueue == null) { return null; } return scheduledTaskQueue.peek(); } /** * Returns {@code true} if a scheduled task is ready for processing. 是否有延时时间已到的调度任务 */ protected final boolean hasScheduledTasks() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); } //调度Runnable任务 @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { //检查线程,事件单元是否为空 ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { //如果延时时间小于0,延时时间为0 delay = 0; } //将Runnable任务包装成调度任务,委托给schedule(final ScheduledFutureTask<V> task) return schedule(new ScheduledFutureTask<Void>( this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); } //下面几个调度延时任务,周期性任务,间歇任务,都是先将任务包装成调度任务ScheduledFutureTask, //然后委托给schedule(final ScheduledFutureTask<V> task)方法 @Override public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(callable, "callable"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { delay = 0; } return schedule(new ScheduledFutureTask<V>( this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (initialDelay < 0) { throw new IllegalArgumentException( String.format("initialDelay: %d (expected: >= 0)", initialDelay)); } if (period <= 0) { throw new IllegalArgumentException( String.format("period: %d (expected: > 0)", period)); } return schedule(new ScheduledFutureTask<Void>( this, Executors.<Void>callable(command, null), ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); } @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (initialDelay < 0) { throw new IllegalArgumentException( String.format("initialDelay: %d (expected: >= 0)", initialDelay)); } if (delay <= 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: > 0)", delay)); } return schedule(new ScheduledFutureTask<Void>( this, Executors.<Void>callable(command, null), ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); } //调度任务 <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { //如果线程在当前事务循环中,则添加调度任务到调度任务队列 scheduledTaskQueue().add(task); } else { //否则直接创建一个线程,完成添加调度任务到调度任务队列工作 execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; } //移除调度任务队列 final void removeScheduled(final ScheduledFutureTask<?> task) { if (inEventLoop()) { //如果线程在当前事务循环中,则直接从调度任务队列,移除调度任务 scheduledTaskQueue().remove(task); } else { //否则创建一个线程,完成移除工作 execute(new Runnable() { @Override public void run() { removeScheduled(task); } }); } } }
在调度周期性和间歇性任务,包装任务线程为调度任务时,有这么一句:
Executors.<Void>callable(command, null)
//Executors,JUC执行器工厂,这个JUC篇以说,这里简单列出来
/** * Returns a {@link Callable} object that, when * called, runs the given task and returns the given result. This * can be useful when applying methods requiring a * <tt>Callable</tt> to an otherwise resultless action. * @param task the task to run * @param result the result to return * @return a callable object * @throws NullPointerException if task null */ public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); //包装成RunnableAdapter return new RunnableAdapter<T>(task, result); }
从上面可以看出,抽象调度事件执行器AbstractScheduledEventExecutor,内部有一个调度任务队列scheduledTaskQueue(PriorityQueue),用于存储待调度的任务。抽象调度事件执行器无论是调度任务线程,周期性任务,还是间歇性任务,先将任务包装成调度任务ScheduledFutureTask,然后委托给#schedule(final ScheduledFutureTask<V> task)方法,#schedule方法首先判断线程是否在当前事务循环,如果在,则添加调度任务到调度任务队列,否则直接创建一个线程,完成添加调度任务到调度任务队列工作;
移除调度任务的思想与调度任务相同,只不过执行移除操作。
总结:
调度任务ScheduledFutureTask,内部有一个任务调度延时变量deadlineNanos,用于记录下一次调度的延时时间;调度任务间隔时间periodNanos为0,调度任务非周期性任务,大于0,周期性调度,小于零,固定延时调度;对于创建Runnable形式的调度,要先包装成Callable任务;调度任务执行时,对于非周期性任务,则直接执行,而周期性与间歇性任务,计算任务下一次任务调度的延时时间,如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列。
抽象调度事件执行器AbstractScheduledEventExecutor,内部有一个调度任务队列
scheduledTaskQueue(PriorityQueue),用于存储待调度的任务。抽象调度事件执行器无论是调度任务线程,周期性任务,还是间歇性任务,先将任务包装成调度任务ScheduledFutureTask,然后委托给#schedule(final ScheduledFutureTask<V> task)方法,#schedule方法首先判断线程是否在当前事务循环,如果在,则添加调度任务到调度任务队列,否则直接创建一个线程,完成添加调度任务到调度任务队列工作;移除调度任务的思想与调度任务相同,只不过执行移除操作。
附:
//PromiseTask,可以异步任务结果重要的部分我们在文中已说,其他的一看就明白,
这里只贴出PromiseTask源码,不在讲解。
package io.netty.util.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.RunnableFuture; class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> { static <T> Callable<T> toCallable(Runnable runnable, T result) { return new RunnableAdapter<T>(runnable, result); } private static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } @Override public T call() { task.run(); return result; } @Override public String toString() { return "Callable(task: " + task + ", result: " + result + ')'; } } protected final Callable<V> task; PromiseTask(EventExecutor executor, Runnable runnable, V result) { this(executor, toCallable(runnable, result)); } PromiseTask(EventExecutor executor, Callable<V> callable) { super(executor); task = callable; } @Override public final int hashCode() { return System.identityHashCode(this); } @Override public final boolean equals(Object obj) { return this == obj; } @Override public void run() { try { if (setUncancellableInternal()) { V result = task.call(); setSuccessInternal(result); } } catch (Throwable e) { setFailureInternal(e); } } @Override public final Promise<V> setFailure(Throwable cause) { throw new IllegalStateException(); } protected final Promise<V> setFailureInternal(Throwable cause) { super.setFailure(cause); return this; } @Override public final boolean tryFailure(Throwable cause) { return false; } protected final boolean tryFailureInternal(Throwable cause) { return super.tryFailure(cause); } @Override public final Promise<V> setSuccess(V result) { throw new IllegalStateException(); } protected final Promise<V> setSuccessInternal(V result) { super.setSuccess(result); return this; } @Override public final boolean trySuccess(V result) { return false; } protected final boolean trySuccessInternal(V result) { return super.trySuccess(result); } @Override public final boolean setUncancellable() { throw new IllegalStateException(); } //设置任务不可取消 protected final boolean setUncancellableInternal() { return super.setUncancellable(); } @Override protected StringBuilder toStringBuilder() { StringBuilder buf = super.toStringBuilder(); buf.setCharAt(buf.length() - 1, ','); return buf.append(" task: ") .append(task) .append(')'); } }
来看设置任务不可取消方法
//DefaultPromise
package io.netty.util.concurrent; import io.netty.util.Signal; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class); private static final InternalLogger rejectedExecutionLogger = InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution"); private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8, SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8)); @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");//任务结果状态 private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");//成功 private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");//不可取消 private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace( new CancellationException(), DefaultPromise.class, "cancel(...)")); private volatile Object result; private final EventExecutor executor; /** * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}. * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified. * * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor. */ private Object listeners; /** * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll(). */ private short waiters; /** * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the * executor changes. */ private boolean notifyingListeners; //设置任务不可取消 @Override public boolean setUncancellable() { if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) { return true; } Object result = this.result; return !isDone0(result) || !isCancelled0(result); } }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1321netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2057netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2444netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1317netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1310netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1594netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1844netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1397netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2834netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2177netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2037netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1078netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1877netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1219netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1202netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 958netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1309netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2189netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1077netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1855netty 管道线定义-ChannelPipeline:htt ...
相关推荐
5. **EventLoop** 和 **EventLoopGroup**:线程模型,EventLoop 负责执行 Channel 上的 I/O 操作和事件处理,EventLoopGroup 是 EventLoop 的管理器,负责分配和调度 EventLoop。 三、Netty 的事件驱动模型 Netty ...
事件循环(EventLoop)是Netty处理事件的关键组件,它负责调度和执行事件处理器。 "粘包与半包"是网络通信中常见的问题。当发送的数据超过缓冲区大小时,可能会导致一个完整的数据包被拆分,或者多个数据包被合并到...
EventLoop 是 Netty 的事件处理机制,用于调度和执行任务。Pipeline 是一系列处理器的链,每个处理器(Handler)都有自己的职责,如解码、编码或者业务逻辑处理。 源码分析时,首先需要关注的是 Netty 的启动流程,...
EventLoop是事件处理循环,处理I/O事件并调度任务。了解这些基础概念是理解和使用Netty的关键。 2. **SocketIO协议**:SocketIO协议是基于WebSocket的,但增加了心跳检测、分帧、错误处理等功能。Netty-SocketIO...
EventLoop是Netty处理IO事件的调度器,它负责监听和处理网络事件,以及执行与Channel相关的任务。 Netty还支持多种协议的编码和解码器,它内置了HTTP、WebSocket、SSL/TLS、SPDY等协议的编码和解码器。开发者可以...
5. **EventLoop**: 事件循环是Netty异步模型的基础,它负责调度和执行事件处理器。每个EventLoop通常对应一个线程,处理多个Channel的事件。 6. **Bootstrap**: Bootstrap是启动服务器或客户端的配置类,可以设置...
EventLoop是事件循环,负责调度和执行事件处理器;Pipeline是事件处理链,负责将接收到的数据转发给相应的ChannelHandler处理。 在实现简单的聊天功能时,我们首先需要配置Bootstrap,指定ServerBootstrap来创建...
7. **线程模型**:Netty的EventLoopGroup负责调度和执行任务,通常分为BossGroup(接收新连接)和WorkerGroup(处理已有连接上的读写事件)。 8. **心跳与Keep-Alive**:Netty提供心跳机制和Keep-Alive策略,可以...
事件循环负责调度和执行任务,使得系统能够高效地处理并发连接。 4. **Channel与Handler**:在Netty中,Channel代表网络连接,负责数据的读写。Handler是处理I/O事件的组件,可以定制化实现特定业务逻辑。...
EventLoopGroup则是一组线程,用于执行I/O操作和调度任务。 Netty 5.0的源码包中包含其所有模块的源代码,这些模块涵盖了各种协议的实现,如TCP、UDP、HTTP、WebSocket等,以及编解码器、缓冲区、线程模型等基础...
EventLoop是事件循环,负责调度和执行任务;ByteBuf是Netty的高效字节缓冲区,用于网络数据的读写;Pipeline则是一系列处理器链,处理进来的和要出去的数据。此外,文档还会涵盖各种编解码器、handler、以及连接管理...
EventLoop则负责调度这些事件并执行相应的Handler。 在Android中使用Netty,需要解决的主要问题是线程安全和内存管理。由于Android系统对应用的内存和CPU使用有限制,因此在设计Netty服务时,需要考虑线程池的大小...
2. **EventLoop**:EventLoop 是 Netty 中的线程模型,它负责处理 I/O 事件,并调度任务到相应的 ChannelHandler。Netty 使用多路复用技术,如 Epoll 或 NIO Selector,来提高性能。 3. **Bootstrap**:Bootstrap ...
Channel是Netty中连接网络资源的抽象,EventLoop负责事件的调度和执行,Pipeline则是一种处理链,允许开发者自定义数据在网络中的传输过程。Buffer是Netty内存管理的关键,它优化了Java的原始I/O操作,提高了性能。 ...
4. **EventLoop和EventLoopGroup**: EventLoop是Netty的事件循环,负责处理I/O事件并调度任务。EventLoopGroup是一组EventLoop的集合,通常每个CPU核心对应一个EventLoop,用于负载均衡。 5. **Pipeline(处理链)*...
EventLoop 是执行事件处理的循环,负责调度和执行 Channel 上的事件处理器。通过 Channel 和 EventLoop 的组合,Netty 实现了高效的I/O操作。 在多Socket服务器集成中,Netty 提供了灵活的协议支持。它包含了一套...
2. **任务执行器**:接收并执行来自调度器的任务,可能在多个节点上分布式运行。 3. **通信模块**:基于Netty实现,确保节点间的高效通信和任务传递。 4. **监控和管理**:提供可视化界面或API,用于监控任务状态、...
事件循环(Eventloop)是Netty的核心组件之一,用于处理I/O操作,并负责调度ChannelHandler中的方法执行。 #### 接口EventExecutor 定义了执行任务的基本行为。 #### 接口EventLoop 继承自`EventExecutor`,专用于...
4. **EventLoop与EventLoopGroup**:EventLoop是执行事件处理的单线程,负责调度任务;EventLoopGroup是一组EventLoop的集合,用于分配和管理多个EventLoop。 5. **ChannelHandlerContext**:它是Handler与Pipeline...