- 浏览: 981173 次
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
打算使用xmemcache作为memcache的客户端,由于x ...
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
Channle管道线继承了Inbound、OutBound通道Invoker和Iterable<Entry<String, ChannelHandler>>接口,Channel管道线主要是管理Channel的通道处理器,每个通道有一个Channle管道线。Channle管道线主要定义了添加移除替换通道处理器的相关方法,在添加通道处理器的相关方法中,有一个事件执行器group参数,用于中Inbound和Outbound的相关事件,告诉管道,在不同于IO线程的事件执行器组中,执行通道处理器的事件执行方法,以保证IO线程不会被一个耗时任务阻塞,如果你的业务逻辑完全异步或能够快速的完成,可以添加一个事件执行器组。Channel管道线中的Inbound和Outbound通道处理器,主要通过通道处理器上下文的相关fire-INBOUND_ENT和OUTBOUND_OPR事件方法,传播Inbound和Outbound事件给管道中的下一个通道处理器。
outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理,但读操作实际是通过Channel读取。
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,通道上下文描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。上下文链表的头部为HeadContext,尾部为TailContext。
头部上下文HeadContext的outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理相关事件,但读操作实际是通过Channel读取。HeadContext的通道注册方法channelRegistered,主要是执行通道处理器添加回调任务链中的任务。处理器添加回调任务主要是触发触发上下文关联通道处理器的handlerAdded事件,更新上下文状态为添加完毕状态,如果过程中有异常发生,则移除通道上下文。channelUnregistered方法,主要是在通道从选择器反注册时,清空管道线程的通道处理器上下文,并触发上下文关联的通道处理器handlerRemoved事件,更新上下文状态为已移除。
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
Channle管道线继承了Inbound、OutBound通道Invoker和Iterable<Entry<String, ChannelHandler>>接口,Channel管道线主要是管理Channel的通道处理器,每个通道有一个Channle管道线。Channle管道线主要定义了添加移除替换通道处理器的相关方法,在添加通道处理器的相关方法中,有一个事件执行器group参数,用于中Inbound和Outbound的相关事件,告诉管道,在不同于IO线程的事件执行器组中,执行通道处理器的事件执行方法,以保证IO线程不会被一个耗时任务阻塞,如果你的业务逻辑完全异步或能够快速的完成,可以添加一个事件执行器组。Channel管道线中的Inbound和Outbound通道处理器,主要通过通道处理器上下文的相关fire-INBOUND_ENT和OUTBOUND_OPR事件方法,传播Inbound和Outbound事件给管道中的下一个通道处理器。
package io.netty.channel; import io.netty.channel.Channel.Unsafe; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakDetector; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; import java.util.ArrayList; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.WeakHashMap; import java.util.concurrent.RejectedExecutionException; /** * The default {@link ChannelPipeline} implementation. It is usually created * by a {@link Channel} implementation when the {@link Channel} is created. */ public class DefaultChannelPipeline implements ChannelPipeline { //log static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); //管道线中通道处理器上下文的头部与尾部名 private static final String HEAD_NAME = generateName0(HeadContext.class); private static final String TAIL_NAME = generateName0(TailContext.class); //从名字来看,是命名缓存,具体作用,遇到再探究 private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() { @Override protected Map<Class<?>, String> initialValue() throws Exception { return new WeakHashMap<Class<?>, String>(); } }; final AbstractChannelHandlerContext head;//管道线中通道处理器上下文的头部 final AbstractChannelHandlerContext tail;//管道线中通道处理器上下文的尾部 private final Channel channel;//管道线所属Channel private final ChannelFuture succeededFuture; private final VoidChannelPromise voidPromise; private final boolean touch = ResourceLeakDetector.isEnabled(); private Map<EventExecutorGroup, EventExecutor> childExecutors; private MessageSizeEstimator.Handle estimatorHandle; private boolean firstRegistration = true;//是否第一次注册通道处理器上下文 /** * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}. *pendingHandlerCallbackHead为处理添加通道处理器到管道任务链表的头部. * We only keep the head because it is expected that the list is used infrequently and its size is small. * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management * complexity. 因为链表不经常用,同时size较小,所以我们仅保持头部.全迭代插入是节省内存和尾部管理的一个折衷。 */ private PendingHandlerCallback pendingHandlerCallbackHead; /** * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never * change. 一旦通道注册到通道选择器,设置为true,将不会在改变 */ private boolean registered; }
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
public abstract class InternalLoggerFactory { private static volatile InternalLoggerFactory defaultFactory; /** * Creates a new logger instance with the name of the specified class. 创建指定class的log实例 */ public static InternalLogger getInstance(Class<?> clazz) { return getInstance(clazz.getName()); } /** * Creates a new logger instance with the specified name. 创建指定名的log实例 */ public static InternalLogger getInstance(String name) { return getDefaultFactory().newInstance(name); } /** * Returns the default factory. The initial default factory is * {@link JdkLoggerFactory}. 获取默认log工厂 */ public static InternalLoggerFactory getDefaultFactory() { if (defaultFactory == null) { defaultFactory = newDefaultFactory(InternalLoggerFactory.class.getName()); } return defaultFactory; } //创建默认log工厂 @SuppressWarnings("UnusedCatchParameter") private static InternalLoggerFactory newDefaultFactory(String name) { InternalLoggerFactory f; try { f = new Slf4JLoggerFactory(true);//先找SLF4J f.newInstance(name).debug("Using SLF4J as the default logging framework"); } catch (Throwable t1) { try { f = Log4JLoggerFactory.INSTANCE;//再找LOG4j f.newInstance(name).debug("Using Log4J as the default logging framework"); } catch (Throwable t2) { f = JdkLoggerFactory.INSTANCE;//最后JDKLog f.newInstance(name).debug("Using java.util.logging as the default logging framework"); } } return f; }
//管道线中通道处理器上下文的头部与尾部名 private static final String HEAD_NAME = generateName0(HeadContext.class); private static final String TAIL_NAME = generateName0(TailContext.class);
//获取指定类型的名称 private static String generateName0(Class<?> handlerType) { return StringUtil.simpleClassName(handlerType) + "#0"; }
//StringUtil import static io.netty.util.internal.ObjectUtil.*; private static final char PACKAGE_SEPARATOR_CHAR = '.'; /** * Generates a simplified name from a {@link Class}. Similar to {@link Class#getSimpleName()}, but it works fine * with anonymous classes. */ public static String simpleClassName(Class<?> clazz) { //获取类型name String className = checkNotNull(clazz, "clazz").getName(); //获取name最后一个简单类型名的起始索引 final int lastDotIdx = className.lastIndexOf(PACKAGE_SEPARATOR_CHAR); if (lastDotIdx > -1) { return className.substring(lastDotIdx + 1); } return className; }
/** * A grab-bag of useful utility methods. */ public final class ObjectUtil { /** * Checks that the given argument is not null. If it is, throws {@link NullPointerException}. * Otherwise, returns the argument. 检查给定的参数是否为空 */ public static <T> T checkNotNull(T arg, String text) { if (arg == null) { throw new NullPointerException(text); } return arg; } }
//HeadContext final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); //获取管道线所属通道的unsafe unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public ChannelHandler handler() { return this; } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } @Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.disconnect(promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.close(promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.deregister(promise); } @Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } //上述方法来看,outbound的相关操作,直接委托给管道线所属通道的unsafe,这个应该是Native API @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { //处理待完成的通道上下文添加任务 invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); // Remove all handlers sequentially if channel is closed and unregistered. if (!channel.isOpen()) { destroy(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); //通道激活时,就开始读取数据 readIfIsAutoRead(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); //读取操作完成,继续处理器通道中为完成的数据 readIfIsAutoRead(); } private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { //委托给通道的read方法 channel.read(); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } }
outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理,但读操作实际是通过Channel读取。
HeadContext(DefaultChannelPipeline pipeline) { //父类构造 super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); //更新通道上下文状态为添加完毕 setAddComplete(); }
//AbstractChannelHandlerContext private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState"); //设置通道上下文添加完毕 final void setAddComplete() { for (;;) { int oldState = handlerState; // Ensure we never update when the handlerState is REMOVE_COMPLETE already. // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not // exposing ordering guarantees. //确保在更新处理器状态时,通道没有被移除 if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) { return; } } }
super(pipeline, null, HEAD_NAME, false, true);
//AbstractChannelHandlerContext abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class); volatile AbstractChannelHandlerContext next;//通道处理器上下文后继 volatile AbstractChannelHandlerContext prev;//通道处理器上下文前驱 /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called. */ private static final int ADD_PENDING = 1;//添加状态 /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. */ private static final int ADD_COMPLETE = 2;//添加完成状态 /** * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. */ private static final int REMOVE_COMPLETE = 3;//移除完成状态 /** * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. */ private static final int INIT = 0;//初始化状态 private final boolean inbound;//是否为inbound处理器上下文 private final boolean outbound;//是否为outbound处理器上下文 private final DefaultChannelPipeline pipeline;//上下文关联管道线 private final String name; private final boolean ordered;//事件执行器是否为顺序执行器 // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. final EventExecutor executor; private ChannelFuture succeededFuture; // Lazily instantiated tasks used to trigger events to a handler with different executor. // There is no need to make this volatile as at worse it will just create a few more instances then needed. private Runnable invokeChannelReadCompleteTask; private Runnable invokeReadTask; private Runnable invokeChannelWritableStateChangedTask; private Runnable invokeFlushTask; private volatile int handlerState = INIT; AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; } }
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler; DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; } @Override public ChannelHandler handler() { return handler; } private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; } }
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { //处理待完成的通道上下文添加任务 invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); } //如果需要,执行待添加通道上下文的Callback任务 final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false; // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers, // that were added before the registration was done. //注册到事件循环.调用在注册之前,添加通道处理器的回调任务 callHandlerAddedForAllHandlers(); } } private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { //断言当前通道没有注册 assert !registered; // This Channel itself was registered.通道注册到选择器 registered = true; //获取添加处理器任务的回调任务 pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; // Null out so it can be GC'ed. GC this.pendingHandlerCallbackHead = null; } // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside // the EventLoop. //回调任务的执行,不许放在同步外边,因为当持有锁调用handlerAdded方法时,如果handlerAdded尝试从事件循环的外部添加 //另外一个处理器,可能产生一个死锁。 PendingHandlerCallback task = pendingHandlerCallbackHead; //遍历添加通道处理器回调任务列表,执行任务 while (task != null) { task.execute(); task = task.next; } }
//PendingHandlerCallback private abstract static class PendingHandlerCallback implements Runnable { final AbstractChannelHandlerContext ctx; PendingHandlerCallback next; PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx; } abstract void execute(); }
//PendingHandlerAddedTask 处理器添加回调任务 private final class PendingHandlerAddedTask extends PendingHandlerCallback { PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx); } @Override public void run() { callHandlerAdded0(ctx); } //在通道注册到选择器时,调用 @Override void execute() { //获取通道上下文的事件执行器 EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { //如果当前执行器在事务循环中直接委托为callHandlerAdded0 callHandlerAdded0(ctx); } else { try { //否则执行器,直接执行处理器添加回调任务 executor.execute(this); } catch (RejectedExecutionException e) { if (logger.isWarnEnabled()) { logger.warn( "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", executor, ctx.name(), e); } //异常则移除通道处理器上下文 remove0(ctx); ctx.setRemoved();//标志为移除 } } } }
//EventExecutor /** * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument 当前线程是否在事务循环中 */ boolean inEventLoop(); /** * Return {@code true} if the given {@link Thread} is executed in the event loop, * {@code false} otherwise. */ boolean inEventLoop(Thread thread);
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { //触发上下文关联通道处理器的handlerAdded事件 ctx.handler().handlerAdded(ctx); //更新上下文状态为添加完毕 ctx.setAddComplete(); } catch (Throwable t) { //异常发生移除通道上下文 boolean removed = false; try { //移除上下文 remove0(ctx); try { //触发通道处理器的handlerRemoved事件 ctx.handler().handlerRemoved(ctx); } finally { ctx.setRemoved();//标记为已移除 } removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { logger.warn("Failed to remove a handler: " + ctx.name(), t2); } } if (removed) { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", t)); } else { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", t)); } } }
//从管道线中移除指定通道处理器上下文 private static void remove0(AbstractChannelHandlerContext ctx) { AbstractChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; }
@Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered();//触发上下文fireChannelUnregistered // Remove all handlers sequentially if channel is closed and unregistered. //如果通道关闭,并处于反注册状态,移除所有通道处理器 if (!channel.isOpen()) { //移除处理器 destroy(); } }
/** * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger * handlerRemoved(). * 从管道尾部到头部移除所有处理器,并触发处理器handlerRemoved事件 * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)}) * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that * the handlers are removed after all events are handled. *注意,先从当前管道线的位置到尾部,移除处理器(#destroyUp),等到达尾部时,再从头部开始移除(#destroyDown), 以保证在处理器移除前,所有的事件已经处理。 * See: https://github.com/netty/netty/issues/3156 */ private synchronized void destroy() { //完成实际的移除工作 destroyUp(head.next, false); } //从当前位置到尾部 private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) { final Thread currentThread = Thread.currentThread(); final AbstractChannelHandlerContext tail = this.tail; for (;;) { if (ctx == tail) { //如果当前通道上下文为尾部,则转向头部 destroyDown(currentThread, tail.prev, inEventLoop); break; } final EventExecutor executor = ctx.executor(); //如果执行器或消息任务线程不在当前事件循环中,创建一个线程执行移除工作 if (!inEventLoop && !executor.inEventLoop(currentThread)) { final AbstractChannelHandlerContext finalCtx = ctx; executor.execute(new Runnable() { @Override public void run() { destroyUp(finalCtx, true); } }); break; } ctx = ctx.next; inEventLoop = false; } } //到达尾部,从头部开始移除处理器 private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) { // We have reached at tail; now traverse backwards. //从管道头部开始移除通道处理器 final AbstractChannelHandlerContext head = this.head; for (;;) { if (ctx == head) { break; } final EventExecutor executor = ctx.executor(); //如果执行器或消息任务线程在当前事件循环中,则直接移除 if (inEventLoop || executor.inEventLoop(currentThread)) { synchronized (this) { //从管道移除上下文 remove0(ctx); } //触发处理移除事件,并更新上下为状态为已移除 callHandlerRemoved0(ctx); } else { final AbstractChannelHandlerContext finalCtx = ctx; //否则,开启一个新的线程,执行移除工作状态 executor.execute(new Runnable() { @Override public void run() { destroyDown(Thread.currentThread(), finalCtx, true); } }); break; } ctx = ctx.prev; inEventLoop = false; } } //触发处理移除事件,并更新上下为状态为已移除 private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) { // Notify the complete removal. try { try { //触发上下文关联的处理器handlerRemoved事件 ctx.handler().handlerRemoved(ctx); } finally { //更新上下文状态为已移除 ctx.setRemoved(); } } catch (Throwable t) { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t)); } }
private final class PendingHandlerRemovedTask extends PendingHandlerCallback { PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) { super(ctx); } @Override public void run() { //触发处理移除事件,并更新上下为状态为已移除状态 callHandlerRemoved0(ctx); } @Override void execute() { EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { callHandlerRemoved0(ctx); } else { //如果移除线程不在当前事务循环中,移除任务交给上下文关联的事件执行器 try { executor.execute(this); } catch (RejectedExecutionException e) { if (logger.isWarnEnabled()) { logger.warn( "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," + " removing handler {}.", executor, ctx.name(), e); } // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved(). ctx.setRemoved(); } } } }
// A special catch-all handler that handles both bytes and messages. final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { //构造尾部上下文 TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } @Override//获取通道处理器 public ChannelHandler handler() { return this; } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // This may not be a configuration error and so don't log anything. // The event may be superfluous for the current pipeline configuration. //释放触发用户事件对象 ReferenceCountUtil.release(evt); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { onUnhandledInboundException(cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } }
//异常操作 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { onUnhandledInboundException(cause); }
/** * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. 当异常没有被用户的handler处理,到达管道的尾部时,调用 */ protected void onUnhandledInboundException(Throwable cause) { try { logger.warn( "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + "It usually means the last handler in the pipeline did not handle the exception.", cause); } finally { ReferenceCountUtil.release(cause); } }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); }
/** * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point. 当消息没有被用户的handler处理,到达管道的尾部时,调用此方法释放消息对象 */ protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }
//从名字来看,是命名缓存,具体作用,遇到再探究 private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() { @Override protected Map<Class<?>, String> initialValue() throws Exception { return new WeakHashMap<Class<?>, String>(); } };
final AbstractChannelHandlerContext head;//管道线中通道处理器上下文的头部 final AbstractChannelHandlerContext tail;//管道线中通道处理器上下文的尾部
private final Channel channel;//管道线所属Channel private final ChannelFuture succeededFuture; private final VoidChannelPromise voidPromise; private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors; private MessageSizeEstimator.Handle estimatorHandle; private boolean firstRegistration = true;//是否第一次注册通道处理器上下文
/** * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}. *pendingHandlerCallbackHead为处理添加通道处理器到管道任务链表的头部. * We only keep the head because it is expected that the list is used infrequently and its size is small. * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management * complexity. 因为链表不经常用,同时size较小,所以我们仅保持头部.全迭代插入是节省内存和尾部管理的一个折衷。 */ private PendingHandlerCallback pendingHandlerCallbackHead;
/** * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never * change. 一旦通道注册到通道选择器,设置为true,将不会在改变 */ private boolean registered;
protected DefaultChannelPipeline(Channel channel) { //检查管道关联的通道是否为null this.channel = ObjectUtil.checkNotNull(channel, "channel"); //新建一个通道完成任务 succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); //初始化头部和尾部上下文 tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,通道上下文描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。上下文链表的头部为HeadContext,尾部为TailContext。
头部上下文HeadContext的outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理相关事件,但读操作实际是通过Channel读取。HeadContext的通道注册方法channelRegistered,主要是执行通道处理器添加回调任务链中的任务。处理器添加回调任务主要是触发触发上下文关联通道处理器的handlerAdded事件,更新上下文状态为添加完毕状态,如果过程中有异常发生,则移除通道上下文。channelUnregistered方法,主要是在通道从选择器反注册时,清空管道线程的通道处理器上下文,并触发上下文关联的通道处理器handlerRemoved事件,更新上下文状态为已移除。
package io.netty.channel; import io.netty.util.concurrent.EventExecutor; /** * The {@link CompleteChannelFuture} which is succeeded already. It is * recommended to use {@link Channel#newSucceededFuture()} instead of * calling the constructor of this future. 已经成功完成的任务 */ final class SucceededChannelFuture extends CompleteChannelFuture { /** * Creates a new instance. * * @param channel the {@link Channel} associated with this future */ SucceededChannelFuture(Channel channel, EventExecutor executor) { super(channel, executor); } @Override public Throwable cause() { return null; } @Override public boolean isSuccess() { return true; } }
package io.netty.channel; import io.netty.util.concurrent.CompleteFuture; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; /** * A skeletal {@link ChannelFuture} implementation which represents a * {@link ChannelFuture} which has been completed already. */ abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture { private final Channel channel;//关联通道 /** * Creates a new instance. * * @param channel the {@link Channel} associated with this future */ protected CompleteChannelFuture(Channel channel, EventExecutor executor) { super(executor); if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; } @Override protected EventExecutor executor() { EventExecutor e = super.executor(); if (e == null) { return channel().eventLoop(); } else { return e; } } @Override public ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.addListener(listener); return this; } @Override public ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { super.addListeners(listeners); return this; } @Override public ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.removeListener(listener); return this; } @Override public ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { super.removeListeners(listeners); return this; } @Override public ChannelFuture syncUninterruptibly() { return this; } @Override public ChannelFuture sync() throws InterruptedException { return this; } @Override public ChannelFuture await() throws InterruptedException { return this; } @Override public ChannelFuture awaitUninterruptibly() { return this; } @Override public Channel channel() { return channel; } @Override public Void getNow() { return null; } @Override public boolean isVoid() { return false; } }
package io.netty.util.concurrent; import java.util.concurrent.TimeUnit; /** * A skeletal {@link Future} implementation which represents a {@link Future} which has been completed already. 表示一个已经完成的任务 */ public abstract class CompleteFuture<V> extends AbstractFuture<V> { private final EventExecutor executor;//内部事件执行器 /** * Creates a new instance. * * @param executor the {@link EventExecutor} associated with this future */ protected CompleteFuture(EventExecutor executor) { this.executor = executor; } /** * Return the {@link EventExecutor} which is used by this {@link CompleteFuture}. */ protected EventExecutor executor() { return executor; } @Override public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { if (listener == null) { throw new NullPointerException("listener"); } DefaultPromise.notifyListener(executor(), this, listener); return this; } @Override public Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { if (listeners == null) { throw new NullPointerException("listeners"); } for (GenericFutureListener<? extends Future<? super V>> l: listeners) { if (l == null) { break; } DefaultPromise.notifyListener(executor(), this, l); } return this; } @Override public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) { // NOOP return this; } @Override public Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { // NOOP return this; } @Override public Future<V> await() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } return this; } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } return true; } @Override public Future<V> sync() throws InterruptedException { return this; } @Override public Future<V> syncUninterruptibly() { return this; } @Override public boolean await(long timeoutMillis) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } return true; } @Override public Future<V> awaitUninterruptibly() { return this; } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { return true; } @Override public boolean awaitUninterruptibly(long timeoutMillis) { return true; } @Override public boolean isDone() { return true; } @Override public boolean isCancellable() { return false; } @Override public boolean isCancelled() { return false; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } }
package io.netty.channel; import io.netty.util.concurrent.AbstractFuture; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.util.concurrent.TimeUnit; final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise { private final Channel channel;//关联通道 private final boolean fireException;//异常发生时,是否触发异常 /** * Creates a new instance. * * @param channel the {@link Channel} associated with this future */ VoidChannelPromise(Channel channel, boolean fireException) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; this.fireException = fireException; } @Override public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { fail(); return this; } @Override public VoidChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { fail(); return this; } @Override public VoidChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) { // NOOP return this; } @Override public VoidChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { // NOOP return this; } @Override public VoidChannelPromise await() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } return this; } @Override public boolean await(long timeout, TimeUnit unit) { fail(); return false; } @Override public boolean await(long timeoutMillis) { fail(); return false; } @Override public VoidChannelPromise awaitUninterruptibly() { fail(); return this; } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { fail(); return false; } @Override public boolean awaitUninterruptibly(long timeoutMillis) { fail(); return false; } @Override public Channel channel() { return channel; } @Override public boolean isDone() { return false; } @Override public boolean isSuccess() { return false; } @Override public boolean setUncancellable() { return true; } @Override public boolean isCancellable() { return false; } @Override public boolean isCancelled() { return false; } @Override public Throwable cause() { return null; } @Override public VoidChannelPromise sync() { fail(); return this; } @Override public VoidChannelPromise syncUninterruptibly() { fail(); return this; } @Override public VoidChannelPromise setFailure(Throwable cause) { fireException(cause); return this; } @Override public VoidChannelPromise setSuccess() { return this; } @Override public boolean tryFailure(Throwable cause) { fireException(cause); return false; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public boolean trySuccess() { return false; } private static void fail() { throw new IllegalStateException("void future"); } @Override public VoidChannelPromise setSuccess(Void result) { return this; } @Override public boolean trySuccess(Void result) { return false; } @Override public Void getNow() { return null; } @Override public ChannelPromise unvoid() { ChannelPromise promise = new DefaultChannelPromise(channel); if (fireException) { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { fireException(future.cause()); } } }); } return promise; } @Override public boolean isVoid() { return true; } private void fireException(Throwable cause) { // Only fire the exception if the channel is open and registered // if not the pipeline is not setup and so it would hit the tail // of the pipeline. // See https://github.com/netty/netty/issues/1517 if (fireException && channel.isRegistered()) { channel.pipeline().fireExceptionCaught(cause); } } }
netty NioSocketChannel解析
2017-09-29 12:50 1321netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2057netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2444netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1317netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1310netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1594netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1844netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1397netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2834netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2177netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2037netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1078netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1877netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1219netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1202netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 958netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1309netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2189netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1077netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1855netty 管道线定义-ChannelPipeline:htt ...
Spring Boot的初始化器会自动查找`ServerBootstrap`实例,因此我们可以通过扩展`AbstractServerBootstrapFactory`来自定义服务器初始化: ```java import io.netty.bootstrap.ServerBootstrap; import io.netty....
Netty支持多种序列化方式,其中包括使用Java原生的序列化机制。 Java序列化是通过实现`Serializable`接口来完成的。当一个类实现这个接口时,它的实例就可以被序列化。在Netty中,我们通常会创建一个专门用于序列化...
SpringBoot是Spring框架的一个轻量级容器,它简化了Spring应用程序的初始设置和配置。SpringBoot 2.0带来了许多改进,包括性能提升、更好的错误处理、对Java 9的支持以及对WebFlux的支持,后者是Spring提供的反应式...
- **模块化架构**: Netty采用了高度模块化的设计,允许开发者根据需求灵活选择所需组件,减少了不必要的依赖负担。 - **高度可定制**: Netty允许开发者自定义处理网络事件的逻辑,比如数据解码/编码、异常处理等,...
Bootstrap是Netty中的一个启动类,用于配置并初始化Netty服务。它有两个主要实现:`Bootstrap`和`ServerBootstrap`,分别用于客户端和服务端的初始化。 #### 类AbstractBootstrap 这是一个抽象基类,为`Bootstrap`...
在 Netty 中,"Pipeline"(管道)是其核心设计之一,它提供了一种处理进/出站数据的有效方式。下面我们将深入探讨 Netty 的管道机制以及如何进行测试。 首先,Netty 管道是数据传输路径上的处理链,它由多个处理器...
Netty的事件驱动模型基于“事件循环”(Event Loop)和“管道”(Channel Pipeline)的概念。事件循环是Netty中的核心组件,负责处理I/O事件,并将这些事件分发到对应的处理器。管道则是一系列处理器的链,每个...
Channel是Netty中的核心组件,代表一个网络连接,而ChannelHandler则是处理I/O事件和数据传输的处理器。 书中会详细讲解Netty的管道(Pipeline)机制,这是Netty处理网络请求的独特方式。管道由一系列的...
1. **工厂模式(Factory Pattern)**:Netty的ChannelInitializer就是一个典型的工厂模式,它用于初始化ChannelPipeline,根据需求创建并添加相应的处理器。 2. **单例模式(Singleton Pattern)**:Netty的...
BootStrap则负责Netty应用的启动和初始化。 Netty的高性能主要得益于其零拷贝技术,池化技术以及对滑动窗口的优化。零拷贝技术可以减少不必要的内存拷贝操作,池化技术可以复用池中的Channel、Buffer等资源,减少...
3. **Netty 服务器启动**: 使用 Spring 的 ApplicationListener 或 CommandLineRunner 接口,在应用启动时触发 Netty 服务器的初始化和启动。这样可以确保 Netty 服务器在 Spring 容器启动后自动运行。 4. **...
- 创建一个 ChannelInitializer,用于初始化新建的 Channel,添加 ChannelHandler 到 ChannelPipeline。 - 绑定监听端口,启动服务端。 2. **客户端:** - 创建一个客户端 Bootstrap,设置 EventLoopGroup。 - ...
channelRead0() 方法用于处理接收到的客户端消息,而channelActive() 方法在客户端连接建立后被调用,适合在这个时候发送欢迎消息或进行其他初始化操作。 在实际的代码实现中,我们可能还需要定义自定义的Message...
2. 添加自定义的 ChannelInitializer,用于在 Channel 初始化时设置 ChannelPipeline 中的 Handler。 3. 绑定客户端的 Handler,通常包含编码(将业务对象转换为 ByteBuf)和解码(将 ByteBuf 转换回业务对象)逻辑...
2. **Channel与Pipeline**:Netty中的Channel代表一个网络连接,负责读写数据。Pipeline(管道)则是一系列处理通道事件的处理器链,每个处理器可以对数据进行解码、编码或者执行业务逻辑。 3. **ByteBuf**:Netty...