- 浏览: 996660 次
-
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
引言:
上一篇文章我们看了默认Channel管道线初始化,先来回顾一下:
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,通道上下文描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。上下文链表的头部为HeadContext,尾部为TailContext。
头部上下文HeadContext的outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理相关事件,但读操作实际是通过Channel读取。HeadContext的通道注册方法channelRegistered,主要是执行通道处理器添加回调任务链中的任务。处理器添加回调任务主要是触发触发上下文关联通道处理器的handlerAdded事件,更新上下文状态为添加完毕状态,如果过程中有异常发生,则移除通道上下文。channelUnregistered方法,主要是在通道从选择器反注册时,清空管道线程的通道处理器上下文,并触发上下文关联的通道处理器handlerRemoved事件,更新上下文状态为已移除。
Channel的管道线的通道处理器上下文链的尾部TailContext是一个傀儡,不同于尾部上下文,头部上下文,在处理inbound事件时,触发通道处理器上下文相关的方法,在处理outbound事件时,委托给管道线关联的Channle的内部unsafe。
默认Channel管道实现内部有两个回调任务PendingHandlerAdded/RemovedTask,一个是添加通道处理器上下文回调任务,一个是移除通道上下文回调任务,主要是触发上下文关联通道处理器的处理器添加移除事件,并更新相应的上下文状态为已添加或已移除。
管道构造,主要是检查管道通道是否为空,初始化管道上下文链的头部与尾部上下文。
netty通道处理器上下文可以说,是Mina中Hanlder和过滤器的集合,整合两者功能,管道线有点Mina过滤链的意味,HeadContext相当于Mina过滤链的头部过滤器,TailContext相当于Mina过滤链的尾部过滤器。
今天我们来看管道线如果添加和移除及替换通道处理器。
先来看添加通道处理器:
添加通道处理器方法有一下几点要看:
1.
2.
3.
//这个上一篇文章已说,我们把它,贴过了:
默认的通达处理器关联一个通道处理器ChannelHandler。
//AbstractChannelHandlerContext
4.
5.
先来看状态更新
方法很容易理解,主要是根据added参数,确定是添加还是移除任务,并创建相应的回调任务,添加到管道的回调任务列表
这一贴出上一篇的添加和移除回调任务以便理解回调任务的作用:
//PendingHandlerAddedTask 处理器添加回调任务
6.
7.
从上面可以看出,添加通道处理器,首次检查通道处理器是否为共享模式,如果非共享,且已添加,则抛出异常;检查通道处理器名在管道内,是否存在对应通道处理器上下文,已存在抛出异常;根据事件执行器,处理器名,及处理器,构造处理器上下文;
添加处理器上限文到管道上下文链;如果通道没有注册到事件循环,上下文添加到管道时,创建添加通道处理器回调任务,并将任务添加管道的回调任务链中,当通道注册到事件循环时,触发通道处理器的handlerAdded事件,已注册则创建一个线程,
用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加,并交由事件执行器执行;最好调用callHandlerAdded0方法,确保调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加。
再来看添加处理器器到管道尾部操作:
//与addFirst方法不同点,添加通道处理器上下文到管道上下文链尾
再看添加处理器到指定处理前面
再来看addAfter操作:
总结:
添加通道处理器到管道头部,首次检查通道处理器是否为共享模式,如果非共享,且已添加,则抛出异常;检查通道处理器名在管道内,是否存在对应通道处理器上下文,已存在抛出异常;根据事件执行器,处理器名,及处理器,构造处理器上下文;添加处理器上限文到管道上下文链头;如果通道没有注册到事件循环,上下文添加到管道时,创建添加通道处理器回调任务,并将任务添加管道的回调任务链中,当通道注册到事件循环时,触发通道处理器的handlerAdded事件,已注册则创建一个线程,用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加,并交由事件执行器执行;最好调用callHandlerAdded0方法,确保调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加。其他last(添加到管道尾部),before(添加指定上下文的前面),after(添加指定上下文的后面)操作,基本上与addfirst思路基本相同,不同的是添加到管道上下文链的位置。
附:
以下方法很简单,了解一下即可
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
引言:
上一篇文章我们看了默认Channel管道线初始化,先来回顾一下:
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,通道上下文描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。上下文链表的头部为HeadContext,尾部为TailContext。
头部上下文HeadContext的outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理相关事件,但读操作实际是通过Channel读取。HeadContext的通道注册方法channelRegistered,主要是执行通道处理器添加回调任务链中的任务。处理器添加回调任务主要是触发触发上下文关联通道处理器的handlerAdded事件,更新上下文状态为添加完毕状态,如果过程中有异常发生,则移除通道上下文。channelUnregistered方法,主要是在通道从选择器反注册时,清空管道线程的通道处理器上下文,并触发上下文关联的通道处理器handlerRemoved事件,更新上下文状态为已移除。
Channel的管道线的通道处理器上下文链的尾部TailContext是一个傀儡,不同于尾部上下文,头部上下文,在处理inbound事件时,触发通道处理器上下文相关的方法,在处理outbound事件时,委托给管道线关联的Channle的内部unsafe。
默认Channel管道实现内部有两个回调任务PendingHandlerAdded/RemovedTask,一个是添加通道处理器上下文回调任务,一个是移除通道上下文回调任务,主要是触发上下文关联通道处理器的处理器添加移除事件,并更新相应的上下文状态为已添加或已移除。
管道构造,主要是检查管道通道是否为空,初始化管道上下文链的头部与尾部上下文。
netty通道处理器上下文可以说,是Mina中Hanlder和过滤器的集合,整合两者功能,管道线有点Mina过滤链的意味,HeadContext相当于Mina过滤链的头部过滤器,TailContext相当于Mina过滤链的尾部过滤器。
今天我们来看管道线如果添加和移除及替换通道处理器。
先来看添加通道处理器:
@Override public final ChannelPipeline addFirst(String name, ChannelHandler handler) { //根据事件执行器,处理器名及处理器,添加处理器到管道 return addFirst(null, name, handler); } //根据事件执行器,处理器名及处理器,添加处理器到管道 @Override public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //检查处理器是否为共享模式 checkMultiplicity(handler); //检查处理器名 name = filterName(name, handler); //根据事件执行器,处理器名,及处理器,构造处理器上下文 newCtx = newContext(group, name, handler); //添加处理上限文到管道上下文链 addFirst0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. //如果通道没有到事件循环,上下文添加到管道,需要添加一个回调任务, //当通道注册到事件循环时,触发通道处理器的handlerAdded事件 if (!registered) { newCtx.setAddPending();//更新上下文状态为正在添加 //创建添加通道处理器回调任务,并将任务添加管道的回调任务链中 callHandlerCallbackLater(newCtx, true); return this; } //获取上下文的时间执行器 EventExecutor executor = newCtx.executor(); //如果事件执行器不在事件循环中 if (!executor.inEventLoop()) { newCtx.setAddPending();//更新上下文状态为正在添加 //创建一个线程,用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加 executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } //调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加 callHandlerAdded0(newCtx); return this; }
添加通道处理器方法有一下几点要看:
1.
//检查处理器是否为共享模式 checkMultiplicity(handler);
//检查处理器是否为共享模式 private static void checkMultiplicity(ChannelHandler handler) { if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; if (!h.isSharable() && h.added) { //如果非共享,且已添加,则抛出异常 throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); } h.added = true; } }
2.
//检查处理器名 name = filterName(name, handler);
private String filterName(String name, ChannelHandler handler) { if (name == null) { //如果处理器名为空,则产生处理器对应的名称 return generateName(handler); } //否者检查处理器名是否与管道内的处理器名是否相同 checkDuplicateName(name); return name; } private void checkDuplicateName(String name) { if (context0(name) != null) { //如果处理器名已经存在,则抛出异常 throw new IllegalArgumentException("Duplicate handler name: " + name); } } private AbstractChannelHandlerContext context0(String name) { AbstractChannelHandlerContext context = head.next; //检查管道内的处理器上下文是否存在与name相同的上下文 while (context != tail) { if (context.name().equals(name)) { return context; } context = context.next; } return null; }
3.
//根据事件执行器,处理器名,及处理器,构造处理器上下文 newCtx = newContext(group, name, handler);
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); }
//这个上一篇文章已说,我们把它,贴过了:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler; DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; } @Override public ChannelHandler handler() { return handler; } private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; } }
默认的通达处理器关联一个通道处理器ChannelHandler。
//AbstractChannelHandlerContext
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class); volatile AbstractChannelHandlerContext next;//通道处理器上下文后继 volatile AbstractChannelHandlerContext prev;//通道处理器上下文前驱 /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called. */ private static final int ADD_PENDING = 1;//添加状态 /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. */ private static final int ADD_COMPLETE = 2;//添加完成状态 /** * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. */ private static final int REMOVE_COMPLETE = 3;//移除完成状态 /** * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. */ private static final int INIT = 0;//初始化状态 private final boolean inbound;//是否为inbound处理器上下文 private final boolean outbound;//是否为outbound处理器上下文 private final DefaultChannelPipeline pipeline;//上下文关联管道线 private final String name;//处理器上下文名 private final boolean ordered;//事件执行器是否为顺序执行器 // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. final EventExecutor executor;//事件执行器 private ChannelFuture succeededFuture; // Lazily instantiated tasks used to trigger events to a handler with different executor. // There is no need to make this volatile as at worse it will just create a few more instances then needed. private Runnable invokeChannelReadCompleteTask; private Runnable invokeReadTask; private Runnable invokeChannelWritableStateChangedTask; private Runnable invokeFlushTask; private volatile int handlerState = INIT; AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; } }
4.
//添加处理器上限文到管道上下文链头 private void addFirst0(AbstractChannelHandlerContext newCtx) { //获取上下文链头 AbstractChannelHandlerContext nextCtx = head.next; newCtx.prev = head; newCtx.next = nextCtx; head.next = newCtx; nextCtx.prev = newCtx; }
5.
// If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. //如果通道没有注册到事件循环,上下文添加到管道,需要添加一个回调任务, //当通道注册到事件循环时,触发通道处理器的handlerAdded事件 if (!registered) { newCtx.setAddPending();//更新上下文状态为正在添加 //创建添加通道处理器回调任务,并将任务添加管道的回调任务链中 callHandlerCallbackLater(newCtx, true); return this; }
先来看状态更新
//AbstractChannelHandlerContext final void setAddPending() { boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING); assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved(). }
//创建添加通道处理器回调任务,并将任务添加管道的回调任务链中 callHandlerCallbackLater(newCtx, true);
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { assert !registered; PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); PendingHandlerCallback pending = pendingHandlerCallbackHead; if (pending == null) { pendingHandlerCallbackHead = task; } else { // Find the tail of the linked-list. while (pending.next != null) { pending = pending.next; } pending.next = task; } }
方法很容易理解,主要是根据added参数,确定是添加还是移除任务,并创建相应的回调任务,添加到管道的回调任务列表
这一贴出上一篇的添加和移除回调任务以便理解回调任务的作用:
//PendingHandlerAddedTask 处理器添加回调任务
private final class PendingHandlerAddedTask extends PendingHandlerCallback { PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx); } @Override public void run() { callHandlerAdded0(ctx); } //在通道注册到选择器时,调用 @Override void execute() { //获取通道上下文的事件执行器 EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { //如果当前执行器在事务循环中直接委托为callHandlerAdded0 callHandlerAdded0(ctx); } else { try { //否则执行器,直接执行处理器添加回调任务 executor.execute(this); } catch (RejectedExecutionException e) { if (logger.isWarnEnabled()) { logger.warn( "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", executor, ctx.name(), e); } //异常则移除通道处理器上下文 remove0(ctx); ctx.setRemoved();//标志为移除 } } } }
6.
//获取上下文的时间执行器 EventExecutor executor = newCtx.executor(); //如果事件执行器不在事件循环中 if (!executor.inEventLoop()) { newCtx.setAddPending();//更新上下文状态为正在添加 //创建一个线程,用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加 executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; }
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { //触发上下文关联通道处理器的handlerAdded事件 ctx.handler().handlerAdded(ctx); //更新上下文状态为添加完毕 ctx.setAddComplete(); } catch (Throwable t) { //异常发生移除通道上下文 boolean removed = false; try { //移除上下文 remove0(ctx); try { //触发通道处理器的handlerRemoved事件 ctx.handler().handlerRemoved(ctx); } finally { ctx.setRemoved();//标记为已移除 } removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { logger.warn("Failed to remove a handler: " + ctx.name(), t2); } } if (removed) { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", t)); } else { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", t)); } } }
7.
//调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加 callHandlerAdded0(newCtx);
从上面可以看出,添加通道处理器,首次检查通道处理器是否为共享模式,如果非共享,且已添加,则抛出异常;检查通道处理器名在管道内,是否存在对应通道处理器上下文,已存在抛出异常;根据事件执行器,处理器名,及处理器,构造处理器上下文;
添加处理器上限文到管道上下文链;如果通道没有注册到事件循环,上下文添加到管道时,创建添加通道处理器回调任务,并将任务添加管道的回调任务链中,当通道注册到事件循环时,触发通道处理器的handlerAdded事件,已注册则创建一个线程,
用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加,并交由事件执行器执行;最好调用callHandlerAdded0方法,确保调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加。
再来看添加处理器器到管道尾部操作:
@Override public final ChannelPipeline addLast(String name, ChannelHandler handler) { return addLast(null, name, handler); } @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); //与addFirst方法不同点,添加通道处理器上下文到管道上下文链尾 addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; }
//与addFirst方法不同点,添加通道处理器上下文到管道上下文链尾
addLast0(newCtx);
private void addLast0(AbstractChannelHandlerContext newCtx) { //获取上下文链尾 AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
再看添加处理器到指定处理前面
@Override public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) { return addBefore(null, baseName, name, handler); } @Override public final ChannelPipeline addBefore( EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; synchronized (this) { checkMultiplicity(handler); name = filterName(name, handler); //获取指定处理器的上下文 ctx = getContextOrDie(baseName); newCtx = newContext(group, name, handler); //添加处理器到指定上下文的前面 addBefore0(ctx, newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; }
//获取指定处理器的上下文 ctx = getContextOrDie(baseName);
private AbstractChannelHandlerContext getContextOrDie(String name) { //获取name对应的通道处理器上下文 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name); if (ctx == null) { throw new NoSuchElementException(name); } else { return ctx; } } @Override public final ChannelHandlerContext context(String name) { if (name == null) { throw new NullPointerException("name"); } //委托给context0 return context0(name); }
private AbstractChannelHandlerContext context0(String name) { AbstractChannelHandlerContext context = head.next; //遍历管道上下文链,找到name对应的上下文 while (context != tail) { if (context.name().equals(name)) { return context; } context = context.next; } return null; }
//添加处理器到指定上下文的前面 addBefore0(ctx, newCtx);
private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { newCtx.prev = ctx.prev; newCtx.next = ctx; ctx.prev.next = newCtx; ctx.prev = newCtx; }
再来看addAfter操作:
@Override public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) { return addAfter(null, baseName, name, handler); } @Override public final ChannelPipeline addAfter( EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; synchronized (this) { checkMultiplicity(handler); name = filterName(name, handler); //获取baseName对应的上下文 ctx = getContextOrDie(baseName); newCtx = newContext(group, name, handler); //不同点,在这 addAfter0(ctx, newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we remove the context from the pipeline and add a task that will call // ChannelHandler.handlerRemoved(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; } private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { newCtx.prev = ctx; newCtx.next = ctx.next; ctx.next.prev = newCtx; ctx.next = newCtx; }
总结:
添加通道处理器到管道头部,首次检查通道处理器是否为共享模式,如果非共享,且已添加,则抛出异常;检查通道处理器名在管道内,是否存在对应通道处理器上下文,已存在抛出异常;根据事件执行器,处理器名,及处理器,构造处理器上下文;添加处理器上限文到管道上下文链头;如果通道没有注册到事件循环,上下文添加到管道时,创建添加通道处理器回调任务,并将任务添加管道的回调任务链中,当通道注册到事件循环时,触发通道处理器的handlerAdded事件,已注册则创建一个线程,用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加,并交由事件执行器执行;最好调用callHandlerAdded0方法,确保调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加。其他last(添加到管道尾部),before(添加指定上下文的前面),after(添加指定上下文的后面)操作,基本上与addfirst思路基本相同,不同的是添加到管道上下文链的位置。
附:
以下方法很简单,了解一下即可
//添加多个通道处理器到管道的头部 @Override public final ChannelPipeline addFirst(ChannelHandler... handlers) { return addFirst(null, handlers); } @Override public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } //检查处理器数组长度 if (handlers.length == 0 || handlers[0] == null) { return this; } int size; //检查处理器数组中的元素是否为null for (size = 1; size < handlers.length; size ++) { if (handlers[size] == null) { break; } } //遍历处理器数组,添加处理器到管道 for (int i = size - 1; i >= 0; i --) { ChannelHandler h = handlers[i]; addFirst(executor, null, h); } return this; } //添加多个通道处理器到管道的尾部 @Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } @Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1360netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2092netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2491netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1342netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1342netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1621netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1869netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1432netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2870netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2219netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2061netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1109netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1906netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1251netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1236netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 985netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1330netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2216netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1125netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1890netty 管道线定义-ChannelPipeline:htt ...
相关推荐
赠送jar包:netty-transport-native-unix-common-4.1.73.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.73....
赠送jar包:netty-transport-native-unix-common-4.1.74.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.74....
赠送jar包:netty-transport-native-unix-common-4.1.73.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.73....
赠送jar包:netty-transport-classes-epoll-4.1.73.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.73.Final-sources.jar;...
赠送jar包:netty-transport-native-unix-common-4.1.68.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.68.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.68....
赠送jar包:netty-transport-classes-epoll-4.1.74.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.74.Final-sources.jar;...
赠送jar包:netty-transport-classes-epoll-4.1.73.Final.jar; 赠送原API文档:netty-transport-classes-epoll-4.1.73.Final-javadoc.jar; 赠送源代码:netty-transport-classes-epoll-4.1.73.Final-sources.jar;...
赠送jar包:reactor-netty-http-1.0.11.jar; 赠送原API文档:reactor-netty-http-1.0.11-javadoc.jar; 赠送源代码:reactor-netty-http-1.0.11-sources.jar; 赠送Maven依赖信息文件:reactor-netty-...
赠送jar包:netty-resolver-dns-4.1.65.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:reactor-netty-core-1.0.15.jar; 赠送原API文档:reactor-netty-core-1.0.15-javadoc.jar; 赠送源代码:reactor-netty-core-1.0.15-sources.jar; 赠送Maven依赖信息文件:reactor-netty-core-1.0.15....
赠送jar包:netty-codec-mqtt-4.1.74.Final.jar; 赠送原API文档:netty-codec-mqtt-4.1.74.Final-javadoc.jar; 赠送源代码:netty-codec-mqtt-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-codec-haproxy-4.1.73.Final.jar; 赠送原API文档:netty-codec-haproxy-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-haproxy-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-tcnative-classes-2.0.46.Final.jar; 赠送原API文档:netty-tcnative-classes-2.0.46.Final-javadoc.jar; 赠送源代码:netty-tcnative-classes-2.0.46.Final-sources.jar; 赠送Maven依赖信息...
赠送jar包:netty-codec-dns-4.1.65.Final.jar; 赠送原API文档:netty-codec-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-codec-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-dns-...
赠送jar包:netty-codec-dns-4.1.65.Final.jar; 赠送原API文档:netty-codec-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-codec-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-dns-...
赠送jar包:netty-resolver-dns-4.1.65.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.65.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
赠送jar包:netty-codec-mqtt-4.1.73.Final.jar; 赠送原API文档:netty-codec-mqtt-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-mqtt-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...
赠送jar包:netty-resolver-dns-4.1.73.Final.jar; 赠送原API文档:netty-resolver-dns-4.1.73.Final-javadoc.jar; 赠送源代码:netty-resolver-dns-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-...
netty-socketio-netty-socketio-2.0.6 ,Socket.IO 是一个库,可以在客户端和服务器之间实现低延迟, 双向和基于事件的通信:netty-socketio-netty-socketio-2.0.6.tar.gznetty-socketio-netty-socketio-2.0.6.zip
赠送jar包:netty-codec-redis-4.1.73.Final.jar; 赠送原API文档:netty-codec-redis-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-redis-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-codec-...