- 浏览: 981055 次
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
打算使用xmemcache作为memcache的客户端,由于x ...
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
netty 默认Channel管道线-添加通道处理器:http://donald-draper.iteye.com/blog/2388726
netty 默认Channel管道线-通道处理器移除与替换:http://donald-draper.iteye.com/blog/2388793
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
netty 默认Channel管道线-添加通道处理器:http://donald-draper.iteye.com/blog/2388726
netty 默认Channel管道线-通道处理器移除与替换:http://donald-draper.iteye.com/blog/2388793
public class DefaultChannelPipeline implements ChannelPipeline
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>>
@Override public final ChannelPipeline fireChannelRegistered() { AbstractChannelHandlerContext.invokeChannelRegistered(head); return this; }
//触发上下文的invokeChannelRegistered方法 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor();//获取上下文事件执行器 //如果事件执行器在当前事务循环,则直接调用上下文invokeChannelRegistered方法 if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { //否则创建一个线程执行上下文invokeChannelRegistered方法,并有有上下文事务执行器运行 executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } } //触发通道channelRegistered事件 private void invokeChannelRegistered() { //如果通道处理器已添加到管道 if (invokeHandler()) { try { //触发通道处理器的channelRegistered事件 ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { //转发事件消息 fireChannelRegistered(); } }
//判断通道处理器已添加到管道 /** * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called * yet. If not return {@code false} and if called or could not detect return {@code true}. *确保通道处理器的handlerAdded方法已触发。 * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event. * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}. 如果失败,则不会调用通道处理器的相关事件处理方法,而是转发事件。这种情况主要针对通道处理器已经添加到管道, 但通道处理器handlerAdded方法没有被调用的情况,即通道处理器关联的上下文已经添加管道上下文链,但并没有更新上下文状态 和触发通道处理器的handlerAdded方法。 */ private boolean invokeHandler() { // Store in local variable to reduce volatile reads. int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); }
//转发事件消息 Override public ChannelHandlerContext fireChannelRegistered() { //转发事件给上下文所属管道的下一个上下文 invokeChannelRegistered(findContextInbound()); return this; }
//获取上下文所属管道的下一个Inbound上下文 private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
@Override public final ChannelPipeline fireChannelUnregistered() { AbstractChannelHandlerContext.invokeChannelUnregistered(head); return this; }
static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelUnregistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelUnregistered(); } }); } } private void invokeChannelUnregistered() { if (invokeHandler()) { try { // ((ChannelInboundHandler) handler()).channelUnregistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelUnregistered(); } } @Override public ChannelHandlerContext fireChannelUnregistered() { invokeChannelUnregistered(findContextInbound()); return this; }
//通道激活 @Override public final ChannelPipeline fireChannelActive() { AbstractChannelHandlerContext.invokeChannelActive(head); return this; }
static void invokeChannelActive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelActive(); } }); } } private void invokeChannelActive() { if (invokeHandler()) { try { //触发处理器channelActive事件 ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelActive(); } } @Override public ChannelHandlerContext fireChannelActive() { invokeChannelActive(findContextInbound()); return this; }
//通道断开 @Override public final ChannelPipeline fireChannelInactive() { AbstractChannelHandlerContext.invokeChannelInactive(head); return this; }
static void invokeChannelInactive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelInactive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelInactive(); } }); } } private void invokeChannelInactive() { if (invokeHandler()) { try { //触发处理器channelInactive事件 ((ChannelInboundHandler) handler()).channelInactive(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelInactive(); } } @Override public ChannelHandlerContext fireChannelInactive() { invokeChannelInactive(findContextInbound()); return this; }
//IO异常 @Override public final ChannelPipeline fireExceptionCaught(Throwable cause) { AbstractChannelHandlerContext.invokeExceptionCaught(head, cause); return this; }
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) { ObjectUtil.checkNotNull(cause, "cause"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeExceptionCaught(cause); } else { try { executor.execute(new Runnable() { @Override public void run() { next.invokeExceptionCaught(cause); } }); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to submit an exceptionCaught() event.", t); logger.warn("The exceptionCaught() event that was failed to submit was:", cause); } } } } private void invokeExceptionCaught(final Throwable cause) { if (invokeHandler()) { try { //触发处理器exceptionCaught事件 handler().exceptionCaught(this, cause); } catch (Throwable error) { if (logger.isDebugEnabled()) { logger.debug( "An exception {}" + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:", ThrowableUtil.stackTraceToString(error), cause); } else if (logger.isWarnEnabled()) { logger.warn( "An exception '{}' [enable DEBUG level for full stacktrace] " + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:", error, cause); } } } else { fireExceptionCaught(cause); } } @Override public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { invokeExceptionCaught(next, cause); return this; }
//触发用户事件 @Override public final ChannelPipeline fireUserEventTriggered(Object event) { AbstractChannelHandlerContext.invokeUserEventTriggered(head, event); return this; }
static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) { ObjectUtil.checkNotNull(event, "event"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeUserEventTriggered(event); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeUserEventTriggered(event); } }); } } private void invokeUserEventTriggered(Object event) { if (invokeHandler()) { try { //触发处理器userEventTriggered事件 ((ChannelInboundHandler) handler()).userEventTriggered(this, event); } catch (Throwable t) { notifyHandlerException(t); } } else { fireUserEventTriggered(event); } } @Override public ChannelHandlerContext fireUserEventTriggered(final Object event) { invokeUserEventTriggered(findContextInbound(), event); return this; }
//读数据 @Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { //记录消息引用对象,用于内存泄漏时,调试 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { //触发处理器channelRead事件 ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } @Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
//记录消息引用对象,用于内存泄漏时,调试 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
final Object touch(Object msg, AbstractChannelHandlerContext next) { //如果内存泄漏探测开启,则记录消息引用对象,否则直至返回消息对象 return touch ? ReferenceCountUtil.touch(msg, next) : msg; }
/** * Tries to call {@link ReferenceCounted#touch(Object)} if the specified message implements * {@link ReferenceCounted}. If the specified message doesn't implement {@link ReferenceCounted}, * this method does nothing. */ @SuppressWarnings("unchecked") public static <T> T touch(T msg, Object hint) { if (msg instanceof ReferenceCounted) { return (T) ((ReferenceCounted) msg).touch(hint); } return msg; }
/** * Records the current access location of this object with an additional arbitrary information for debugging * purposes. If this object is determined to be leaked, the information recorded by this operation will be * provided to you via {@link ResourceLeakDetector}. */ ReferenceCounted touch(Object hint);
//读数据完成 @Override public final ChannelPipeline fireChannelReadComplete() { AbstractChannelHandlerContext.invokeChannelReadComplete(head); return this; }
static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelReadComplete(); } else { Runnable task = next.invokeChannelReadCompleteTask; if (task == null) { next.invokeChannelReadCompleteTask = task = new Runnable() { @Override public void run() { next.invokeChannelReadComplete(); } }; } executor.execute(task); } } private void invokeChannelReadComplete() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelReadComplete(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelReadComplete(); } } @Override public ChannelHandlerContext fireChannelReadComplete() { invokeChannelReadComplete(findContextInbound()); return this; }
//写状态改变 @Override public final ChannelPipeline fireChannelWritabilityChanged() { AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head); return this; }
static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelWritabilityChanged(); } else { Runnable task = next.invokeChannelWritableStateChangedTask; if (task == null) { next.invokeChannelWritableStateChangedTask = task = new Runnable() { @Override public void run() { next.invokeChannelWritabilityChanged(); } }; } executor.execute(task); } } private void invokeChannelWritabilityChanged() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelWritabilityChanged(); } } @Override public ChannelHandlerContext fireChannelWritabilityChanged() { invokeChannelWritabilityChanged(findContextInbound()); return this; }
//绑定socket的地址 @Override public final ChannelFuture bind(SocketAddress localAddress) { //从管道尾部开始 return tail.bind(localAddress); }
@Override public ChannelFuture bind(SocketAddress localAddress) { return bind(localAddress, newPromise()); }
/** * The default {@link ChannelPromise} implementation. It is recommended to use {@link Channel#newPromise()} to create * a new {@link ChannelPromise} rather than calling the constructor explicitly. */ public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint { private final Channel channel; private long checkpoint; ... /** * Creates a new instance. * * @param channel * the {@link Channel} associated with this future */ public DefaultChannelPromise(Channel channel, EventExecutor executor) { super(executor); this.channel = channel; } }
//绑定socket地址 @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (isNotValidPromise(promise, false)) { //非可写通道任务,直接返回 // cancelled return promise; } //从当前上下文开始(尾部),向前找到第一个Outbound上下文,处理地址绑定事件 final AbstractChannelHandlerContext next = findContextOutbound(); //获取上下为事件执行器 EventExecutor executor = next.executor(); if (executor.inEventLoop()) { //如果事件执行器线程在事件循环中,则直接委托给invokeBind next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; } //触发通道处理器地址绑定事件 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) {//如果通道处理器已经添加到管道中 try { //触发Outbound通道处理器的bind事件方法 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { //否则传递绑定事件给管道中的下一个Outbound上下文 bind(localAddress, promise); } } private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { try { executor.execute(runnable); } catch (Throwable cause) { try { promise.setFailure(cause); } finally { if (msg != null) { ReferenceCountUtil.release(msg); } } } }
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
//连接指定远端地址 @Override public final ChannelFuture connect(SocketAddress remoteAddress) { return tail.connect(remoteAddress); }
@Override public ChannelFuture connect(SocketAddress remoteAddress) { return connect(remoteAddress, newPromise()); } @Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return connect(remoteAddress, null, promise); } @Override public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } if (isNotValidPromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeConnect(remoteAddress, localAddress, promise); } }, promise, null); } return promise; } private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { connect(remoteAddress, localAddress, promise); } }
//绑定本地地址,连接远端地址 @Override public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return tail.connect(remoteAddress, localAddress); }
@Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return connect(remoteAddress, localAddress, newPromise()); }
//断开通道连接 @Override public final ChannelFuture disconnect() { return tail.disconnect(); }
@Override public ChannelFuture disconnect() { return disconnect(newPromise()); } @Override public ChannelFuture disconnect(final ChannelPromise promise) { if (isNotValidPromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // Translate disconnect to close if the channel has no notion of disconnect-reconnect. // So far, UDP/IP is the only transport that has such behavior. if (!channel().metadata().hasDisconnect()) { //如果还没有连接,则关闭通道 next.invokeClose(promise); } else { //否则断开通道 next.invokeDisconnect(promise); } } else { safeExecute(executor, new Runnable() { @Override public void run() { if (!channel().metadata().hasDisconnect()) { next.invokeClose(promise); } else { next.invokeDisconnect(promise); } } }, promise, null); } return promise; } //触发通道处理器关闭事件 private void invokeClose(ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).close(this, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { close(promise); } } //触发通道处理器断开事件 private void invokeDisconnect(ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).disconnect(this, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { disconnect(promise); } }
//关闭通道 @Override public final ChannelFuture close() { return tail.close(); }
@Override public ChannelFuture close() { return close(newPromise()); } @Override public ChannelFuture close(final ChannelPromise promise) { if (isNotValidPromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeClose(promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeClose(promise); } }, promise, null); } return promise; } private void invokeClose(ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).close(this, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { close(promise); } }
//反注册 @Override public final ChannelFuture deregister() { return tail.deregister(); }
@Override public ChannelFuture deregister() { return deregister(newPromise()); } @Override public ChannelFuture deregister(final ChannelPromise promise) { if (isNotValidPromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeDeregister(promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeDeregister(promise); } }, promise, null); } return promise; } private void invokeDeregister(ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).deregister(this, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { deregister(promise); } }
//刷新写请求数据 @Override public final ChannelPipeline flush() { tail.flush(); return this; }
@Override public ChannelHandlerContext flush() { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { //触发通道处理器刷新事件操作 next.invokeFlush(); } else { //获取当前上下文的刷新任务线程 Runnable task = next.invokeFlushTask; if (task == null) { next.invokeFlushTask = task = new Runnable() { @Override public void run() { next.invokeFlush(); } }; } safeExecute(executor, task, channel().voidPromise(), null); } return this; } private void invokeFlush() { if (invokeHandler()) { invokeFlush0(); } else { flush(); } } private void invokeFlush0() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { notifyHandlerException(t); } }
//读操作 @Override public final ChannelPipeline read() { tail.read(); return this; }
@Override public ChannelHandlerContext read() { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRead(); } else { //获取上下文读任务线程 Runnable task = next.invokeReadTask; if (task == null) { next.invokeReadTask = task = new Runnable() { @Override public void run() { next.invokeRead(); } }; } executor.execute(task); } return this; } private void invokeRead() { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).read(this); } catch (Throwable t) { notifyHandlerException(t); } } else { read(); } }
//写消息 @Override public final ChannelFuture write(Object msg) { return tail.write(msg); }
@Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); } @Override public ChannelFuture write(final Object msg, final ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } try { if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } } catch (RuntimeException e) { //出现异常,释放消息对象 ReferenceCountUtil.release(msg); throw e; } write(msg, false, promise); return promise; } private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } } private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
//写消息,并发送 @Override public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); }
@Override public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); } @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } write(msg, true, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { //如果刷新,则调用invokeWriteAndFlush next.invokeWriteAndFlush(m, promise); } else { //否则调用invokeWrite next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { //创建写刷新任务线程 task = WriteAndFlushTask.newInstance(next, m, promise); } else { //创建写任务线程 task = WriteTask.newInstance(next, m, promise); } //执行任务线程 safeExecute(executor, task, promise, m); } } private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
@Override public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); } @Override public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return tail.connect(remoteAddress, promise); } @Override public final ChannelFuture connect( SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { return tail.connect(remoteAddress, localAddress, promise); } @Override public final ChannelFuture disconnect(ChannelPromise promise) { return tail.disconnect(promise); } @Override public final ChannelFuture close(ChannelPromise promise) { return tail.close(promise); } @Override public final ChannelFuture deregister(final ChannelPromise promise) { return tail.deregister(promise); } @Override public final ChannelFuture write(Object msg, ChannelPromise promise) { return tail.write(msg, promise); } @Override public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { return tail.writeAndFlush(msg, promise); }
//创建通道异步任务结果 @Override public final ChannelPromise newPromise() { return new DefaultChannelPromise(channel); } @Override public final ChannelProgressivePromise newProgressivePromise() { return new DefaultChannelProgressivePromise(channel); } @Override public final ChannelFuture newSucceededFuture() { return succeededFuture; } @Override public final ChannelFuture newFailedFuture(Throwable cause) { return new FailedChannelFuture(channel, null, cause); } @Override public final ChannelPromise voidPromise() { return voidPromise; }
//获取通道消息大小估算Handler final MessageSizeEstimator.Handle estimatorHandle() { if (estimatorHandle == null) { estimatorHandle = channel.config().getMessageSizeEstimator().newHandle(); } return estimatorHandle; }
/** * Responsible to estimate size of a message. The size represent how much memory the message will ca. reserve in * memory. 负责估算消息的大小。大小表示消息需要多少内存,以便预留内存 */ public interface MessageSizeEstimator { /** * Creates a new handle. The handle provides the actual operations. */ Handle newHandle(); interface Handle { /** * Calculate the size of the given message. 计算给定消息的大小 * * @param msg The message for which the size should be calculated * @return size The size in bytes. The returned size must be >= 0 */ int size(Object msg); } }
//获取事件执行器组的事件执行器 private EventExecutor childExecutor(EventExecutorGroup group) { if (group == null) { return null; } //获取通道的SINGLE_EVENTEXECUTOR_PER_GROUP的配置项 //是否每个事件执行器组拥有一个执行器 Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP); if (pinEventExecutor != null && !pinEventExecutor) { //如果可以同用事件执行器,则返回事件执行器组的事件执行器 return group.next(); } //否则获取管道的子事件执行器 Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors; if (childExecutors == null) { //如果子执行器为空,则创建四个事件执行器组 // Use size of 4 as most people only use one extra EventExecutor. childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4); } // Pin one of the child executors once and remember it so that the same child executor // is used to fire events for the same channel. //获取分组的事件执行器 EventExecutor childExecutor = childExecutors.get(group); if (childExecutor == null) { //如果事件执行器组的事件执行器为空,则获取下一个事件执行器 childExecutor = group.next(); childExecutors.put(group, childExecutor); } return childExecutor; }
/** * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use * via its {@link #next()} method. Besides this, it is also responsible for handling their * life-cycle and allows shutting them down in a global fashion. * */ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
/** * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes * with some handy methods to see if a {@link Thread} is executed in a event loop. * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic * way to access methods. * */ public interface EventExecutor extends EventExecutorGroup {
/** * This class implements the <tt>Map</tt> interface with a hash table, using * reference-equality in place of object-equality when comparing keys (and * values). In other words, in an <tt>IdentityHashMap</tt>, two keys * <tt>k1</tt> and <tt>k2</tt> are considered equal if and only if * <tt>(k1==k2)</tt>. (In normal <tt>Map</tt> implementations (like * <tt>HashMap</tt>) two keys <tt>k1</tt> and <tt>k2</tt> are considered equal * if and only if <tt>(k1==null ? k2==null : k1.equals(k2))</tt>.) ... * * <p><b>This class is <i>not</i> a general-purpose <tt>Map</tt> * implementation! While this class implements the <tt>Map</tt> interface, it * intentionally violates <tt>Map's</tt> general contract, which mandates the * use of the <tt>equals</tt> method when comparing objects. This class is * designed for use only in the rare cases wherein reference-equality * semantics are required.</b> * * <p>A typical use of this class is <i>topology-preserving object graph * transformations</i>, such as serialization or deep-copying. To perform such * a transformation, a program must maintain a "node table" that keeps track * of all the object references that have already been processed. The node * table must not equate distinct objects even if they happen to be equal. * Another typical use of this class is to maintain <i>proxy objects</i>. For * example, a debugging facility might wish to maintain a proxy object for * each object in the program being debugged. * * @see System#identityHashCode(Object) * @see Object#hashCode() * @see Collection * @see Map * @see HashMap * @see TreeMap * @author Doug Lea and Josh Bloch * @since 1.4 IdentityHashMap与普通Map的区别在于Key的相等的条件不同,一般判断key是否相等 为if(k1==null ? k2==null : k1.equals(k2))则相等,而IdentityHashMap为k1==k2 相同,则任务是相等,可以用于存储 */ public class IdentityHashMap<K,V> extends AbstractMap<K,V>
//获取关联通道 @Override public final Channel channel() { return channel; }
//获取通道处理对应的名称 private String generateName(ChannelHandler handler) { //从通道处理器命名缓存,获取当前线程的通道处理器命名缓存 Map<Class<?>, String> cache = nameCaches.get(); Class<?> handlerType = handler.getClass();//获取处理器类型 String name = cache.get(handlerType);//从缓冲获取通道处理器对应的名称 if (name == null) { //如果名称为空,则生成处理器名称,并放入缓存 name = generateName0(handlerType); cache.put(handlerType, name); } // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid // any name conflicts. Note that we don't cache the names generated here. //简单是否存在name对应的上下文 if (context0(name) != null) { //存在,则重新命名 String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'. for (int i = 1;; i ++) { String newName = baseName + i; if (context0(newName) == null) { name = newName; break; } } } return name; }
abstract static class AbstractWriteTask implements Runnable { //在提交时是否估算任务size private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true); //任务负载 // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment private static final int WRITE_TASK_OVERHEAD = SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48); private final Recycler.Handle<AbstractWriteTask> handle; private AbstractChannelHandlerContext ctx; private Object msg; private ChannelPromise promise; private int size; @SuppressWarnings("unchecked") private AbstractWriteTask(Recycler.Handle<? extends AbstractWriteTask> handle) { this.handle = (Recycler.Handle<AbstractWriteTask>) handle; } //初始化写任务 protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { task.ctx = ctx; task.msg = msg; task.promise = promise; if (ESTIMATE_TASK_SIZE_ON_SUBMIT) { //获取Outbound buf,从上下文关联通道的unsafe获取 ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); // Check for null as it may be set to null if the channel is closed already if (buffer != null) { task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD; buffer.incrementPendingOutboundBytes(task.size); } else { task.size = 0; } } else { task.size = 0; } } @Override public final void run() { try { ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); // Check for null as it may be set to null if the channel is closed already if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) { buffer.decrementPendingOutboundBytes(size); } //写消息 write(ctx, msg, promise); } finally { // Set to null so the GC can collect them directly ctx = null; msg = null; promise = null; handle.recycle(this); } } //委托给上下文 protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ctx.invokeWrite(msg, promise); } } static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable { private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() { @Override protected WriteTask newObject(Handle<WriteTask> handle) { return new WriteTask(handle); } }; //创建写任务 private static WriteTask newInstance( AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { WriteTask task = RECYCLER.get(); //初始化写任务 init(task, ctx, msg, promise); return task; } private WriteTask(Recycler.Handle<WriteTask> handle) { super(handle); } } static final class WriteAndFlushTask extends AbstractWriteTask { private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() { @Override protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) { return new WriteAndFlushTask(handle); } }; //创建写刷新任务 private static WriteAndFlushTask newInstance( AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { WriteAndFlushTask task = RECYCLER.get(); //初始化写任务 init(task, ctx, msg, promise); return task; } private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) { super(handle); } @Override public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { super.write(ctx, msg, promise); //调用上下文flush方法 ctx.invokeFlush(); } }
netty NioSocketChannel解析
2017-09-29 12:50 1321netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2057netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2444netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1316netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1310netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1594netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1844netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1397netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2834netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2177netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2037netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1078netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1877netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1219netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1202netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 958netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1309netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2189netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1077netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1855netty 管道线定义-ChannelPipeline:htt ...
4. **Pipeline**:通道处理管道,是 Netty 的核心特性之一。它允许将多个处理器(Handler)串联起来,对进来的数据进行处理,每个处理器执行特定的任务。 5. **Handler**:处理器接口,用于处理网络事件和数据。有...
- ChannelHandler是Netty处理I/O事件的核心组件,包括Inbound和Outbound两种类型,分别处理入站事件(如连接、接收数据)和出站事件(如发送数据、关闭连接)。 - 通常,我们会创建自定义的处理器类,继承`...
5. **Pipeline**: 事件处理器链,每个连接都有自己的Pipeline,其中包含多个处理器(ChannelHandler),用于处理入站(Inbound)和出站(Outbound)事件。 6. **ByteBuf**: Netty的字节缓冲区,比Java的ByteBuffer...
- **Handler**:处理I/O事件或消息的组件,分为Inbound和Outbound两种,分别处理入站和出站操作。 - **EventLoop**:执行Handler的事件处理方法,Netty使用线程池实现,确保每个Channel分配一个EventLoop。 - **...
- **Handler**:处理逻辑的组件,分为Inbound Handler(处理入站事件,如接收数据)和Outbound Handler(处理出站事件,如发送数据)。 3. **Netty的工作流程** - **BossGroup** 和 **WorkerGroup**:BossGroup...
1. **Inbound 和 Outbound**:ChannelHandler 分为两种类型,Inbound Handler 处理接收的数据,Outbound Handler 处理发送的数据。这使得处理流程更加清晰和可扩展。 2. **ChannelHandlerContext**:在 ...
分为Inbound和Outbound两种类型,分别处理入站和出站事件。 6. **编码与解码**:Netty提供了各种编解码器,如LineBasedFrameDecoder用于按行拆分消息,LengthFieldBasedFrameDecoder用于根据长度字段解码消息,方便...
- Netty提供了一套完整的异常处理机制,包括ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter,方便开发者捕获和处理异常。 10. **自定义协议开发**: - Netty允许用户轻松创建自定义的编解码器,以...
5. **Handler**: ChannelHandler是处理I/O事件和数据的对象,分为两种类型:Inbound和Outbound。Inbound Handler处理读取到的数据,而Outbound Handler处理写入数据、连接管理和关闭操作。 6. **NIO和Epoll**: ...
每个处理器都有进站(Inbound)和出站(Outbound)两个处理阶段,用于处理接收到的数据和发送数据的操作。 进站操作通常涉及接收连接、读取数据、解码数据等。例如,`ByteToMessageDecoder` 就是一个常见的进站...
6. **Handlers**:ChannelHandler是业务逻辑的载体,分为Inbound和Outbound两种类型,分别处理输入和输出事件。例如,解码器、编码器、异常处理器等。 7. **Future和Promise**:Netty提供了Future和Promise来处理...
Netty的处理器主要分为两种类型:入站处理器(Inbound Handler)和出站处理器(Outbound Handler)。入站处理器通常处理读取数据、连接事件等事件,出站处理器则处理写入数据、建立连接等请求。 Netty的架构包括了...
- 添加、初始化、激活、处理事件、关闭等,理解这些阶段对于正确处理事件和资源管理至关重要。 8. **编解码器**: - Netty提供了多种编解码器,如LineBasedFrameDecoder、LengthFieldBasedFrameDecoder等,用于将...
每个ChannelHandler负责处理特定类型的事件,如入站(inbound)事件(如数据接收)和出站(outbound)事件(如数据发送)。 7. **ChannelHandlerContext**:这是ChannelHandler的上下文,它定义了ChannelHandler...
- 上游和下游事件的处理方式改变,`Upstream` 改为 `Inbound`,`Downstream` 改为 `Outbound`。这使得事件处理更加清晰,简化了处理逻辑。 - `ChannelHandler` 接口的继承结构发生变化,新增了 `...
- **管道(Pipeline)**:是一个由多个处理器组成的链表结构,用于处理入站(Inbound)和出站(Outbound)的数据流。每个处理器都可以执行特定的逻辑处理,比如编解码、错误处理等。 #### 3. SEDA事件处理模式 SEDA...
Handler是Netty处理业务逻辑的关键,分为Inbound和Outbound两种。Inbound Handler处理入站事件,如连接建立、数据读取;Outbound Handler处理出站事件,如数据写入、连接关闭。用户可以根据需求自定义Handler,实现...