- 浏览: 984463 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
引言:
前面一篇文章我们看了管道线接口的定义,先来回顾一下:
Channle管道线继承了Inbound、OutBound通道Invoker和Iterable<Entry<String, ChannelHandler>>接口,Channel管道线主要是管理Channel的通道处理器,每个通道有一个Channle管道线。Channle管道线主要定义了添加移除替换通道处理器的相关方法,在添加通道处理器的相关方法中,有一个事件执行器group参数,用于中Inbound和Outbound的相关事件,告诉管道,在不同于IO线程的事件执行器组中,执行通道处理器的事件执行方法,以保证IO线程不会被一个耗时任务阻塞,如果你的业务逻辑完全异步或能够快速的完成,可以添加一个事件执行器组。Channel管道线中的Inbound和Outbound通道处理器,主要通过通道处理器上下文的相关fire-INBOUND_ENT和OUTBOUND_OPR事件方法,传播Inbound和Outbound事件给管道中的下一个通道处理器。
今天来看一下Channel管道线的默认实现DefaultChannelPipeline
在往下看之前,我们要先把默认管道线的变量先大致了解一下,我们现在非完全了解,知道大概的含义和用途,
待后面讲到时,在细说。
上面的变量我们一个一个的来看:
1.
//log
从上面来看netty的log实例,实际是从默认log工厂获取,默认log工厂顺序为SLF4J,LOG4j,JDKLog。
2.
先来看generateName0方法,
//ObjectUtil
从generateName0方法来主要是产生指定类的名称。
再来看一个HeadContext和TailContext
从上面方法的实现来看:
outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理,但读操作实际是通过Channel读取。
再来看一下构造:
来看构造的更新通道上下文状态
再来看父类构造
//这个只需要知道大概意思,后面再说
从上面可以看出通道处理器上下文关联一个管道线,一个事件执行器(处理通道处理器上下文相关的事件),和判断通道处理器上下文是inbound和oubound处理器的两个标志。
在看一个默认的通道处理器上下文的实现:
从上面可看出默认的通达处理器关联一个通道处理器ChannelHandler。
简单理一下通道,通道处理器,通道处理器上下文,管道线的关系:
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。
再来看HeadContext的通道注册方法
再来看添加通道处理器回调任务定义:
来看添加通道上下文方法callHandlerAdded0
再来看移除上下文方法
从上来看处理器添加回调任务主要是触发触发上下文关联通道处理器的handlerAdded事件,更新上下文状态为添加完毕状态,如果过程中有异常发生,则移除通道上下文。
HeadContext的通道注册方法,主要是执行通道处理器添加回调任务链中的任务。
再来看HeadContext的通道上下文反注册方法
从上面来看channelUnregistered方法,主要是在通道从选择器反注册时,清空管道线程的通道处理器上下文,并触发上下文关联的通道处理器handlerRemoved事件,更新上下文状态为已移除状态。
在头部上下文的通道注册方法分析中我们有说过通道处理器添加回调任务PendingHandlerAddedTask,
现在来看一下另外一个回调任务,通道处理器移除回调任务
//TailContext
尾部上下文没有什么好说的,来看通道读操作和异常操作:
//读操作
从上面来看Channel的管道线的通道处理器上下文链的尾部是一个傀儡,不同于尾部上下文,头部上下文,在处理inbound事件时,触发通道处理器上下文相关的方法,在处理outbound事件时,委托给管道线关联的Channle的内部unsafe
3-8这部分,我们不再说,用到时再说:
3.
4.
5.
6.
7.
8.
来看管道的构造:
从上来看管道构造,主要是检查管道通道是否为空,初始化管道上下文链的头部与尾部上下文。
总结:
netty的log实例,实际是从默认log工厂获取,默认log工厂顺序为SLF4J,LOG4j,JDKLog。
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,通道上下文描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。上下文链表的头部为HeadContext,尾部为TailContext。
头部上下文HeadContext的outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理相关事件,但读操作实际是通过Channel读取。HeadContext的通道注册方法channelRegistered,主要是执行通道处理器添加回调任务链中的任务。处理器添加回调任务主要是触发触发上下文关联通道处理器的handlerAdded事件,更新上下文状态为添加完毕状态,如果过程中有异常发生,则移除通道上下文。channelUnregistered方法,主要是在通道从选择器反注册时,清空管道线程的通道处理器上下文,并触发上下文关联的通道处理器handlerRemoved事件,更新上下文状态为已移除。
Channel的管道线的通道处理器上下文链的尾部TailContext是一个傀儡,不同于尾部上下文,头部上下文,在处理inbound事件时,触发通道处理器上下文相关的方法,在处理outbound事件时,委托给管道线关联的Channle的内部unsafe。
默认Channel管道实现内部有两个回调任务PendingHandlerAdded/RemovedTask,一个是添加通道处理器上下文回调任务,一个是移除通道上下文回调任务,主要是触发上下文关联通道处理器的处理器添加移除事件,并更新相应的上下文状态为已添加或已移除。
管道构造,主要是检查管道通道是否为空,初始化管道上下文链的头部与尾部上下文。
netty通道处理器上下文可以说,是Mina中Hanlder和过滤器的集合,整合两者功能,管道线有点Mina过滤链的意味,HeadContext相当于Mina过滤链的头部过滤器,TailContext相当于Mina过滤链的尾部过滤器
附:
//SucceededChannelFuture
//CompleteChannelFuture
//CompleteFuture
//VoidChannelPromise
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
引言:
前面一篇文章我们看了管道线接口的定义,先来回顾一下:
Channle管道线继承了Inbound、OutBound通道Invoker和Iterable<Entry<String, ChannelHandler>>接口,Channel管道线主要是管理Channel的通道处理器,每个通道有一个Channle管道线。Channle管道线主要定义了添加移除替换通道处理器的相关方法,在添加通道处理器的相关方法中,有一个事件执行器group参数,用于中Inbound和Outbound的相关事件,告诉管道,在不同于IO线程的事件执行器组中,执行通道处理器的事件执行方法,以保证IO线程不会被一个耗时任务阻塞,如果你的业务逻辑完全异步或能够快速的完成,可以添加一个事件执行器组。Channel管道线中的Inbound和Outbound通道处理器,主要通过通道处理器上下文的相关fire-INBOUND_ENT和OUTBOUND_OPR事件方法,传播Inbound和Outbound事件给管道中的下一个通道处理器。
今天来看一下Channel管道线的默认实现DefaultChannelPipeline
package io.netty.channel; import io.netty.channel.Channel.Unsafe; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakDetector; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; import java.util.ArrayList; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.WeakHashMap; import java.util.concurrent.RejectedExecutionException; /** * The default {@link ChannelPipeline} implementation. It is usually created * by a {@link Channel} implementation when the {@link Channel} is created. */ public class DefaultChannelPipeline implements ChannelPipeline { //log static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); //管道线中通道处理器上下文的头部与尾部名 private static final String HEAD_NAME = generateName0(HeadContext.class); private static final String TAIL_NAME = generateName0(TailContext.class); //从名字来看,是命名缓存,具体作用,遇到再探究 private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() { @Override protected Map<Class<?>, String> initialValue() throws Exception { return new WeakHashMap<Class<?>, String>(); } }; final AbstractChannelHandlerContext head;//管道线中通道处理器上下文的头部 final AbstractChannelHandlerContext tail;//管道线中通道处理器上下文的尾部 private final Channel channel;//管道线所属Channel private final ChannelFuture succeededFuture; private final VoidChannelPromise voidPromise; private final boolean touch = ResourceLeakDetector.isEnabled(); private Map<EventExecutorGroup, EventExecutor> childExecutors; private MessageSizeEstimator.Handle estimatorHandle; private boolean firstRegistration = true;//是否第一次注册通道处理器上下文 /** * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}. *pendingHandlerCallbackHead为处理添加通道处理器到管道任务链表的头部. * We only keep the head because it is expected that the list is used infrequently and its size is small. * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management * complexity. 因为链表不经常用,同时size较小,所以我们仅保持头部.全迭代插入是节省内存和尾部管理的一个折衷。 */ private PendingHandlerCallback pendingHandlerCallbackHead; /** * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never * change. 一旦通道注册到通道选择器,设置为true,将不会在改变 */ private boolean registered; }
在往下看之前,我们要先把默认管道线的变量先大致了解一下,我们现在非完全了解,知道大概的含义和用途,
待后面讲到时,在细说。
上面的变量我们一个一个的来看:
1.
//log
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
public abstract class InternalLoggerFactory { private static volatile InternalLoggerFactory defaultFactory; /** * Creates a new logger instance with the name of the specified class. 创建指定class的log实例 */ public static InternalLogger getInstance(Class<?> clazz) { return getInstance(clazz.getName()); } /** * Creates a new logger instance with the specified name. 创建指定名的log实例 */ public static InternalLogger getInstance(String name) { return getDefaultFactory().newInstance(name); } /** * Returns the default factory. The initial default factory is * {@link JdkLoggerFactory}. 获取默认log工厂 */ public static InternalLoggerFactory getDefaultFactory() { if (defaultFactory == null) { defaultFactory = newDefaultFactory(InternalLoggerFactory.class.getName()); } return defaultFactory; } //创建默认log工厂 @SuppressWarnings("UnusedCatchParameter") private static InternalLoggerFactory newDefaultFactory(String name) { InternalLoggerFactory f; try { f = new Slf4JLoggerFactory(true);//先找SLF4J f.newInstance(name).debug("Using SLF4J as the default logging framework"); } catch (Throwable t1) { try { f = Log4JLoggerFactory.INSTANCE;//再找LOG4j f.newInstance(name).debug("Using Log4J as the default logging framework"); } catch (Throwable t2) { f = JdkLoggerFactory.INSTANCE;//最后JDKLog f.newInstance(name).debug("Using java.util.logging as the default logging framework"); } } return f; }
从上面来看netty的log实例,实际是从默认log工厂获取,默认log工厂顺序为SLF4J,LOG4j,JDKLog。
2.
//管道线中通道处理器上下文的头部与尾部名 private static final String HEAD_NAME = generateName0(HeadContext.class); private static final String TAIL_NAME = generateName0(TailContext.class);
先来看generateName0方法,
//获取指定类型的名称 private static String generateName0(Class<?> handlerType) { return StringUtil.simpleClassName(handlerType) + "#0"; }
//StringUtil import static io.netty.util.internal.ObjectUtil.*; private static final char PACKAGE_SEPARATOR_CHAR = '.'; /** * Generates a simplified name from a {@link Class}. Similar to {@link Class#getSimpleName()}, but it works fine * with anonymous classes. */ public static String simpleClassName(Class<?> clazz) { //获取类型name String className = checkNotNull(clazz, "clazz").getName(); //获取name最后一个简单类型名的起始索引 final int lastDotIdx = className.lastIndexOf(PACKAGE_SEPARATOR_CHAR); if (lastDotIdx > -1) { return className.substring(lastDotIdx + 1); } return className; }
//ObjectUtil
/** * A grab-bag of useful utility methods. */ public final class ObjectUtil { /** * Checks that the given argument is not null. If it is, throws {@link NullPointerException}. * Otherwise, returns the argument. 检查给定的参数是否为空 */ public static <T> T checkNotNull(T arg, String text) { if (arg == null) { throw new NullPointerException(text); } return arg; } }
从generateName0方法来主要是产生指定类的名称。
再来看一个HeadContext和TailContext
//HeadContext final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); //获取管道线所属通道的unsafe unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public ChannelHandler handler() { return this; } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } @Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.disconnect(promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.close(promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.deregister(promise); } @Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } //上述方法来看,outbound的相关操作,直接委托给管道线所属通道的unsafe,这个应该是Native API @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { //处理待完成的通道上下文添加任务 invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); // Remove all handlers sequentially if channel is closed and unregistered. if (!channel.isOpen()) { destroy(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); //通道激活时,就开始读取数据 readIfIsAutoRead(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); //读取操作完成,继续处理器通道中为完成的数据 readIfIsAutoRead(); } private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { //委托给通道的read方法 channel.read(); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } }
从上面方法的实现来看:
outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理,但读操作实际是通过Channel读取。
再来看一下构造:
HeadContext(DefaultChannelPipeline pipeline) { //父类构造 super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); //更新通道上下文状态为添加完毕 setAddComplete(); }
来看构造的更新通道上下文状态
//AbstractChannelHandlerContext private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState"); //设置通道上下文添加完毕 final void setAddComplete() { for (;;) { int oldState = handlerState; // Ensure we never update when the handlerState is REMOVE_COMPLETE already. // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not // exposing ordering guarantees. //确保在更新处理器状态时,通道没有被移除 if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) { return; } } }
再来看父类构造
super(pipeline, null, HEAD_NAME, false, true);
//这个只需要知道大概意思,后面再说
//AbstractChannelHandlerContext abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class); volatile AbstractChannelHandlerContext next;//通道处理器上下文后继 volatile AbstractChannelHandlerContext prev;//通道处理器上下文前驱 /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called. */ private static final int ADD_PENDING = 1;//添加状态 /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. */ private static final int ADD_COMPLETE = 2;//添加完成状态 /** * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. */ private static final int REMOVE_COMPLETE = 3;//移除完成状态 /** * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. */ private static final int INIT = 0;//初始化状态 private final boolean inbound;//是否为inbound处理器上下文 private final boolean outbound;//是否为outbound处理器上下文 private final DefaultChannelPipeline pipeline;//上下文关联管道线 private final String name; private final boolean ordered;//事件执行器是否为顺序执行器 // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. final EventExecutor executor; private ChannelFuture succeededFuture; // Lazily instantiated tasks used to trigger events to a handler with different executor. // There is no need to make this volatile as at worse it will just create a few more instances then needed. private Runnable invokeChannelReadCompleteTask; private Runnable invokeReadTask; private Runnable invokeChannelWritableStateChangedTask; private Runnable invokeFlushTask; private volatile int handlerState = INIT; AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; } }
从上面可以看出通道处理器上下文关联一个管道线,一个事件执行器(处理通道处理器上下文相关的事件),和判断通道处理器上下文是inbound和oubound处理器的两个标志。
在看一个默认的通道处理器上下文的实现:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler; DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; } @Override public ChannelHandler handler() { return handler; } private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; } }
从上面可看出默认的通达处理器关联一个通道处理器ChannelHandler。
简单理一下通道,通道处理器,通道处理器上下文,管道线的关系:
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。
再来看HeadContext的通道注册方法
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { //处理待完成的通道上下文添加任务 invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); } //如果需要,执行待添加通道上下文的Callback任务 final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false; // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers, // that were added before the registration was done. //注册到事件循环.调用在注册之前,添加通道处理器的回调任务 callHandlerAddedForAllHandlers(); } } private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { //断言当前通道没有注册 assert !registered; // This Channel itself was registered.通道注册到选择器 registered = true; //获取添加处理器任务的回调任务 pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; // Null out so it can be GC'ed. GC this.pendingHandlerCallbackHead = null; } // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside // the EventLoop. //回调任务的执行,不许放在同步外边,因为当持有锁调用handlerAdded方法时,如果handlerAdded尝试从事件循环的外部添加 //另外一个处理器,可能产生一个死锁。 PendingHandlerCallback task = pendingHandlerCallbackHead; //遍历添加通道处理器回调任务列表,执行任务 while (task != null) { task.execute(); task = task.next; } }
再来看添加通道处理器回调任务定义:
//PendingHandlerCallback private abstract static class PendingHandlerCallback implements Runnable { final AbstractChannelHandlerContext ctx; PendingHandlerCallback next; PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx; } abstract void execute(); }
//PendingHandlerAddedTask 处理器添加回调任务 private final class PendingHandlerAddedTask extends PendingHandlerCallback { PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx); } @Override public void run() { callHandlerAdded0(ctx); } //在通道注册到选择器时,调用 @Override void execute() { //获取通道上下文的事件执行器 EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { //如果当前执行器在事务循环中直接委托为callHandlerAdded0 callHandlerAdded0(ctx); } else { try { //否则执行器,直接执行处理器添加回调任务 executor.execute(this); } catch (RejectedExecutionException e) { if (logger.isWarnEnabled()) { logger.warn( "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", executor, ctx.name(), e); } //异常则移除通道处理器上下文 remove0(ctx); ctx.setRemoved();//标志为移除 } } } }
//EventExecutor /** * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument 当前线程是否在事务循环中 */ boolean inEventLoop(); /** * Return {@code true} if the given {@link Thread} is executed in the event loop, * {@code false} otherwise. */ boolean inEventLoop(Thread thread);
来看添加通道上下文方法callHandlerAdded0
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { //触发上下文关联通道处理器的handlerAdded事件 ctx.handler().handlerAdded(ctx); //更新上下文状态为添加完毕 ctx.setAddComplete(); } catch (Throwable t) { //异常发生移除通道上下文 boolean removed = false; try { //移除上下文 remove0(ctx); try { //触发通道处理器的handlerRemoved事件 ctx.handler().handlerRemoved(ctx); } finally { ctx.setRemoved();//标记为已移除 } removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { logger.warn("Failed to remove a handler: " + ctx.name(), t2); } } if (removed) { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", t)); } else { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", t)); } } }
再来看移除上下文方法
//从管道线中移除指定通道处理器上下文 private static void remove0(AbstractChannelHandlerContext ctx) { AbstractChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; }
从上来看处理器添加回调任务主要是触发触发上下文关联通道处理器的handlerAdded事件,更新上下文状态为添加完毕状态,如果过程中有异常发生,则移除通道上下文。
HeadContext的通道注册方法,主要是执行通道处理器添加回调任务链中的任务。
再来看HeadContext的通道上下文反注册方法
@Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered();//触发上下文fireChannelUnregistered // Remove all handlers sequentially if channel is closed and unregistered. //如果通道关闭,并处于反注册状态,移除所有通道处理器 if (!channel.isOpen()) { //移除处理器 destroy(); } }
/** * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger * handlerRemoved(). * 从管道尾部到头部移除所有处理器,并触发处理器handlerRemoved事件 * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)}) * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that * the handlers are removed after all events are handled. *注意,先从当前管道线的位置到尾部,移除处理器(#destroyUp),等到达尾部时,再从头部开始移除(#destroyDown), 以保证在处理器移除前,所有的事件已经处理。 * See: https://github.com/netty/netty/issues/3156 */ private synchronized void destroy() { //完成实际的移除工作 destroyUp(head.next, false); } //从当前位置到尾部 private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) { final Thread currentThread = Thread.currentThread(); final AbstractChannelHandlerContext tail = this.tail; for (;;) { if (ctx == tail) { //如果当前通道上下文为尾部,则转向头部 destroyDown(currentThread, tail.prev, inEventLoop); break; } final EventExecutor executor = ctx.executor(); //如果执行器或消息任务线程不在当前事件循环中,创建一个线程执行移除工作 if (!inEventLoop && !executor.inEventLoop(currentThread)) { final AbstractChannelHandlerContext finalCtx = ctx; executor.execute(new Runnable() { @Override public void run() { destroyUp(finalCtx, true); } }); break; } ctx = ctx.next; inEventLoop = false; } } //到达尾部,从头部开始移除处理器 private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) { // We have reached at tail; now traverse backwards. //从管道头部开始移除通道处理器 final AbstractChannelHandlerContext head = this.head; for (;;) { if (ctx == head) { break; } final EventExecutor executor = ctx.executor(); //如果执行器或消息任务线程在当前事件循环中,则直接移除 if (inEventLoop || executor.inEventLoop(currentThread)) { synchronized (this) { //从管道移除上下文 remove0(ctx); } //触发处理移除事件,并更新上下为状态为已移除 callHandlerRemoved0(ctx); } else { final AbstractChannelHandlerContext finalCtx = ctx; //否则,开启一个新的线程,执行移除工作状态 executor.execute(new Runnable() { @Override public void run() { destroyDown(Thread.currentThread(), finalCtx, true); } }); break; } ctx = ctx.prev; inEventLoop = false; } } //触发处理移除事件,并更新上下为状态为已移除 private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) { // Notify the complete removal. try { try { //触发上下文关联的处理器handlerRemoved事件 ctx.handler().handlerRemoved(ctx); } finally { //更新上下文状态为已移除 ctx.setRemoved(); } } catch (Throwable t) { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t)); } }
从上面来看channelUnregistered方法,主要是在通道从选择器反注册时,清空管道线程的通道处理器上下文,并触发上下文关联的通道处理器handlerRemoved事件,更新上下文状态为已移除状态。
在头部上下文的通道注册方法分析中我们有说过通道处理器添加回调任务PendingHandlerAddedTask,
现在来看一下另外一个回调任务,通道处理器移除回调任务
private final class PendingHandlerRemovedTask extends PendingHandlerCallback { PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) { super(ctx); } @Override public void run() { //触发处理移除事件,并更新上下为状态为已移除状态 callHandlerRemoved0(ctx); } @Override void execute() { EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { callHandlerRemoved0(ctx); } else { //如果移除线程不在当前事务循环中,移除任务交给上下文关联的事件执行器 try { executor.execute(this); } catch (RejectedExecutionException e) { if (logger.isWarnEnabled()) { logger.warn( "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," + " removing handler {}.", executor, ctx.name(), e); } // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved(). ctx.setRemoved(); } } } }
//TailContext
// A special catch-all handler that handles both bytes and messages. final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { //构造尾部上下文 TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } @Override//获取通道处理器 public ChannelHandler handler() { return this; } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // This may not be a configuration error and so don't log anything. // The event may be superfluous for the current pipeline configuration. //释放触发用户事件对象 ReferenceCountUtil.release(evt); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { onUnhandledInboundException(cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } }
尾部上下文没有什么好说的,来看通道读操作和异常操作:
//异常操作 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { onUnhandledInboundException(cause); }
/** * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. 当异常没有被用户的handler处理,到达管道的尾部时,调用 */ protected void onUnhandledInboundException(Throwable cause) { try { logger.warn( "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + "It usually means the last handler in the pipeline did not handle the exception.", cause); } finally { ReferenceCountUtil.release(cause); } }
//读操作
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); }
/** * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point. 当消息没有被用户的handler处理,到达管道的尾部时,调用此方法释放消息对象 */ protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }
从上面来看Channel的管道线的通道处理器上下文链的尾部是一个傀儡,不同于尾部上下文,头部上下文,在处理inbound事件时,触发通道处理器上下文相关的方法,在处理outbound事件时,委托给管道线关联的Channle的内部unsafe
3-8这部分,我们不再说,用到时再说:
3.
//从名字来看,是命名缓存,具体作用,遇到再探究 private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() { @Override protected Map<Class<?>, String> initialValue() throws Exception { return new WeakHashMap<Class<?>, String>(); } };
4.
final AbstractChannelHandlerContext head;//管道线中通道处理器上下文的头部 final AbstractChannelHandlerContext tail;//管道线中通道处理器上下文的尾部
5.
private final Channel channel;//管道线所属Channel private final ChannelFuture succeededFuture; private final VoidChannelPromise voidPromise; private final boolean touch = ResourceLeakDetector.isEnabled();
6.
private Map<EventExecutorGroup, EventExecutor> childExecutors; private MessageSizeEstimator.Handle estimatorHandle; private boolean firstRegistration = true;//是否第一次注册通道处理器上下文
7.
/** * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}. *pendingHandlerCallbackHead为处理添加通道处理器到管道任务链表的头部. * We only keep the head because it is expected that the list is used infrequently and its size is small. * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management * complexity. 因为链表不经常用,同时size较小,所以我们仅保持头部.全迭代插入是节省内存和尾部管理的一个折衷。 */ private PendingHandlerCallback pendingHandlerCallbackHead;
8.
/** * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never * change. 一旦通道注册到通道选择器,设置为true,将不会在改变 */ private boolean registered;
来看管道的构造:
protected DefaultChannelPipeline(Channel channel) { //检查管道关联的通道是否为null this.channel = ObjectUtil.checkNotNull(channel, "channel"); //新建一个通道完成任务 succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); //初始化头部和尾部上下文 tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
从上来看管道构造,主要是检查管道通道是否为空,初始化管道上下文链的头部与尾部上下文。
总结:
netty的log实例,实际是从默认log工厂获取,默认log工厂顺序为SLF4J,LOG4j,JDKLog。
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,通道上下文描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。上下文链表的头部为HeadContext,尾部为TailContext。
头部上下文HeadContext的outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理相关事件,但读操作实际是通过Channel读取。HeadContext的通道注册方法channelRegistered,主要是执行通道处理器添加回调任务链中的任务。处理器添加回调任务主要是触发触发上下文关联通道处理器的handlerAdded事件,更新上下文状态为添加完毕状态,如果过程中有异常发生,则移除通道上下文。channelUnregistered方法,主要是在通道从选择器反注册时,清空管道线程的通道处理器上下文,并触发上下文关联的通道处理器handlerRemoved事件,更新上下文状态为已移除。
Channel的管道线的通道处理器上下文链的尾部TailContext是一个傀儡,不同于尾部上下文,头部上下文,在处理inbound事件时,触发通道处理器上下文相关的方法,在处理outbound事件时,委托给管道线关联的Channle的内部unsafe。
默认Channel管道实现内部有两个回调任务PendingHandlerAdded/RemovedTask,一个是添加通道处理器上下文回调任务,一个是移除通道上下文回调任务,主要是触发上下文关联通道处理器的处理器添加移除事件,并更新相应的上下文状态为已添加或已移除。
管道构造,主要是检查管道通道是否为空,初始化管道上下文链的头部与尾部上下文。
netty通道处理器上下文可以说,是Mina中Hanlder和过滤器的集合,整合两者功能,管道线有点Mina过滤链的意味,HeadContext相当于Mina过滤链的头部过滤器,TailContext相当于Mina过滤链的尾部过滤器
附:
//SucceededChannelFuture
package io.netty.channel; import io.netty.util.concurrent.EventExecutor; /** * The {@link CompleteChannelFuture} which is succeeded already. It is * recommended to use {@link Channel#newSucceededFuture()} instead of * calling the constructor of this future. 已经成功完成的任务 */ final class SucceededChannelFuture extends CompleteChannelFuture { /** * Creates a new instance. * * @param channel the {@link Channel} associated with this future */ SucceededChannelFuture(Channel channel, EventExecutor executor) { super(channel, executor); } @Override public Throwable cause() { return null; } @Override public boolean isSuccess() { return true; } }
//CompleteChannelFuture
package io.netty.channel; import io.netty.util.concurrent.CompleteFuture; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; /** * A skeletal {@link ChannelFuture} implementation which represents a * {@link ChannelFuture} which has been completed already. */ abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture { private final Channel channel;//关联通道 /** * Creates a new instance. * * @param channel the {@link Channel} associated with this future */ protected CompleteChannelFuture(Channel channel, EventExecutor executor) { super(executor); if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; } @Override protected EventExecutor executor() { EventExecutor e = super.executor(); if (e == null) { return channel().eventLoop(); } else { return e; } } @Override public ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.addListener(listener); return this; } @Override public ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { super.addListeners(listeners); return this; } @Override public ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.removeListener(listener); return this; } @Override public ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { super.removeListeners(listeners); return this; } @Override public ChannelFuture syncUninterruptibly() { return this; } @Override public ChannelFuture sync() throws InterruptedException { return this; } @Override public ChannelFuture await() throws InterruptedException { return this; } @Override public ChannelFuture awaitUninterruptibly() { return this; } @Override public Channel channel() { return channel; } @Override public Void getNow() { return null; } @Override public boolean isVoid() { return false; } }
//CompleteFuture
package io.netty.util.concurrent; import java.util.concurrent.TimeUnit; /** * A skeletal {@link Future} implementation which represents a {@link Future} which has been completed already. 表示一个已经完成的任务 */ public abstract class CompleteFuture<V> extends AbstractFuture<V> { private final EventExecutor executor;//内部事件执行器 /** * Creates a new instance. * * @param executor the {@link EventExecutor} associated with this future */ protected CompleteFuture(EventExecutor executor) { this.executor = executor; } /** * Return the {@link EventExecutor} which is used by this {@link CompleteFuture}. */ protected EventExecutor executor() { return executor; } @Override public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { if (listener == null) { throw new NullPointerException("listener"); } DefaultPromise.notifyListener(executor(), this, listener); return this; } @Override public Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { if (listeners == null) { throw new NullPointerException("listeners"); } for (GenericFutureListener<? extends Future<? super V>> l: listeners) { if (l == null) { break; } DefaultPromise.notifyListener(executor(), this, l); } return this; } @Override public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) { // NOOP return this; } @Override public Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { // NOOP return this; } @Override public Future<V> await() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } return this; } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } return true; } @Override public Future<V> sync() throws InterruptedException { return this; } @Override public Future<V> syncUninterruptibly() { return this; } @Override public boolean await(long timeoutMillis) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } return true; } @Override public Future<V> awaitUninterruptibly() { return this; } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { return true; } @Override public boolean awaitUninterruptibly(long timeoutMillis) { return true; } @Override public boolean isDone() { return true; } @Override public boolean isCancellable() { return false; } @Override public boolean isCancelled() { return false; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } }
//VoidChannelPromise
package io.netty.channel; import io.netty.util.concurrent.AbstractFuture; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.util.concurrent.TimeUnit; final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise { private final Channel channel;//关联通道 private final boolean fireException;//异常发生时,是否触发异常 /** * Creates a new instance. * * @param channel the {@link Channel} associated with this future */ VoidChannelPromise(Channel channel, boolean fireException) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; this.fireException = fireException; } @Override public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { fail(); return this; } @Override public VoidChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { fail(); return this; } @Override public VoidChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) { // NOOP return this; } @Override public VoidChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { // NOOP return this; } @Override public VoidChannelPromise await() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } return this; } @Override public boolean await(long timeout, TimeUnit unit) { fail(); return false; } @Override public boolean await(long timeoutMillis) { fail(); return false; } @Override public VoidChannelPromise awaitUninterruptibly() { fail(); return this; } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { fail(); return false; } @Override public boolean awaitUninterruptibly(long timeoutMillis) { fail(); return false; } @Override public Channel channel() { return channel; } @Override public boolean isDone() { return false; } @Override public boolean isSuccess() { return false; } @Override public boolean setUncancellable() { return true; } @Override public boolean isCancellable() { return false; } @Override public boolean isCancelled() { return false; } @Override public Throwable cause() { return null; } @Override public VoidChannelPromise sync() { fail(); return this; } @Override public VoidChannelPromise syncUninterruptibly() { fail(); return this; } @Override public VoidChannelPromise setFailure(Throwable cause) { fireException(cause); return this; } @Override public VoidChannelPromise setSuccess() { return this; } @Override public boolean tryFailure(Throwable cause) { fireException(cause); return false; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public boolean trySuccess() { return false; } private static void fail() { throw new IllegalStateException("void future"); } @Override public VoidChannelPromise setSuccess(Void result) { return this; } @Override public boolean trySuccess(Void result) { return false; } @Override public Void getNow() { return null; } @Override public ChannelPromise unvoid() { ChannelPromise promise = new DefaultChannelPromise(channel); if (fireException) { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { fireException(future.cause()); } } }); } return promise; } @Override public boolean isVoid() { return true; } private void fireException(Throwable cause) { // Only fire the exception if the channel is open and registered // if not the pipeline is not setup and so it would hit the tail // of the pipeline. // See https://github.com/netty/netty/issues/1517 if (fireException && channel.isRegistered()) { channel.pipeline().fireExceptionCaught(cause); } } }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1324netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2063netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2453netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1320netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1319netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1599netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1848netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1400netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2839netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2186netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2041netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1082netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1879netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1221netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1205netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 959netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1313netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2193netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1084netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1859netty 管道线定义-ChannelPipeline:htt ...
相关推荐
NettyChannel是一个基于Netty框架的测试程序,它包含了客户端(client)和服务器端(server)的实现,目的是为了深入理解Netty中的Channel概念及其工作流程。Netty是一个高性能、异步事件驱动的网络应用程序框架,...
Netty对Channel总结的思维导图,包括功能梳理,源码分析。
Spring Boot的初始化器会自动查找`ServerBootstrap`实例,因此我们可以通过扩展`AbstractServerBootstrapFactory`来自定义服务器初始化: ```java import io.netty.bootstrap.ServerBootstrap; import io.netty....
Netty支持多种序列化方式,其中包括使用Java原生的序列化机制。 Java序列化是通过实现`Serializable`接口来完成的。当一个类实现这个接口时,它的实例就可以被序列化。在Netty中,我们通常会创建一个专门用于序列化...
初始化过程中,会设置Channel的一些基本属性,如配置参数、通道选项和通道属性。这些参数可能包括TCP相关的配置,比如缓冲区大小、超时时间等,它们通常是通过`NioServerSocketChannelConfig`这样的配置类进行管理的...
SpringBoot是Spring框架的一个轻量级容器,它简化了Spring应用程序的初始设置和配置。SpringBoot 2.0带来了许多改进,包括性能提升、更好的错误处理、对Java 9的支持以及对WebFlux的支持,后者是Spring提供的反应式...
- **模块化架构**: Netty采用了高度模块化的设计,允许开发者根据需求灵活选择所需组件,减少了不必要的依赖负担。 - **高度可定制**: Netty允许开发者自定义处理网络事件的逻辑,比如数据解码/编码、异常处理等,...
基于Netty实现的MQTT客户端_netty-mqtt-client
Bootstrap是Netty中的一个启动类,用于配置并初始化Netty服务。它有两个主要实现:`Bootstrap`和`ServerBootstrap`,分别用于客户端和服务端的初始化。 #### 类AbstractBootstrap 这是一个抽象基类,为`Bootstrap`...
在 Netty 中,"Pipeline"(管道)是其核心设计之一,它提供了一种处理进/出站数据的有效方式。下面我们将深入探讨 Netty 的管道机制以及如何进行测试。 首先,Netty 管道是数据传输路径上的处理链,它由多个处理器...
Netty的事件驱动模型基于“事件循环”(Event Loop)和“管道”(Channel Pipeline)的概念。事件循环是Netty中的核心组件,负责处理I/O事件,并将这些事件分发到对应的处理器。管道则是一系列处理器的链,每个...
在Netty中,`ChannelHandlerContext`是处理I/O事件的关键接口,它提供了发送和接收数据的方法,以及对管道中的其他处理器进行操作的能力。在进行Java序列化时,我们通常会使用`ctx.writeAndFlush()`方法将序列化后...
Channel是Netty中的核心组件,代表一个网络连接,而ChannelHandler则是处理I/O事件和数据传输的处理器。 书中会详细讲解Netty的管道(Pipeline)机制,这是Netty处理网络请求的独特方式。管道由一系列的...
1. **工厂模式(Factory Pattern)**:Netty的ChannelInitializer就是一个典型的工厂模式,它用于初始化ChannelPipeline,根据需求创建并添加相应的处理器。 2. **单例模式(Singleton Pattern)**:Netty的...
BootStrap则负责Netty应用的启动和初始化。 Netty的高性能主要得益于其零拷贝技术,池化技术以及对滑动窗口的优化。零拷贝技术可以减少不必要的内存拷贝操作,池化技术可以复用池中的Channel、Buffer等资源,减少...
3. **Netty 服务器启动**: 使用 Spring 的 ApplicationListener 或 CommandLineRunner 接口,在应用启动时触发 Netty 服务器的初始化和启动。这样可以确保 Netty 服务器在 Spring 容器启动后自动运行。 4. **...
Netty的Channel和Pipeline是两个核心组件。Channel是网络连接的抽象,它可以是TCP、UDP或者任何其他类型的I/O连接。Pipeline则是一个处理链,每个连接都有一个自己的Pipeline,其中包含多个处理器(ChannelHandler)...
- 创建一个 ChannelInitializer,用于初始化新建的 Channel,添加 ChannelHandler 到 ChannelPipeline。 - 绑定监听端口,启动服务端。 2. **客户端:** - 创建一个客户端 Bootstrap,设置 EventLoopGroup。 - ...
channelRead0() 方法用于处理接收到的客户端消息,而channelActive() 方法在客户端连接建立后被调用,适合在这个时候发送欢迎消息或进行其他初始化操作。 在实际的代码实现中,我们可能还需要定义自定义的Message...
2. 添加自定义的 ChannelInitializer,用于在 Channel 初始化时设置 ChannelPipeline 中的 Handler。 3. 绑定客户端的 Handler,通常包含编码(将业务对象转换为 ByteBuf)和解码(将 ByteBuf 转换回业务对象)逻辑...