public interface ChannelOutboundInvoker { /** * 请求端口绑定到SocketAddress并通知ChannelFuture 操作成功或失败 * This will result in having the * {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)} method * called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.bind(ChannelHandlerContext, SocketAddress, ChannelPromise)-->this */ ChannelFuture bind(SocketAddress localAddress); /** * 请求连接远端地址(SocketAddress)并通知ChannelFuture 操作成功或失败 * * 如果连接超时,触发ChannelFuture 失败with ConnectTimeoutException * 如果连接拒绝,触发ChannelFuture 失败with ConnectException * This will result in having the * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)—>this */ ChannelFuture connect(SocketAddress remoteAddress); /** * 请求连接远端地址(SocketAddress)并通知ChannelFuture 操作成功或失败 * * This will result in having the * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)—>this */ ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress); /** * 请求远端断开连接并通知ChannelFuture 操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture disconnect(); /** * Request to close the {@link Channel} and notify the {@link ChannelFuture} once the operation completes, * either because the operation was successful or because of * an error. * 请求连接关闭 并通知ChannelFuture 操作成功或失败 * 一旦连接关闭,不能再重复使用它 * <p> * This will result in having the * {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture close(); /** * 请求从关联的EventExecutor上取消注册(不再获取此channel上的网络事件,read,write等)并通知ChannelFuture 操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture deregister(); /** * 请求端口绑定到SocketAddress并通知ChannelFuture 操作成功或失败 * 同时 ChannelPromise被通知操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)} method * called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)-->this */ ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise); /** * Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation * completes, either because the operation was successful or because of an error. * * The given {@link ChannelFuture} will be notified. * 请求连接远端地址(SocketAddress)并通知ChannelFuture 操作成功或失败 * 同时 ChannelPromise被通知操作成功或失败 * <p> * If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with * a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException} * will be used. * <p> * This will result in having the * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)-->this */ ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise); /** * 请求连接远端地址(SocketAddress)并通知ChannelFuture 操作成功或失败 * 同时 ChannelPromise被通知操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)-->this */ ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); /** * 请求远端断开连接并通知ChannelFuture 操作成功或失败 * 同时 ChannelPromise被通知操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.disconnect(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture disconnect(ChannelPromise promise); /** * Request to close the {@link Channel} and notify the {@link ChannelFuture} once the operation completes, * either because the operation was successful or because of * an error. * * After it is closed it is not possible to reuse it again. * 请求连接关闭 并通知ChannelFuture 操作成功或失败 * 一旦连接关闭,不能再重复使用它 * 同时 ChannelPromise被通知操作成功或失败 * The given {@link ChannelPromise} will be notified. * <p> * This will result in having the * {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.close(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture close(ChannelPromise promise); /** * 请求从关联的EventExecutor上取消注册(不再获取此channel上的网络事件,read,write等)并通知ChannelFuture 操作成功或失败 * 同时 ChannelPromise被通知操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.deregister(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture deregister(ChannelPromise promise); /** * 从channel中读取数据-->第1个InBoundBuffer: * 1.触发ChannelInboundHandler.channelRead(ChannelHandlerContext, Object)事件(读出数据的情况下) * 2.触发ChannelInboundHandler.channelReadComplete(ChannelHandlerContext) 事件 * 如果已经有挂起的读取操作,则此方法不执行任何操作。 * This will result in having the * {@link ChannelOutboundHandler#read(ChannelHandlerContext)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.read(ChannelHandlerContext)-->this */ ChannelOutboundInvoker read(); /** * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}. * This method will not request to actual flush, so be sure to call {@link #flush()} * once you want to request to flush all pending data to the actual transport. * 这个方法调用->ChannelHandlerContext.ChannelPipeline 写入缓冲区,但不会立即发送出去,除非调用Flush方法 * */ ChannelFuture write(Object msg); /** * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}. * This method will not request to actual flush, so be sure to call {@link #flush()} * once you want to request to flush all pending data to the actual transport. * 这个方法调用->ChannelHandlerContext.ChannelPipeline 写入缓冲区,但不会立即发送出去,除非调用Flush方法 * 方法调用完成,ChannelPromise会得到通知 */ ChannelFuture write(Object msg, ChannelPromise promise); /** * Request to flush all pending messages via this ChannelOutboundInvoker. * 写入缓冲区数据发送出去 */ ChannelOutboundInvoker flush(); /** * 先调用write(Object, ChannelPromise)再调用flush() */ ChannelFuture writeAndFlush(Object msg, ChannelPromise promise); /** * 先调用write(Object)再调用flush() */ ChannelFuture writeAndFlush(Object msg); /** * 创建一个ChannelPromise */ ChannelPromise newPromise(); /** * 创建一个newProgressivePromise */ ChannelProgressivePromise newProgressivePromise(); /** * * 创建一个标记为成功的ChannelFuture,ChannelFuture.isSuccess()==true,添加到此ChannelFuture上的FutureListener立即被通知 */ ChannelFuture newSucceededFuture(); /** * 创建一个标记为失败的ChannelFuture,ChannelFuture.isSuccess()==false,添加到此ChannelFuture上的FutureListener立即被通知 */ ChannelFuture newFailedFuture(Throwable cause); /** * Return a special ChannelPromise which can be reused for different operations. * <p> * It's only supported to use * it for {@link ChannelOutboundInvoker#write(Object, ChannelPromise)}. * </p> * <p> * Be aware that the returned {@link ChannelPromise} will not support most operations and should only be used * if you want to save an object allocation for every write operation. You will not be able to detect if the * operation was complete, only if it failed as the implementation will call * {@link ChannelPipeline#fireExceptionCaught(Throwable)} in this case. * </p> * <strong>Be aware this is an expert feature and should be used with care!</strong> */ ChannelPromise voidPromise(); } public interface ChannelInboundInvoker { /** * * EventLoop.register 时触发fireChannelRegistered事件 * This will result in having the {@link ChannelInboundHandler#channelRegistered(ChannelHandlerContext)} method * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelInboundHandler.channelRegistered(ChannelHandlerContext)-->this */ ChannelInboundInvoker fireChannelRegistered(); /** * EventLoop.unregistered 时触发fireChannelUnregistered事件 * This will result in having the {@link ChannelInboundHandler#channelUnregistered(ChannelHandlerContext)} method * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelInboundHandler.channelUnregistered(ChannelHandlerContext)-->this */ ChannelInboundInvoker fireChannelUnregistered(); /** * 当chanel 连接成功后-->触发fireChannelActive事件 * This will result in having the {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)} method * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelInboundHandler.channelActive(ChannelHandlerContext)-->this */ ChannelInboundInvoker fireChannelActive(); /** * 当chanel 关闭后-->触发fireChannelActive事件 * This will result in having the {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} method * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelInboundHandler.channelInactive(ChannelHandlerContext)-->this */ ChannelInboundInvoker fireChannelInactive(); /** * A {@link Channel} received an {@link Throwable} in one of its inbound operations. * 当channel IO操作产生异常时--》触发fireExceptionCaught 事件 * This will result in having the {@link ChannelInboundHandler#exceptionCaught(ChannelHandlerContext, Throwable)} * method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelInboundHandler.exceptionCaught(ChannelHandlerContext,Throwable)-->this */ ChannelInboundInvoker fireExceptionCaught(Throwable cause); /** * A {@link Channel} received an user defined event. * 当channel接收到一个用户定义的事件时--》触发fireExceptionCaught 事件 * This will result in having the {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)} * method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelInboundHandler.userEventTriggered(ChannelHandlerContext,Object)-->this */ ChannelInboundInvoker fireUserEventTriggered(Object event); /** * A {@link Channel} received a message. * 当channel有数据进入时---》 触发fireChannelRead 事件 * This will result in having the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} * method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelInboundHandler.channelRead(ChannelHandlerContext,Object)-->this */ ChannelInboundInvoker fireChannelRead(Object msg); /** * 当数据读取完成时--》触发ChannelPipeline中下一个ChannelInboundHandler.channelReadComplete事件 * Triggers an {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)} * event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. */ ChannelInboundInvoker fireChannelReadComplete(); /** * 当channel 是否可以写入状态改变时--》触发ChannelPipeline中下一个ChannelInboundHandler.channelWritabilityChanged 事件 * Triggers an {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)} * event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. */ ChannelInboundInvoker fireChannelWritabilityChanged(); } /** * A list of {@link ChannelHandler}s which handles or intercepts inbound events and outbound operations of a * {@link Channel}. {@link ChannelPipeline} implements an advanced form of the * <a href="http://www.oracle.com/technetwork/java/interceptingfilter-142169.html">Intercepting Filter</a> pattern * to give a user full control over how an event is handled and how the {@link ChannelHandler}s in a pipeline * interact with each other. * ChannelHandler 队列能够处理或拦截 channel 相关的读取(Inbound)事件或写入(Outbound)操作。 * ChannelPipeline 是一种拦截或过滤器模式,接收到事件,交由ChannelHandler队列处理,以及负责ChannelHandler之间的交互。 * <h3>Creation of a pipeline</h3> * 每一个Channel 都有它自己的pipeline,在channel创建的时候自动创建pipeline * Each channel has its own pipeline and it is created automatically when a new channel is created. * * <h3>How an event flows in a pipeline</h3> * 下面这个图展示了在ChannelPipeline中ChannelHandlers 处理IO事件过程。 * 每一个I/O 事件被ChannelInboundHandler 或ChannelOutboundHandler处理,并转交给相邻的ChannelInboundHandler或ChannelOutboundHandler处理, * 转交过程由ChannelHandlerContext实现,例如:ChannelHandlerContext#fireChannelRead(Object) * The following diagram describes how I/O events are processed by {@link ChannelHandler}s in a {@link ChannelPipeline} * typically. An I/O event is handled by either a {@link ChannelInboundHandler} or a {@link ChannelOutboundHandler} * and be forwarded to its closest handler by calling the event propagation methods defined in * {@link ChannelHandlerContext}, such as {@link ChannelHandlerContext#fireChannelRead(Object)} and * {@link ChannelHandlerContext#write(Object)}. * * <pre> * I/O Request * via {@link Channel} or * {@link ChannelHandlerContext} * | * +---------------------------------------------------+---------------+ * | ChannelPipeline | | * | \|/ | * | +---------------------+ +-----------+----------+ | * | | Inbound Handler N | | Outbound Handler 1 | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * | | \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler N-1 | | Outbound Handler 2 | | * | +----------+----------+ +-----------+----------+ | * | /|\ . | * | . . | * | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| * | [ method call] [method call] | * | . . | * | . \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler 2 | | Outbound Handler M-1 | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * | | \|/ | * | +----------+----------+ +-----------+----------+ | * | | Inbound Handler 1 | | Outbound Handler M | | * | +----------+----------+ +-----------+----------+ | * | /|\ | | * +---------------+-----------------------------------+---------------+ * | \|/ * +---------------+-----------------------------------+---------------+ * | | | | * | [ Socket.read() ] [ Socket.write() ] | * | | * | Netty Internal I/O Threads (Transport Implementation) | * +-------------------------------------------------------------------+ * For example, let us assume that we created the following pipeline: * 示例 :下面ChannelPipeline添加了5个ChannelHandler * <pre> * {@link ChannelPipeline} p = ...; * p.addLast("1", new InboundHandlerA()); * p.addLast("2", new InboundHandlerB()); * p.addLast("3", new OutboundHandlerA()); * p.addLast("4", new OutboundHandlerB()); * p.addLast("5", new InboundOutboundHandlerX()); * </pre> * In the example above, the class whose name starts with {@code Inbound} means it is an inbound handler. * The class whose name starts with {@code Outbound} means it is a outbound handler. * <p> * 上面示例中对于读取(inbound)事件,ChannelHandler执行顺序方向为1---》5,3和4未实现ChannelInboundHandler,所以 * 真正执行的顺序为1--》2--》5 * 上面示例中对于写入(outbound)事件,ChannelHandler执行顺序方向为5---》1,1和2未实现ChannelOutboundHandler,所以 * 真正执行的顺序为 5---》4---》3 * 由于5实现了两个接口:ChannelInboundHandler和ChannelOutboundHandler,所以无论读取(inbound)或写入(outbound),它都会 * 执行 * *ChannelHandler 有责任把事件传递给下一个ChannelHandler执行,通过方式:调用ChannelHandlerContext的方法: * * <li>Inbound event propagation methods: * <ul> * <li>{@link ChannelHandlerContext#fireChannelRegistered()}</li> * <li>{@link ChannelHandlerContext#fireChannelActive()}</li> * <li>{@link ChannelHandlerContext#fireChannelRead(Object)}</li> * <li>{@link ChannelHandlerContext#fireChannelReadComplete()}</li> * <li>{@link ChannelHandlerContext#fireExceptionCaught(Throwable)}</li> * <li>{@link ChannelHandlerContext#fireUserEventTriggered(Object)}</li> * <li>{@link ChannelHandlerContext#fireChannelWritabilityChanged()}</li> * <li>{@link ChannelHandlerContext#fireChannelInactive()}</li> * <li>{@link ChannelHandlerContext#fireChannelUnregistered()}</li> * </ul> * </li> * <li>Outbound event propagation methods: * <ul> * <li>{@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)}</li> * <li>{@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)}</li> * <li>{@link ChannelHandlerContext#write(Object, ChannelPromise)}</li> * <li>{@link ChannelHandlerContext#flush()}</li> * <li>{@link ChannelHandlerContext#read()}</li> * <li>{@link ChannelHandlerContext#disconnect(ChannelPromise)}</li> * <li>{@link ChannelHandlerContext#close(ChannelPromise)}</li> * <li>{@link ChannelHandlerContext#deregister(ChannelPromise)}</li> * </ul> * </li> * </ul> * * 下面示例展示事件的传递: * * <pre> * public class MyInboundHandler extends {@link ChannelInboundHandlerAdapter} { * * public void channelActive({@link ChannelHandlerContext} ctx) { * System.out.println("Connected!"); * ctx.fireChannelActive();//传递给一个ChannelInboundHandler处理 * } * } * * public clas MyOutboundHandler extends {@link ChannelOutboundHandlerAdapter} { * * public void close({@link ChannelHandlerContext} ctx, {@link ChannelPromise} promise) { * System.out.println("Closing .."); * ctx.close(promise);//传递给一个ChannelOutboundHandler处理 * } * } * </pre> * * * 构建一个ChannelPipeline: * * //构建一个线程池,用于执行业务逻辑 * static final EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(); * //channel 创建时pipeline 就创建(new)了 * ChannelPipeline pipeline = ch.pipeline(); * //添加解码器 ByteBuf--》Java object * pipeline.addLast("decoder", new MyProtocolDecoder()); * //添加编码器 Java object-》ByteBuf * pipeline.addLast("encoder", new MyProtocolEncoder()); * //添加业务逻辑处理器。告诉pipeline在业务线程池中执行MyBusinessLogicHandler,而不是在EventExecutor(NioEventLoopp) io线程中 * //执行,这样IO线程就不会被耗时的业务阻塞, 当然如果业务线程是异步或非常快的完成,就不用指定businessGroup--》即在IO线程中执行。 * pipeline.addLast(businessGroup, "handler", new MyBusinessLogicHandler()); * */ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> { /** * 在pipeline中相应的位置(位置决定执行顺序)插入ChannelHandler * * @param group 如果ChannelHandler执行时间长或阻塞式的,请使用此参数,指定 * 定一个线程池(EventExecutorGroup),会在此线程池中执行ChannelHandler,从而IO线程不阻塞。 * @param baseName the name of the existing handler * @param name the name of the handler to insert before * @param handlerType the type of the handler * @param oldHandler the {@link ChannelHandler} to be replaced * @param newName the name under which the replacement should be added * @param newHandler the {@link ChannelHandler} which is used as replacement * @param oldName the name of the {@link ChannelHandler} to be replaced * @param oldHandlerType the type of the handler to be removed */ ChannelPipeline addFirst(String name, ChannelHandler handler); ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); ChannelPipeline addLast(String name, ChannelHandler handler); ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); ChannelPipeline addFirst(ChannelHandler... handlers); ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); ChannelPipeline addLast(ChannelHandler... handlers); ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers); ChannelPipeline remove(ChannelHandler handler); ChannelHandler remove(String name); <T extends ChannelHandler> T remove(Class<T> handlerType); ChannelHandler removeFirst(); ChannelHandler removeLast(); ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler); ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler); <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler); ChannelHandler first(); ChannelHandler last(); ChannelHandler get(String name); <T extends ChannelHandler> T get(Class<T> handlerType); /** * 返回某个ChannelHandler的ChannelHandlerContext */ ChannelHandlerContext firstContext(); ChannelHandlerContext lastContext(); ChannelHandlerContext context(ChannelHandler handler); ChannelHandlerContext context(String name); ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType); Channel channel(); List<String> names(); Map<String, ChannelHandler> toMap(); } /** * DefaultChannelPipeline 为具体的实现类,实现了ChannelPipeline,ChannelOutboundInvoker,ChannelInboundInvoker * 内部执行一个ChannelHandlerContext链表,head表示头部,tail表示尾部。 * 每添加一个ChannelHandler 就创建一个DefaultChannelHandlerContext,添加到链表中 * 之后在DefaultChannelHandlerContext.handlers 中添加ChannelHandler * 结构: * DefaultChannelPipeline(1)---->(多)ChannelHandlerContext(1)------->(1)ChannelHandler * */ public class DefaultChannelPipeline implements ChannelPipeline { protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel");//持有Channel succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); //创建了ChannelHandlerContext链表,head链表的头部,tail链表的尾部 tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } /** * 此为添加ChannelHandler的一个方法,其它的添加方式与其类似 */ public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //如果handler是ChannelHandlerAdapter的子类,且不是共享的且被添加过,则报错。 //handler.isShare==false && handler.add == true ==>报错 checkMultiplicity(handler); name = filterName(name, handler);//检查是否有重名 //创建 new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); // ChannelHandlerContext 是ChannelHandlerContext的实现 newCtx = newContext(group, name, handler); //添加到链表中 addFirst0(newCtx); //如果未注册过(in eventloop),就创建一个待执行任务:触发添加的ChannelHandlet.handlerAdded方法 // 在AbstractChannel.register0(ChannelPromise promise)方法中调用:pipeline.invokeHandlerAddedIfNeeded();执行此任务 if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); //如果注册过,但当前不是内部线程(NioEventLoop线程),则在NioEventLoop //中执行队列中添加handlerAdded的任务,由内部线程(NioEventLoop线程)触发添加的ChannelHandlet.handlerAdded方法 if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx);//注册过,又在内部线程内,则直接触发被添加的ChannelHandlet.handlerAdded方法 return this; } private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); } //ChannelHandlerContext 添加到链表 private void addFirst0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext nextCtx = head.next; newCtx.prev = head; newCtx.next = nextCtx; head.next = newCtx; nextCtx.prev = newCtx; } //实际添加ChannelHandler private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); }catch (Throwable t) { //ignore code } } /** * DefaultChannelPipeline 实现了ChannelInboundInvoker的firexxxx方法(属于inbound类型的)如: * fireChannelActive、fireChannelInactive、fireExceptionCaught、fireUserEventTriggered、fireChannelRead等等在ChannelPipine中定义接口 * 交给AbstractChannelHandlerContext.invokeXXXX(head) 方法会对head->next->next-->...->tail 每一个ChannelHandlerContext(ChannelInboundHandler的) * 调用ChannelInboundHandler.xxxx事件方法, */ public final ChannelPipeline fireXXXX() { AbstractChannelHandlerContext.invokeXXXX(head); return this; ) /** * DefaultChannelPipeline 实现了ChannelOutboundInvoker的一些网络IO方法(实际上是网络IO操作完成的通知), * 如bind,connect,disconnect,close,deregister,flush,read,write等(属于outbound类型的) * 都交给链表尾部的中的tail执行: * tail->prev->prev->prev->prev->....->head每一个ChannelHandlerContext(ChannelOutboundHandler的) * 调用ChannelInboundHandler.xxxx事件方法, * 最终链表中head.xxxx事件方法被调用(见正面head说明) **/ public final ChannelFuture xxxx(SocketAddress localAddress) { return tail.xxxx(localAddress); } /** * 下面看一链表中的head,即实现了ChannelOutboundHandler,又实现了ChannelInboundHandler * 对于ChannelOutboundHandler方法:bind,connect,read,write等,调用Channel.unsafe().xxxx方法——》实际的网络操作, * 并通知PipeLine执行fireXXXX方法 * */ final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { //Channel 中的Unsafe // private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public ChannelHandler handler() { return this; } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } @Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.disconnect(promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.close(promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.deregister(promise); } @Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); // Remove all handlers sequentially if channel is closed and unregistered. if (!channel.isOpen()) { destroy(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); readIfIsAutoRead(); } private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { channel.read(); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } } }
相关推荐
Netty4.1的源码,欢迎大家下载。.............................................................................................................................................................................
这个“netty4.1源码”压缩包包含的是Netty框架4.1版本的源代码,对于深入理解Netty的工作原理、性能优化以及自定义功能扩展非常有帮助。 Netty的核心特性包括: 1. **异步非阻塞I/O**:Netty基于Java NIO(非阻塞I...
这个"Netty-4.1 源码包"包含了Netty框架的源代码,允许开发者深入理解其内部工作原理,优化自定义实现,或者排查问题。 在Netty 4.1 版本中,主要包含以下关键知识点: 1. **NIO (Non-blocking I/O)**: Netty 使用...
在本文中,我们将深入探讨 Netty 4.1 的中文API帮助文档和用户指南,以及如何利用这些资源来提升你的网络编程技能。 首先,Netty 4.1 中文API帮助文档是理解 Netty 内部机制的关键工具。它包含了详细的类、接口、...
这个“netty 4.1 中文.CHM”文件是一个压缩包,包含的是Netty 4.1版本的中文版帮助文档,对于开发者来说是一个非常宝贵的资源,特别是对于那些中文为母语的开发者,它提供了方便的理解和学习Netty的途径。...
Netty-4.1.97.Final源码提供了对Netty内部机制的深度洞察,对于Java程序员尤其是希望提升网络编程能力或进行定制化开发的人来说,是一份极其宝贵的资料。 首先,让我们从整体上了解Netty的架构设计。Netty采用了...
赠送jar包:netty-common-4.1.65.Final.jar; 赠送原API文档:netty-common-4.1.65.Final-javadoc.jar; 赠送源代码:netty-common-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.65.Final....
netty案例,netty4.1中级拓展篇八《Netty心跳服务与断线重连》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724845&idx=1&sn=8631c590ff4876ba0b7af64df16fc54b&scene=19#wechat_redirect
赠送jar包:netty-all-4.1.68.Final.jar; 赠送原API文档:netty-all-4.1.68.Final-javadoc.jar; 赠送源代码:netty-all-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.68.Final.pom; 包含...
这个压缩包包含的是Netty 4.1的英文API帮助文档和用户指南,对于理解和使用Netty框架非常有帮助。 首先,我们来看`netty 4.1.CHM`文件,这是一个CHM(Compiled Help Manual)格式的帮助文档,通常包含了详细的API...
赠送jar包:netty-all-4.1.68.Final.jar; 赠送原API文档:netty-all-4.1.68.Final-javadoc.jar; 赠送源代码:netty-all-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.68.Final.pom; 包含...
在本文中,我们将深入分析 Netty 4.1 源码中的 EchoServer 示例,以理解其核心组件和工作原理。 首先,我们关注 EchoServer 服务端的初始化,这涉及到两个关键组件:`bossGroup` 和 `workerGroup`。它们都是 `...
标题 "netty-netty-4.1.32.final-remark.zip" 提到了 Netty 的版本号 4.1.32.Final,这表明这是一个关于 Netty 4.1.32.Final 版本的资料包。"final" 表示这是该版本的最终发布,通常意味着经过了充分测试和稳定。...
这个“netty-netty-4.1.79.Final.tar.gz”文件是一个包含Netty 4.1.79.Final版本的压缩包,通常用于Java开发环境。解压后,我们可以得到Netty的源代码、库文件和其他相关资源。 Netty的核心特性包括: 1. **异步...
netty案例,netty4.1中级拓展篇十三《Netty基于SSL实现信息传输过程中双向加密验证》源码 ...
netty案例,netty4.1中级拓展篇一《Netty与SpringBoot整合》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724796&idx=1&sn=ce5dc3c913d464b0e2e4e429a17bb01e&scene=19#wechat_redirect
netty-buffer-4.1.32.Final-sources.jar netty-buffer-4.1.32.Final.jar netty-build-22-sources.jar netty-build-22.jar netty-codec-4.1.32.Final-sources.jar netty-codec-4.1.32.Final.jar netty-codec-...
netty案例,netty4.1基础入门篇十一《netty udp通信方式案例Demo》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724927&idx=1&sn=a16bc8e98d6a27816da0896adcc83778&scene=19#wechat_redirect
这个“netty-netty-4.1.69.Final.tar.gz”文件是Netty的最新稳定版本,版本号为4.1.69.Final,它是一个压缩包文件,通常包含源码、编译后的类库、文档和其他相关资源。 Netty的核心特点包括: 1. **异步事件驱动**...