- 浏览: 981058 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
引言:
前面一篇文章我们看了事件执行器组和事件执行器的接口定义,先来回顾一下:
事件循环组EventLoopGroup为一个特殊的事件执行器组EventExecutorGroup,可以注册通道,以便在事件循环中,被后面的选择操作处理器。事件执行器组继承了JUC的调度执行器服务ScheduledExecutorService,用迭代器Iterable<EventExecutor>管理组内的事件执行器。事件执行器是一个特殊的事件执行器组。Nio多线程事件循环NioEventLoopGroup可以理解为多线程版MultithreadEventExecutorGroup的事件执行器组。
事件执行器组EventExecutorGroup主要提供了关闭事件执行器组管理的执行器的相关方法,获取事件执行器组管理的事件执行器和执行任务线程方法。
事件执行器EventExecutor为一个特殊的事件执行器组EventExecutorGroup,提供了获取事件执行器组的下一个事件执行器方法,判断线程是否在当前事件循环中以及创建可写的异步任务结果和进度结果,及已经成功失败的异步结果。
抽象事件执行器组AbstractEventExecutorGroup,所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器组的下一个事件执行器相应方法执行。graceful方式关闭事件执行器组,默认关闭间隔为2s,超时时间为25s,具体定义在抽象事件执行器AbstractEventExecutor中。
抽象事件执行器,继承了抽象执行器服务AbstractExecutorService,提交任务线程,直接委托给父类抽象执行器服务,不支持延时调度的周期间歇性调度任务线程,多个一个安全地执行给定任务线程方法,捕捉执行过程中抛出的异常。由于抽象的事件执行器是一个特殊的事件执行器组,内部事件执行器selfCollection(Collections.<EventExecutor>singleton(this)),是自己单例集,next方法返回的是自己。
我们先把Nio事件循环继承树结构列一下:
事件循环组EventLoopGroup和事件执行器组EventExecutorGroup及事件执行器EventExecutor的关系;
调度执行器ScheduledExecutorService为JUC包中的执行器服务,用迭代器Iterable<EventExecutor>管理组内的
事件执行器。
再来看事件循环组的另一个分支EventLoopGroup
今天我们来看一下多线程事件执行器组:
从上面来看,多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。
我们来看一下事件执行器选择器的定义:
其定义在选择器工厂EventExecutorChooserFactory中
在选择器工厂定义中有一个注解我们来看一下:
回到事件执行器选择工厂的默认实现:
从上面可以看出当,当执行器的数量为2的幂次方时,使用的事件执行器选择器为PowerOfTwoEventExecutorChooser,
否则为GenericEventExecutorChooser。
回到多线程事件执行器组的构造:
从上面的构造函数来看,事件执行器选择器默认为DefaultEventExecutorChooserFactory,然后委托给
再看另外一个种构造:
来看任务线程执行器ThreadPerTaskExecutor
从上面来看默认的线程池执行器为ThreadPerTaskExecutor。
来看实际构造
构造方法中有一个点我们需要关注,创建默认线程池工厂:
默认线程工厂类,我们需要关注的只有以下一点:
//InternalThreadLocalMap,看看即,后具体涉及再看
从上面可以看出,构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
回到多线程执行器组的关闭,超时等待方Terminated法:
从上面可以看出获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
总结:
多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
引言:
前面一篇文章我们看了事件执行器组和事件执行器的接口定义,先来回顾一下:
事件循环组EventLoopGroup为一个特殊的事件执行器组EventExecutorGroup,可以注册通道,以便在事件循环中,被后面的选择操作处理器。事件执行器组继承了JUC的调度执行器服务ScheduledExecutorService,用迭代器Iterable<EventExecutor>管理组内的事件执行器。事件执行器是一个特殊的事件执行器组。Nio多线程事件循环NioEventLoopGroup可以理解为多线程版MultithreadEventExecutorGroup的事件执行器组。
事件执行器组EventExecutorGroup主要提供了关闭事件执行器组管理的执行器的相关方法,获取事件执行器组管理的事件执行器和执行任务线程方法。
事件执行器EventExecutor为一个特殊的事件执行器组EventExecutorGroup,提供了获取事件执行器组的下一个事件执行器方法,判断线程是否在当前事件循环中以及创建可写的异步任务结果和进度结果,及已经成功失败的异步结果。
抽象事件执行器组AbstractEventExecutorGroup,所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器组的下一个事件执行器相应方法执行。graceful方式关闭事件执行器组,默认关闭间隔为2s,超时时间为25s,具体定义在抽象事件执行器AbstractEventExecutor中。
抽象事件执行器,继承了抽象执行器服务AbstractExecutorService,提交任务线程,直接委托给父类抽象执行器服务,不支持延时调度的周期间歇性调度任务线程,多个一个安全地执行给定任务线程方法,捕捉执行过程中抛出的异常。由于抽象的事件执行器是一个特殊的事件执行器组,内部事件执行器selfCollection(Collections.<EventExecutor>singleton(this)),是自己单例集,next方法返回的是自己。
我们先把Nio事件循环继承树结构列一下:
事件循环组EventLoopGroup和事件执行器组EventExecutorGroup及事件执行器EventExecutor的关系;
/** * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s. Nio事件循环组NioEventLoopGroup为多线程事件循环组的事件,主要用于基于通道的Nio选择器相关操作。 */ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/** * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at * the same time. 多线程事件循环组MultithreadEventLoopGroup为事件循环组的实现,可以在同一时间多线程处理任务。 */ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
/** * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at * the same time. 多线程事件执行器组MultithreadEventExecutorGroup可以在同一时间多线程处理任务。 */ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
/** * Abstract base class for {@link EventExecutorGroup} implementations. 事件循环中的抽象实现 */ public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
/** * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use * via its {@link #next()} method. Besides this, it is also responsible for handling their * life-cycle and allows shutting them down in a global fashion. 事件执行器组通道next方法提供事件执行器。除此之外,负责他们的生命循环,并允许以全局的方式关闭 * */ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
调度执行器ScheduledExecutorService为JUC包中的执行器服务,用迭代器Iterable<EventExecutor>管理组内的
事件执行器。
/** * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes * with some handy methods to see if a {@link Thread} is executed in a event loop. * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic * way to access methods. 事件执行器EventExecutor是一个特殊的事件执行器组,如果线程在事件循环中执行,事件执行器可以处理 相关的操作。除此之外,拓展了事件执行器组的相关方法,可以用一般的方式访问事件执行器组的相关方法。 * */ public interface EventExecutor extends EventExecutorGroup {
再来看事件循环组的另一个分支EventLoopGroup
/** * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get * processed for later selection during the event loop. 事件循环组为一个特殊的事件执行器组,可以注册通道,以便在事件循环中,被后面的选择操作处理器。 * */ public interface EventLoopGroup extends EventExecutorGroup {
今天我们来看一下多线程事件执行器组:
/** * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at * the same time. 多线程事件执行器组MultithreadEventExecutorGroup可以在同一时间多线程处理任务。 */ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { private final EventExecutor[] children;//组内的事件执行器 private final Set<EventExecutor> readonlyChildren;//组内事件执行器集的可读包装集 private final AtomicInteger terminatedChildren = new AtomicInteger();//已关闭的事件执行器数 //termination异步任务结果 private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); private final EventExecutorChooserFactory.EventExecutorChooser chooser;//事件执行器选择器 }
从上面来看,多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。
我们来看一下事件执行器选择器的定义:
其定义在选择器工厂EventExecutorChooserFactory中
package io.netty.util.concurrent; import io.netty.util.internal.UnstableApi; /** * Factory that creates new {@link EventExecutorChooser}s. */ @UnstableApi public interface EventExecutorChooserFactory { /** * Returns a new {@link EventExecutorChooser}. 创建一个事件执行器选择器 */ EventExecutorChooser newChooser(EventExecutor[] executors); /** * Chooses the next {@link EventExecutor} to use. */ @UnstableApi interface EventExecutorChooser { /** * Returns the new {@link EventExecutor} to use. 返回新创建的事件执行功能器 */ EventExecutor next(); } }
在选择器工厂定义中有一个注解我们来看一下:
package io.netty.util.internal; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * Indicates a public API that can change at any time (even in minor/bugfix releases). *预示着公用的API可能随时改变 * Usage guidelines: *使用引导 * [list=1] * [*]Is not needed for things located in *.internal.* packages * [*]Only public accessible classes/interfaces must be annotated 公有方法方式必须注解 * <li>If this annotation is not present the API is considered stable and so no backward compatibility can be * broken in a non-major release!</li> 此注解不表示API不稳定,而是在一个非主线释放版本中,不向后兼容。 * [/list] */ @Retention(RetentionPolicy.SOURCE) @Target({ ElementType.ANNOTATION_TYPE, ElementType.CONSTRUCTOR, ElementType.FIELD, ElementType.METHOD, ElementType.PACKAGE, ElementType.TYPE }) @Documented public @interface UnstableApi { }
回到事件执行器选择工厂的默认实现:
package io.netty.util.concurrent; import io.netty.util.internal.UnstableApi; import java.util.concurrent.atomic.AtomicInteger; /** * Default implementation which uses simple round-robin to choose next {@link EventExecutor}. */ @UnstableApi public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { //默认事件执行器选择器工程实例 public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @SuppressWarnings("unchecked") @Override //根据执行器的数量是否为2的幂次方决定使用PowerOfTwoEventExecutorChooser还是GenericEventExecutorChooser public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } //判断执行器数是否为2的幂次方 private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } //2的幂次方事件执行器选择器 private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } //一般事件执行器选择器 private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } //获取内部执行器id对应的事件执行器 @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } }
从上面可以看出当,当执行器的数量为2的幂次方时,使用的事件执行器选择器为PowerOfTwoEventExecutorChooser,
否则为GenericEventExecutorChooser。
回到多线程事件执行器组的构造:
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. 线程数 * @param executor the Executor to use, or {@code null} if the default should be used. 执行器 * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call 传递给创建事件执行器的参数 */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }
从上面的构造函数来看,事件执行器选择器默认为DefaultEventExecutorChooserFactory,然后委托给
MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args)
再看另外一个种构造:
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args); }
来看任务线程执行器ThreadPerTaskExecutor
package io.netty.util.concurrent; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }
从上面来看默认的线程池执行器为ThreadPerTaskExecutor。
来看实际构造
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param chooserFactory the {@link EventExecutorChooserFactory} to use. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //检查线程数参数 if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } //如果执行器不为空,则初始化线程执行器的线程工厂 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //创建事件执行器集 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //创建事件执行器,newChild待子类实现 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { //初始化失败,则关闭事件执行器 for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } //Terminated事件执行器 for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } //初始化事件执行器选择器 chooser = chooserFactory.newChooser(children); //创建terminated事件执行器监听器 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; //添加terminated事件执行器监听器到terminated异步任务结果 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } //包装事件执行器集为只读 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } }
构造方法中有一个点我们需要关注,创建默认线程池工厂:
protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass()); }
package io.netty.util.concurrent; import io.netty.util.internal.StringUtil; import java.util.Locale; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * A {@link ThreadFactory} implementation with a simple naming rule. */ public class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolId = new AtomicInteger(); private final AtomicInteger nextId = new AtomicInteger();//线程id生成器 private final String prefix;//线程名前缀 private final boolean daemon;//是否为守候模式 private final int priority;//线程有限级 protected final ThreadGroup threadGroup;//线程组 public DefaultThreadFactory(Class<?> poolType) { this(poolType, false, Thread.NORM_PRIORITY); } public DefaultThreadFactory(String poolName) { this(poolName, false, Thread.NORM_PRIORITY); } public DefaultThreadFactory(Class<?> poolType, boolean daemon) { this(poolType, daemon, Thread.NORM_PRIORITY); } public DefaultThreadFactory(String poolName, boolean daemon) { this(poolName, daemon, Thread.NORM_PRIORITY); } public DefaultThreadFactory(Class<?> poolType, int priority) { this(poolType, false, priority); } public DefaultThreadFactory(String poolName, int priority) { this(poolName, false, priority); } public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) { this(toPoolName(poolType), daemon, priority); } //获取线程池名称 public static String toPoolName(Class<?> poolType) { if (poolType == null) { throw new NullPointerException("poolType"); } String poolName = StringUtil.simpleClassName(poolType); switch (poolName.length()) { case 0: return "unknown"; case 1: return poolName.toLowerCase(Locale.US); default: if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) { return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1); } else { return poolName; } } } public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { if (poolName == null) { throw new NullPointerException("poolName"); } if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException( "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)"); } //线程名默认为线程池名+id prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; this.priority = priority; this.threadGroup = threadGroup; } public DefaultThreadFactory(String poolName, boolean daemon, int priority) { //如果系统安全管理器为空,则线程组为当前线程所属组,否则系统安全管理获取线程组信息 this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()); } @Override public Thread newThread(Runnable r) { Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; } //包装线程为FastThreadLocalThread protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); } //线程包装类 private static final class DefaultRunnableDecorator implements Runnable { private final Runnable r; DefaultRunnableDecorator(Runnable r) { this.r = r; } @Override public void run() { try { r.run(); } finally { //移除所有线程级的变量 FastThreadLocal.removeAll(); } } } }
默认线程工厂类,我们需要关注的只有以下一点:
//包装线程为FastThreadLocalThread protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); }
package io.netty.util.concurrent; import io.netty.util.internal.InternalThreadLocalMap; /** * A special {@link Thread} that provides fast access to {@link FastThreadLocal} variables. */ public class FastThreadLocalThread extends Thread { private InternalThreadLocalMap threadLocalMap;//线程本地变量Map public FastThreadLocalThread() { } public FastThreadLocalThread(Runnable target) { super(target); } public FastThreadLocalThread(ThreadGroup group, Runnable target) { super(group, target); } public FastThreadLocalThread(String name) { super(name); } public FastThreadLocalThread(ThreadGroup group, String name) { super(group, name); } public FastThreadLocalThread(Runnable target, String name) { super(target, name); } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) { super(group, target, name); } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, target, name, stackSize); } /** * Returns the internal data structure that keeps the thread-local variables bound to this thread. * Note that this method is for internal use only, and thus is subject to change at any time. 返回绑定的当前线程的线程本地变量Map */ public final InternalThreadLocalMap threadLocalMap() { return threadLocalMap; } /** * Sets the internal data structure that keeps the thread-local variables bound to this thread. * Note that this method is for internal use only, and thus is subject to change at any time. */ public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { this.threadLocalMap = threadLocalMap; } }
//InternalThreadLocalMap,看看即,后具体涉及再看
/** * The internal data structure that stores the thread-local variables for Netty and all {@link FastThreadLocal}s. * Note that this class is for internal use only and is subject to change at any time. Use {@link FastThreadLocal} * unless you know what you are doing. 线程本地变量Map */ public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8; public static final Object UNSET = new Object(); ... }
从上面可以看出,构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
回到多线程执行器组的关闭,超时等待方Terminated法:
@Override public EventExecutor next() { //委托给事件执行器选择器 return chooser.next(); } @Override public Iterator<EventExecutor> iterator() { //获取只读执行器集的迭代器 return readonlyChildren.iterator(); } /** * Return the number of {@link EventExecutor} this implementation uses. This number is the maps * 1:1 to the threads it use. 返回管理的事件执行器数量 */ public final int executorCount() { return children.length; } /** * Create a new EventExecutor which will later then accessible via the {@link #next()} method. This method will be * called for each thread that will serve this {@link MultithreadEventExecutorGroup}. *创建一个事件执行器,创建后,可以通过next方法访问。此方法由服务多线程事件执行器组线程调用, 待子类实现 */ protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception; //关闭事件执行器组 @Override public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { //遍历管理的事件执行器集,关闭执行器 for (EventExecutor l: children) { l.shutdownGracefully(quietPeriod, timeout, unit); } //返回异步关闭事件执行器组任务结果 return terminationFuture(); } @Override public Future<?> terminationFuture() { return terminationFuture; } @Override @Deprecated public void shutdown() { for (EventExecutor l: children) { l.shutdown(); } } //所有执行器组内的事件执行器正在关闭,才返回true @Override public boolean isShuttingDown() { for (EventExecutor l: children) { if (!l.isShuttingDown()) { return false; } } return true; } //所有执行器组内的事件执行器已关闭,才返回true @Override public boolean isShutdown() { for (EventExecutor l: children) { if (!l.isShutdown()) { return false; } } return true; } //所有执行器组内的事件执行器已Terminated,才返回true @Override public boolean isTerminated() { for (EventExecutor l: children) { if (!l.isTerminated()) { return false; } } return true; } //超时等待Terminated执行器组 @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long deadline = System.nanoTime() + unit.toNanos(timeout); loop: for (EventExecutor l: children) {//遍历事件执行器组 for (;;) { long timeLeft = deadline - System.nanoTime(); if (timeLeft <= 0) {//超时等待时间耗完,则停止Terminated执行器组 break loop; } //否则,超时剩余等待时间timeLeft,Terminated事件执行器 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { break; } } } return isTerminated(); }
从上面可以看出获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
总结:
多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1321netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2057netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2444netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1316netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1310netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1594netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1844netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1397netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2834netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2177netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2037netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1078netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1877netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1219netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1202netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 958netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1309netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2189netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1077netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1855netty 管道线定义-ChannelPipeline:htt ...
相关推荐
5. **线程模型**:Netty采用了EventLoop(事件循环)和EventExecutorGroup(事件执行器组)来管理线程,有效地利用了多核处理器资源。 **Socket技术** Socket是网络通信的接口,允许两个进程通过网络进行通信。在...
EventLoop是Netty的核心组件,它是一个单线程执行任务的循环,负责处理I/O事件并分发到对应的处理器。每个EventLoop都包含一个Selector,用于监听多个通道的事件。通过多路复用技术,EventLoop可以高效地处理大量...
6. 线程安全:Netty通过其线程模型保证了在多线程环境下的正确性,例如,它使用了安全的事件分发机制,避免了并发访问的冲突。 总的来说,Netty的线程模型是其高性能的关键之一,通过合理地调度和管理线程,Netty...
1. `NioEventLoop`:这是Netty针对Java NIO实现的EventLoop类,其中包含了对选择器(Selector)的管理和事件的分发。 2. `ChannelHandlerContext`:它是Netty的上下文对象,用于封装了与特定Channel相关的所有操作...
事件循环(EventLoop)是Netty处理事件的关键组件,它负责调度和执行事件处理器。 "粘包与半包"是网络通信中常见的问题。当发送的数据超过缓冲区大小时,可能会导致一个完整的数据包被拆分,或者多个数据包被合并到...
- **多线程模型**:有一组NIO线程处理I/O操作。这种模型可以提高系统的并发处理能力。 - **主从多线程模型**:服务端用于接收客户端连接的是一个独立的NIO线程池。这种模型进一步优化了系统性能,适用于大规模并发...
3. **线程模型**:Netty 的 Boss-Worker 线程模型,Boss 线程负责接收新连接,Worker 线程负责处理 I/O 事件,确保了高并发下的性能。 4. **心跳机制**:Netty 支持自定义心跳包,确保长连接的有效性,防止因网络...
- 首先,我们需要定义一个ServerBootstrap实例,设置EventLoopGroup以管理线程,并配置通道初始化器以添加自定义的ChannelHandler,用于处理接收到的数据。 - 接着,使用bind方法绑定监听的端口,启动服务器。 - ...
5. **EventLoop** 和 **EventLoopGroup**:线程模型,EventLoop 负责执行 Channel 上的 I/O 操作和事件处理,EventLoopGroup 是 EventLoop 的管理器,负责分配和调度 EventLoop。 三、Netty 的事件驱动模型 Netty ...
10. **线程模型**:Netty有灵活的线程模型,可以根据应用需求选择单线程、多线程或者工作窃取模型。 这个"Netty CHM"文件可以帮助开发者快速查找Netty API的使用方法,理解各种类和接口的功能,以及如何将它们组合...
11. **Reactor 模式**: Netty 基于单线程或多线程的 Reactor 模型运行,实现了高效的并发处理。 12. **Extensibility**: Netty 的设计使其易于扩展,可以轻松地添加新的协议支持或者自定义处理逻辑。 在开发过程中...
对于大文件和多文件上传,Netty的异步特性尤为重要,因为它允许在等待I/O操作完成时执行其他任务,从而提高系统效率。 2. **多文件上传**: 在Netty中实现多文件上传,需要设计一个自定义的编解码器(Encoder和...
10. **线程模型**:Netty使用事件驱动和单线程或多线程模型相结合的方式,提高了并发处理能力。例如,一个EventLoop通常负责多个连接,减少了线程切换的开销。 通过学习《Netty In Action》中文版,读者不仅可以...
1. 参数处理:`group` 指定了 `ChannelHandler` 执行的事件选择器,如果为空则使用 `Channel` 注册的事件循环;`name` 是处理器的名称;`handler` 是要添加的处理器实例。 2. 重复添加检查:通过 `...
Netty 使用多线程模型,每个 EventLoop 负责一组 Channel,确保高效并发处理。 在"ChatNetty"这个例子中,我们将创建一个简单的聊天服务器和客户端。以下是实现的主要步骤: 1. **创建 ChannelHandler**:定义...
- **EventLoop**: 负责执行 ChannelHandler 中的任务,它是线程安全的,保证了事件处理的顺序性。 - **ChannelPipeline**: 一系列 ChannelHandler 组成的链,用于处理进来的数据和事件。 2. **Netty 的异步模型**...
5. **线程模型**:Netty的EventLoop(事件循环)和EventLoopGroup(事件循环组)提供了高效的线程管理,减少了线程创建和销毁的开销。 在“Netty3.5代码例子”中,你可能会看到以下几个关键组件: - **Bootstrap**...