- 浏览: 996143 次
-
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
引言:
前面一篇文章我们看了抽象调度事件执行器,来回顾一下:
调度任务ScheduledFutureTask,内部有一个任务调度延时变量deadlineNanos,用于记录下一次调度的延时时间;调度任务间隔时间periodNanos为0,调度任务非周期性任务,大于0,周期性调度,小于零,固定延时调度;对于创建Runnable形式的调度,要先包装成Callable任务;调度任务执行时,对于非周期性任务,则直接执行,而周期性与间歇性任务,计算任务下一次任务调度的延时时间,如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列。
抽象调度事件执行器AbstractScheduledEventExecutor,内部有一个调度任务队列scheduledTaskQueue(PriorityQueue),用于存储待调度的任务。抽象调度事件执行器无论是调度任务线程,周期性任务,还是间歇性任务,先将任务包装成调度任务ScheduledFutureTask,然后委托给#schedule(final ScheduledFutureTask<V> task)方法,#schedule方法首先判断线程是否在当前事务循环,如果在,则添加调度任务到调度任务队列,否则直接创建一个线程,完成添加调度任务到调度任务队列工作;移除调度任务的思想与调度任务相同,只不过执行移除操作。
今天我们来看单线程事件执行器:
从上面可以看出单线程事件执行器SingleThreadEventExecutor,内部主要有一个状态变量STATE_UPDATER(AtomicIntegerFieldUpdater),执行器状态以供有4中就绪,开始,正在关闭,已关闭,终止;一个任务队列taskQueue存放待执行的任务线程;一个执行器执行任务taskQueue(LinkedBlockingQueue);一个事件执行器关闭信号量threadLock控制事件执行器的关闭;一个是高可见线程thread,指定当前事件执行器线程,用于判断IO操作线程是否在当前事件循环中;
再来看构造
从上面来看,单线程事件执行器构造,主要是初始化父事件执行器,最大任务数,事件执行器,任务队列和任务拒绝策略。由于单线程事件执行器为顺序执行器OrderedEventExecutor,其主要通过taskQueue为LinkedBlockQueue保证任务的顺序执行。
总结:
单线程事件执行器SingleThreadEventExecutor,内部主要有一个状态变量STATE_UPDATER(AtomicIntegerFieldUpdater),执行器状态以供有4中就绪,开始,正在关闭,已关闭,终止;一个任务队列taskQueue存放待执行的任务线程;一个执行器执行任务taskQueue(LinkedBlockingQueue);一个事件执行器关闭信号量threadLock控制事件执行器的关闭;一个是高可见线程thread,指定当前事件执行器线程,用于判断IO操作线程是否在当前事件循环中;
单线程事件执行器构造,主要是初始化父事件执行器,最大任务数,事件执行器,任务队列和任务拒绝策略,默认拒绝策略为直接抛出拒绝执行器异常。由于单线程事件执行器为顺序执行器OrderedEventExecutor,其主要通过taskQueue为LinkedBlockQueue保证任务的顺序执行。
附:
在单线程事件执行器的变量声明和构造中有几点,需要我们关注,分别如下:
1.线程工厂
//ThreadPerTaskExecutor 线程执行器
//默认线程工厂DefaultThreadFactory
2.
//全局事务执行器 GlobalEventExecutor
从上面可以看出全局事件执行器,执行任务时,首先添加任务到任务队列,如果线程在当前事务循环中,则等待执行,否则启动一个工作线程TaskRunner,不断从任务队列中take任务执行。如果任务队列为空,且调度任务队列为空或调度任务队列只有一个待调度的任务,则关闭事件执行器,这样做的目的是保证工作线程互质地从任务队列消费任务。
3.
//任务拒绝执行策略RejectedExecutionHandlers
//SingleThreadEventExecutor
//RejectedExecutionHandler
4.线程属性
//线程属性ThreadProperties,一看就明白,就不说了
来看单线程执行器内部的实现:
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
引言:
前面一篇文章我们看了抽象调度事件执行器,来回顾一下:
调度任务ScheduledFutureTask,内部有一个任务调度延时变量deadlineNanos,用于记录下一次调度的延时时间;调度任务间隔时间periodNanos为0,调度任务非周期性任务,大于0,周期性调度,小于零,固定延时调度;对于创建Runnable形式的调度,要先包装成Callable任务;调度任务执行时,对于非周期性任务,则直接执行,而周期性与间歇性任务,计算任务下一次任务调度的延时时间,如果调度任务没取消,则添加调度任务到关联的调度事件执行器调度任务队列。
抽象调度事件执行器AbstractScheduledEventExecutor,内部有一个调度任务队列scheduledTaskQueue(PriorityQueue),用于存储待调度的任务。抽象调度事件执行器无论是调度任务线程,周期性任务,还是间歇性任务,先将任务包装成调度任务ScheduledFutureTask,然后委托给#schedule(final ScheduledFutureTask<V> task)方法,#schedule方法首先判断线程是否在当前事务循环,如果在,则添加调度任务到调度任务队列,否则直接创建一个线程,完成添加调度任务到调度任务队列工作;移除调度任务的思想与调度任务相同,只不过执行移除操作。
今天我们来看单线程事件执行器:
package io.netty.util.concurrent; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.lang.Thread.State; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread. * */ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //最大执行任数,最小为16 static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE)); private static final InternalLogger logger = InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class); //事件执行器状态 private static final int ST_NOT_STARTED = 1;//就绪 private static final int ST_STARTED = 2;//开始 private static final int ST_SHUTTING_DOWN = 3;//正在关闭 private static final int ST_SHUTDOWN = 4;//已关闭 private static final int ST_TERMINATED = 5;//终止 //唤醒任务 private static final Runnable WAKEUP_TASK = new Runnable() { @Override public void run() { // Do nothing. } }; //空任务 private static final Runnable NOOP_TASK = new Runnable() { @Override public void run() { // Do nothing. } }; //事件执行器状态 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); //线程属性 private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER = AtomicReferenceFieldUpdater.newUpdater( SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties"); //任务队列 private final Queue<Runnable> taskQueue; private volatile Thread thread;//当前事件执行器线程 @SuppressWarnings("unused")//线程属性 private volatile ThreadProperties threadProperties; private final Executor executor; private volatile boolean interrupted;//是否中断 private final Semaphore threadLock = new Semaphore(0);//事件执行器关闭信号量 private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();//关闭Hooks任务 private final boolean addTaskWakesUp; private final int maxPendingTasks;//最大执行器任务 private final RejectedExecutionHandler rejectedExecutionHandler;//任务拒绝策略 private long lastExecutionTime;//上次执行器时间 @SuppressWarnings({ "FieldMayBeFinal", "unused" }) private volatile int state = ST_NOT_STARTED; private volatile long gracefulShutdownQuietPeriod;//关闭间隔QuietPeriod private volatile long gracefulShutdownTimeout;//关闭超时时间 private long gracefulShutdownStartTime;//关闭开始时间 //终止异步任务结果 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE); }
从上面可以看出单线程事件执行器SingleThreadEventExecutor,内部主要有一个状态变量STATE_UPDATER(AtomicIntegerFieldUpdater),执行器状态以供有4中就绪,开始,正在关闭,已关闭,终止;一个任务队列taskQueue存放待执行的任务线程;一个执行器执行任务taskQueue(LinkedBlockingQueue);一个事件执行器关闭信号量threadLock控制事件执行器的关闭;一个是高可见线程thread,指定当前事件执行器线程,用于判断IO操作线程是否在当前事件循环中;
再来看构造
/** * Create a new instance * * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it 父事件执行器 * @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread} 线程工厂 * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * executor thread 添加任务时,是否唤醒执行器 */ protected SingleThreadEventExecutor( EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp); } /** * Create a new instance * * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it * @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread} * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * executor thread * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. 最大任务数 * @param rejectedHandler the {@link RejectedExecutionHandler} to use. 拒绝任务策略 */ protected SingleThreadEventExecutor( EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler); } /** * Create a new instance * * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it * @param executor the {@link Executor} which will be used for executing * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * executor thread */ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject()); } /** * Create a new instance * * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it * @param executor the {@link Executor} which will be used for executing * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * executor thread * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. * @param rejectedHandler the {@link RejectedExecutionHandler} to use. */ protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); //创建任务度列 taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } /** * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant * implementation that does not support blocking operations at all. 创建一个队列,存放任务,以待执行,默认实现为LinkedBlockingQueue */ protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { return new LinkedBlockingQueue<Runnable>(maxPendingTasks); }
从上面来看,单线程事件执行器构造,主要是初始化父事件执行器,最大任务数,事件执行器,任务队列和任务拒绝策略。由于单线程事件执行器为顺序执行器OrderedEventExecutor,其主要通过taskQueue为LinkedBlockQueue保证任务的顺序执行。
总结:
单线程事件执行器SingleThreadEventExecutor,内部主要有一个状态变量STATE_UPDATER(AtomicIntegerFieldUpdater),执行器状态以供有4中就绪,开始,正在关闭,已关闭,终止;一个任务队列taskQueue存放待执行的任务线程;一个执行器执行任务taskQueue(LinkedBlockingQueue);一个事件执行器关闭信号量threadLock控制事件执行器的关闭;一个是高可见线程thread,指定当前事件执行器线程,用于判断IO操作线程是否在当前事件循环中;
单线程事件执行器构造,主要是初始化父事件执行器,最大任务数,事件执行器,任务队列和任务拒绝策略,默认拒绝策略为直接抛出拒绝执行器异常。由于单线程事件执行器为顺序执行器OrderedEventExecutor,其主要通过taskQueue为LinkedBlockQueue保证任务的顺序执行。
附:
在单线程事件执行器的变量声明和构造中有几点,需要我们关注,分别如下:
1.线程工厂
protected SingleThreadEventExecutor( EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler); }
//ThreadPerTaskExecutor 线程执行器
import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory;//线程工厂 public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } //执行任务 @Override public void execute(Runnable command) { //将任务包装成 threadFactory.newThread(command).start(); } }
//默认线程工厂DefaultThreadFactory
package io.netty.util.concurrent; import io.netty.util.internal.StringUtil; import java.util.Locale; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * A {@link ThreadFactory} implementation with a simple naming rule. */ public class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolId = new AtomicInteger();//线程池id生成器 private final AtomicInteger nextId = new AtomicInteger();//线程id生成器 private final String prefix;//线程名前缀 private final boolean daemon;//是否为守候模式 private final int priority;//优先级 protected final ThreadGroup threadGroup;//线程组 public DefaultThreadFactory(Class<?> poolType) { this(poolType, false, Thread.NORM_PRIORITY); } public DefaultThreadFactory(String poolName) { this(poolName, false, Thread.NORM_PRIORITY); } public DefaultThreadFactory(Class<?> poolType, boolean daemon) { this(poolType, daemon, Thread.NORM_PRIORITY); } public DefaultThreadFactory(String poolName, boolean daemon) { this(poolName, daemon, Thread.NORM_PRIORITY); } public DefaultThreadFactory(Class<?> poolType, int priority) { this(poolType, false, priority); } public DefaultThreadFactory(String poolName, int priority) { this(poolName, false, priority); } public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) { this(toPoolName(poolType), daemon, priority); } public static String toPoolName(Class<?> poolType) { if (poolType == null) { throw new NullPointerException("poolType"); } String poolName = StringUtil.simpleClassName(poolType); switch (poolName.length()) { case 0: return "unknown"; case 1: return poolName.toLowerCase(Locale.US); default: if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) { return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1); } else { return poolName; } } } public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { if (poolName == null) { throw new NullPointerException("poolName"); } if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException( "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)"); } prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; this.priority = priority; this.threadGroup = threadGroup; } public DefaultThreadFactory(String poolName, boolean daemon, int priority) { //如果系统安全管理器没有配置,则为线程当前所属的组,否则获取安全管理器配置的线程组 this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()); } //创建任务线程 @Override public Thread newThread(Runnable r) { Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; } //创建FastThreadLocalThread protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); } //任务包装线程 private static final class DefaultRunnableDecorator implements Runnable { private final Runnable r; DefaultRunnableDecorator(Runnable r) { this.r = r; } @Override public void run() { try { r.run(); } finally { //释放线程的所有本地变量 FastThreadLocal.removeAll(); } } } }
2.
//终止异步任务结果 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
//全局事务执行器 GlobalEventExecutor
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no * task pending in the task queue for 1 second. Please note it is not scalable to schedule large number of tasks to * this executor; use a dedicated executor. GlobalEventExecutor为线程单例事件执行器。自动开始任务,当在一秒内没有任务添加到队列时,停止。 注意此执行不适合调度大量的任务。调度大量任务,可以用专用的执行器 */ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class); //调度间隔 private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1); //全局事件执行器实例 public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor(); //任务队列 final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(); //调度任务 final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>( this, Executors.<Void>callable(new Runnable() { @Override public void run() { // NOOP } }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL); // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not // be sticky about its thread group // visible for testing 线程工厂为默认的线程工程,在第二点中我们简单说一下 final ThreadFactory threadFactory = new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null); //任务线程 private final TaskRunner taskRunner = new TaskRunner(); //事件执行器状态 private final AtomicBoolean started = new AtomicBoolean(); volatile Thread thread; //异步终止任务线程 private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException()); private GlobalEventExecutor() { scheduledTaskQueue().add(quietPeriodTask); } /** * Take the next {@link Runnable} from the task queue and so will block if no task is currently present. *从任务队列获取任务,任务队列为空,则阻塞,直到有任务 * @return {@code null} if the executor thread has been interrupted or waken up. */ Runnable takeTask() { BlockingQueue<Runnable> taskQueue = this.taskQueue; for (;;) { //获取调度任务 ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null) { Runnable task = null; try { //如果调度任务队列为空,则从任务队列中take一个任务 task = taskQueue.take(); } catch (InterruptedException e) { // Ignore } return task; } else { //否则获取调度任务延时 long delayNanos = scheduledTask.delayNanos(); Runnable task; if (delayNanos > 0) { try { //如果延时大于0,则从任务队列延时poll一个任务 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { // Waken up. return null; } } else { //否者直接从队列拉取一个任务 task = taskQueue.poll(); } if (task == null) { //从调度任务队列抓取调度任务,添加到任务队列 fetchFromScheduledTaskQueue(); task = taskQueue.poll(); } if (task != null) { return task; } } } } //从调度任务队列抓取调度任务,添加到任务队列 private void fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //拉取调度任务 Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { //将调度任务添加到任务队列 taskQueue.add(scheduledTask); scheduledTask = pollScheduledTask(nanoTime); } } /** * Return the number of tasks that are pending for processing. *返回任务队列中的任务数量 * [b]Be aware that this operation may be expensive as it depends on the internal implementation of the * SingleThreadEventExecutor. So use it was care![/b] */ public int pendingTasks() { return taskQueue.size(); } /** * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown * before. 添加任务到任务队列,如果事件执行器已关闭,则抛出拒绝执行器异常 */ private void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } taskQueue.add(task); } //判断线程是否在当前事件循环中 @Override public boolean inEventLoop(Thread thread) { return thread == this.thread; } //关闭事件执行器 @Override public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return terminationFuture(); } //返回关闭异步任务结果 @Override public Future<?> terminationFuture() { return terminationFuture; } //关闭事件执行器 @Override @Deprecated public void shutdown() { throw new UnsupportedOperationException(); } //下面3个方法为判断事件执行器是否关闭,正在关闭,已终止 @Override public boolean isShuttingDown() { return false; } @Override public boolean isShutdown() { return false; } @Override public boolean isTerminated() { return false; } //不支持超时终止 @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return false; } /** * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself. * Because a new worker thread will be started again when a new task is submitted, this operation is only useful * when you want to ensure that the worker thread is terminated [b]after[/b] your application is shut * down and there's no chance of submitting a new task afterwards. 等待执行器的工作线程没有任务可执行,终止。当任务提交到执行器时,新建一个工作线程,当你想保证在工作线程终止前,没有任务提交到 执行器,这个方法非常有用。 * * @return {@code true} if and only if the worker thread has been terminated */ public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException { if (unit == null) { throw new NullPointerException("unit"); } final Thread thread = this.thread; if (thread == null) { throw new IllegalStateException("thread was not started"); } // * Waits at most {@code millis} milliseconds for this thread to // * die. A timeout of {@code 0} means to wait forever. thread.join(unit.toMillis(timeout)); return !thread.isAlive(); } //执行任务 @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //添加任务到任务队列 addTask(task); if (!inEventLoop()) { //如果任务线程不在当前事件循环中,则直接执行线程 startThread(); } } //根据任务 private void startThread() { //事件执行器已开始 if (started.compareAndSet(false, true)) { //创建任务执行线程 Thread t = threadFactory.newThread(taskRunner); // Set the thread before starting it as otherwise inEventLoop() may return false and so produce // an assert error. // See https://github.com/netty/netty/issues/4357 thread = t; //直接启动任务执行线程 t.start(); } } //任务线程 final class TaskRunner implements Runnable { @Override public void run() { for (;;) { //从任务队列take任务 Runnable task = takeTask(); if (task != null) { try { //任务不为空,则执行任务 task.run(); } catch (Throwable t) { logger.warn("Unexpected exception from the global event executor: ", t); } //如果任务非quietPeriodTask,跳出当前循环 if (task != quietPeriodTask) { continue; } } //获取调度任务队列 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue; // Terminate if there is no task in the queue (except the noop task). if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) { //如果任务队列为空且调度任务队列为空或调度任务队列只有一个待调度的任务 // Mark the current thread as stopped. // The following CAS must always success and must be uncontended, // because only one thread should be running at the same time. //设置事件执行器已关闭 boolean stopped = started.compareAndSet(true, false); assert stopped; // Check if there are pending entries added by execute() or schedule*() while we do CAS above. if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) { // A) No new task was added and thus there's nothing to handle // -> safe to terminate because there's nothing left to do // B) A new thread started and handled all the new tasks. // -> safe to terminate the new thread will take care the rest break; } // There are pending tasks added again. if (!started.compareAndSet(false, true)) { // startThread() started a new thread and set 'started' to true. // -> terminate this thread so that the new thread reads from taskQueue exclusively. //如果线程不能重新开始,则终止线程,这样做的目的是保证工作线程互质地,从任务队列消费任务。 break; } // New tasks were added, but this worker was faster to set 'started' to true. // i.e. a new worker thread was not started by startThread(). // -> keep this thread alive to handle the newly added entries. } } } } }
从上面可以看出全局事件执行器,执行任务时,首先添加任务到任务队列,如果线程在当前事务循环中,则等待执行,否则启动一个工作线程TaskRunner,不断从任务队列中take任务执行。如果任务队列为空,且调度任务队列为空或调度任务队列只有一个待调度的任务,则关闭事件执行器,这样做的目的是保证工作线程互质地从任务队列消费任务。
3.
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject()); }
//任务拒绝执行策略RejectedExecutionHandlers
package io.netty.util.concurrent; import io.netty.util.internal.ObjectUtil; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; /** * Expose helper methods which create different {@link RejectedExecutionHandler}s. */ public final class RejectedExecutionHandlers { //直接抛出异常 private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() { @Override public void rejected(Runnable task, SingleThreadEventExecutor executor) { throw new RejectedExecutionException(); } }; private RejectedExecutionHandlers() { } /** * Returns a {@link RejectedExecutionHandler} that will always just throw a {@link RejectedExecutionException}. */ public static RejectedExecutionHandler reject() { return REJECT; } /** * Tries to backoff when the task can not be added due restrictions for an configured amount of time. This * is only done if the task was added from outside of the event loop which means * {@link EventExecutor#inEventLoop()} returns {@code false}. 当事件执行器不在当前事件循环的处理策略, */ public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) { ObjectUtil.checkPositive(retries, "retries"); final long backOffNanos = unit.toNanos(backoffAmount); return new RejectedExecutionHandler() { @Override public void rejected(Runnable task, SingleThreadEventExecutor executor) { //如果执行器不在当前事件循环中,则尝试唤醒执行器,并清空任务队列,然后添加任务到执行器任务队列 if (!executor.inEventLoop()) { for (int i = 0; i < retries; i++) { // Try to wake up the executor so it will empty its task queue. //唤醒执行器,清空任务队列 executor.wakeup(false); LockSupport.parkNanos(backOffNanos); //添加任务到执行器任务队列 if (executor.offerTask(task)) { return; } } } // Either we tried to add the task from within the EventLoop or we was not able to add it even with // backoff. throw new RejectedExecutionException(); } }; } }
//SingleThreadEventExecutor
protected void wakeup(boolean inEventLoop) { if (!inEventLoop || state == ST_SHUTTING_DOWN) { // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there // is already something in the queue. //添加唤醒任务线程到队列 taskQueue.offer(WAKEUP_TASK); } }
//RejectedExecutionHandler
package io.netty.util.concurrent; /** * Similar to {@link java.util.concurrent.RejectedExecutionHandler} but specific to {@link SingleThreadEventExecutor}. */ public interface RejectedExecutionHandler { /** * Called when someone tried to add a task to {@link SingleThreadEventExecutor} but this failed due capacity * restrictions. */ void rejected(Runnable task, SingleThreadEventExecutor executor); }
4.线程属性
@SuppressWarnings("unused")//线程属性 private volatile ThreadProperties threadProperties;
//线程属性ThreadProperties,一看就明白,就不说了
package io.netty.util.concurrent; /** * Expose details for a {@link Thread}. */ public interface ThreadProperties { /** * @see Thread#getState() */ Thread.State state(); /** * @see Thread#getPriority() */ int priority(); /** * @see Thread#isInterrupted() */ boolean isInterrupted(); /** * @see Thread#isDaemon() */ boolean isDaemon(); /** * @see Thread#getName() */ String name(); /** * @see Thread#getId() */ long id(); /** * @see Thread#getStackTrace() */ StackTraceElement[] stackTrace(); /** * @see Thread#isAlive() */ boolean isAlive(); }
来看单线程执行器内部的实现:
private static final class DefaultThreadProperties implements ThreadProperties { private final Thread t; DefaultThreadProperties(Thread t) { this.t = t; } @Override public State state() { return t.getState(); } @Override public int priority() { return t.getPriority(); } @Override public boolean isInterrupted() { return t.isInterrupted(); } @Override public boolean isDaemon() { return t.isDaemon(); } @Override public String name() { return t.getName(); } @Override public long id() { return t.getId(); } @Override public StackTraceElement[] stackTrace() { return t.getStackTrace(); } @Override public boolean isAlive() { return t.isAlive(); } }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1359netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2092netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2490netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1340netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1340netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1620netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1867netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1432netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2868netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2217netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2059netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1105netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1904netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1251netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1233netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 984netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1330netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2215netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1123netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1887netty 管道线定义-ChannelPipeline:htt ...
相关推荐
EventLoop是Netty的核心组件,它是一个单线程执行任务的循环,负责处理I/O事件并分发到对应的处理器。每个EventLoop都包含一个Selector,用于监听多个通道的事件。通过多路复用技术,EventLoop可以高效地处理大量...
5. **Bootstrap**: Bootstrap 类用于初始化服务器端(ServerBootstrap)或客户端(Bootstrap)的配置,包括选择器、事件循环组、通道类等。 6. **Codec**: Netty 提供了一系列的编解码器,用于将各种数据类型转换为...
10. **线程模型**:Netty使用事件驱动和单线程或多线程模型相结合的方式,提高了并发处理能力。例如,一个EventLoop通常负责多个连接,减少了线程切换的开销。 通过学习《Netty In Action》中文版,读者不仅可以...
- **EventLoop**: 负责执行事件处理的线程,Netty使用了单线程模型来处理每个连接。 - **Buffer**: Netty 提供了ByteBuf,用于高效地处理网络数据。 3. **Netty的编码与解码** Netty 提供了多种编解码器,如 ...
- **Reactor模式**:Netty采用了Reactor模式来处理I/O事件,该模式的核心在于事件循环(Event Loop)和多路复用器(Multiplexer),使得单个线程能够处理多个Channel的I/O操作。 - **Zero-Copy**:Netty支持零拷贝技术,...
选择器用于监控多个通道,当通道上有可读、可写事件发生时,选择器会通知用户线程,这样可以实现单线程管理多个连接,降低了系统的线程开销。 Netty是一个高性能、异步事件驱动的网络应用框架,专为Java设计,用于...
单线程版本的`EventExecutor`,所有任务都在同一个线程中执行。 #### 类SingleThreadEventLoop `SingleThreadEventExecutor`在Netty中的具体实现,专门用于处理单个线程中的事件。 #### 类NioEventLoop 继承自`...
Netty是一个高性能、异步事件驱动的网络应用框架,它为Java开发人员提供了构建高性能、稳定、可扩展的网络服务器和客户端的API。在Android应用开发中,内网推送服务通常用于实现实时的消息传递,比如即时通讯或者...
- Netty的非阻塞I/O模型使得它可以高效地处理大量并发连接,这与Spring的单线程模型不同。 - 需要理解如何在Spring中正确地处理Netty的异步回调,以避免线程安全问题。 5. **整合Netty的事件循环和Spring的任务...
Netty 使用多线程模型,EventLoop 负责执行事件处理器(ChannelHandler)的方法。 3. **ChannelFuture** - ChannelFuture 代表了一个I/O操作的未来结果,它是异步编程的关键。你可以注册监听器来接收操作完成的通知...
- **好处**: 分散负载,避免单线程处理过多 Channel 导致的性能瓶颈。 ##### **2.3 事件处理** - **机制**: Netty 通过事件驱动模型来处理 I/O 事件,包括但不限于连接建立、读写事件等。 - **注册**: 当 Channel ...
6. 启动引导:Netty通过其Bootstrap类提供了灵活的网络应用程序启动和初始化机制,支持服务端和客户端的启动配置。 7. 示例:Netty的书籍通常包括大量示例,如单元测试、使用WebSockets、SPDY协议、以及使用UDP进行...
1. **NIO(Non-blocking I/O)**:Netty 基于 Java NIO(非阻塞I/O)构建,允许单线程处理多个并发连接,从而提高了性能和效率。 2. **Channel**:在 Netty 中,Channel 是网络连接的抽象,它可以是 TCP、UDP 或...
3. **高度可定制的线程模型**:Netty 允许开发者根据应用需求定制线程模型,比如单线程、单个线程池或多线程池等。 4. **无连接数据报支持**:自 3.1 版本起,Netty 支持无连接的数据报套接字,进一步拓宽了应用场景...
ServerBootstrap在Netty服务器中扮演着配置中心的角色,用于初始化服务器端的通道设置。通过ServerBootstrap实例,我们可以设置各种属性,如channel类型、通道选项、属性以及处理器等。这些属性方法可以分为两类:一...
Bootstrap是启动器,用于配置服务器或客户端的初始化设置。Channel是网络连接的抽象,代表一个到另一端的连接,可以读写数据。EventLoopGroup是一组事件循环,负责执行任务和处理I/O事件。Pipeline则是一个责任链...