AbstractChannel 是ChannelOutboundInvoker 的子类。
DefaultPipeLine 是ChannelOutboundInvoker 的子类。
那么哪个类才是真正实现ChannelOutboundInvoker 的功能? 是DefaultPipeLine 。
那AbstractChannel 为什么要实现ChannelOutboundInvoker ?
AbstractChannel 中实现了ChannelOutboundInvoker 的方法:只是简单的委派给DefaultPipeLine 。
AbstractChannel 中实现了ChannelOutboundInvoker 目的:在channel中平滑的调用ChannelOutboundInvoker 方法(委派给DefaultPipeLine ),相当于AbstractChannel 是DefaultPipeLine 的代理。
/** * * 持有多个Attribute对象,并通过AttributeKey可以获了 * 实现必须是线程安全的 */ public interface AttributeMap { /** * 通过AttributeKey 获取Attribute,如果没有就创建一个默认的Attribute返回 */ <T> Attribute<T> attr(AttributeKey<T> key); /** * 是否存在AttributeKey对应的Attribute */ <T> boolean hasAttr(AttributeKey<T> key); } /** * DefaultAttributeMap 实现了AttributeMap,内部使用 * private volatile AtomicReferenceArray<DefaultAttribute<?>> attributes; 保存DefaultAttribute 。 **/ public class DefaultAttributeMap implements AttributeMap { } public interface ChannelOutboundInvoker { /** * 请求端口绑定到SocketAddress并通知ChannelFuture 操作成功或失败 * This will result in having the * {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)} method * called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.bind(ChannelHandlerContext, SocketAddress, ChannelPromise)-->this */ ChannelFuture bind(SocketAddress localAddress); /** * 请求连接远端地址(SocketAddress)并通知ChannelFuture 操作成功或失败 * * 如果连接超时,触发ChannelFuture 失败with ConnectTimeoutException * 如果连接拒绝,触发ChannelFuture 失败with ConnectException * This will result in having the * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)—>this */ ChannelFuture connect(SocketAddress remoteAddress); /** * 请求连接远端地址(SocketAddress)并通知ChannelFuture 操作成功或失败 * * This will result in having the * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)—>this */ ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress); /** * 请求远端断开连接并通知ChannelFuture 操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture disconnect(); /** * Request to close the {@link Channel} and notify the {@link ChannelFuture} once the operation completes, * either because the operation was successful or because of * an error. * 请求连接关闭 并通知ChannelFuture 操作成功或失败 * 一旦连接关闭,不能再重复使用它 * <p> * This will result in having the * {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture close(); /** * 请求从关联的EventExecutor上取消注册(不再获取此channel上的网络事件,read,write等)并通知ChannelFuture 操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture deregister(); /** * 请求端口绑定到SocketAddress并通知ChannelFuture 操作成功或失败 * 同时 ChannelPromise被通知操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)} method * called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)-->this */ ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise); /** * Request to connect to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation * completes, either because the operation was successful or because of an error. * * The given {@link ChannelFuture} will be notified. * 请求连接远端地址(SocketAddress)并通知ChannelFuture 操作成功或失败 * 同时 ChannelPromise被通知操作成功或失败 * <p> * If the connection fails because of a connection timeout, the {@link ChannelFuture} will get failed with * a {@link ConnectTimeoutException}. If it fails because of connection refused a {@link ConnectException} * will be used. * <p> * This will result in having the * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)-->this */ ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise); /** * 请求连接远端地址(SocketAddress)并通知ChannelFuture 操作成功或失败 * 同时 ChannelPromise被通知操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)-->this */ ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); /** * 请求远端断开连接并通知ChannelFuture 操作成功或失败 * 同时 ChannelPromise被通知操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.disconnect(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture disconnect(ChannelPromise promise); /** * Request to close the {@link Channel} and notify the {@link ChannelFuture} once the operation completes, * either because the operation was successful or because of * an error. * * After it is closed it is not possible to reuse it again. * 请求连接关闭 并通知ChannelFuture 操作成功或失败 * 一旦连接关闭,不能再重复使用它 * 同时 ChannelPromise被通知操作成功或失败 * The given {@link ChannelPromise} will be notified. * <p> * This will result in having the * {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.close(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture close(ChannelPromise promise); /** * 请求从关联的EventExecutor上取消注册(不再获取此channel上的网络事件,read,write等)并通知ChannelFuture 操作成功或失败 * 同时 ChannelPromise被通知操作成功或失败 * <p> * This will result in having the * {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.deregister(ChannelHandlerContext, ChannelPromise)-->this */ ChannelFuture deregister(ChannelPromise promise); /** * 从channel中读取数据-->第1个InBoundBuffer: * 1.触发ChannelInboundHandler.channelRead(ChannelHandlerContext, Object)事件(读出数据的情况下) * 2.触发ChannelInboundHandler.channelReadComplete(ChannelHandlerContext) 事件 * 如果已经有挂起的读取操作,则此方法不执行任何操作。 * This will result in having the * {@link ChannelOutboundHandler#read(ChannelHandlerContext)} * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the * {@link Channel}. * 调用源: * channel-->ChannelPipeline-->ChannelOutboundHandler.read(ChannelHandlerContext)-->this */ ChannelOutboundInvoker read(); /** * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}. * This method will not request to actual flush, so be sure to call {@link #flush()} * once you want to request to flush all pending data to the actual transport. * 这个方法调用->ChannelHandlerContext.ChannelPipeline 写入缓冲区,但不会立即发送出去,除非调用Flush方法 * */ ChannelFuture write(Object msg); /** * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}. * This method will not request to actual flush, so be sure to call {@link #flush()} * once you want to request to flush all pending data to the actual transport. * 这个方法调用->ChannelHandlerContext.ChannelPipeline 写入缓冲区,但不会立即发送出去,除非调用Flush方法 * 方法调用完成,ChannelPromise会得到通知 */ ChannelFuture write(Object msg, ChannelPromise promise); /** * Request to flush all pending messages via this ChannelOutboundInvoker. * 写入缓冲区数据发送出去 */ ChannelOutboundInvoker flush(); /** * 先调用write(Object, ChannelPromise)再调用flush() */ ChannelFuture writeAndFlush(Object msg, ChannelPromise promise); /** * 先调用write(Object)再调用flush() */ ChannelFuture writeAndFlush(Object msg); /** * 创建一个ChannelPromise */ ChannelPromise newPromise(); /** * 创建一个newProgressivePromise */ ChannelProgressivePromise newProgressivePromise(); /** * * 创建一个标记为成功的ChannelFuture,ChannelFuture.isSuccess()==true,添加到此ChannelFuture上的FutureListener立即被通知 */ ChannelFuture newSucceededFuture(); /** * 创建一个标记为失败的ChannelFuture,ChannelFuture.isSuccess()==false,添加到此ChannelFuture上的FutureListener立即被通知 */ ChannelFuture newFailedFuture(Throwable cause); /** * Return a special ChannelPromise which can be reused for different operations. * <p> * It's only supported to use * it for {@link ChannelOutboundInvoker#write(Object, ChannelPromise)}. * </p> * <p> * Be aware that the returned {@link ChannelPromise} will not support most operations and should only be used * if you want to save an object allocation for every write operation. You will not be able to detect if the * operation was complete, only if it failed as the implementation will call * {@link ChannelPipeline#fireExceptionCaught(Throwable)} in this case. * </p> * <strong>Be aware this is an expert feature and should be used with care!</strong> */ ChannelPromise voidPromise(); } /** * 一个channel(socket)通道,支持 read,write,connect,bind * 提供: * 1.查询channel状态 ,如 open? connected? * 2.通过ChannelConfig 配置channel参数 ,如接收发送缓冲区大小等 * 3.IO操作read, write, connect, and bind * 4.通过此channel关联的ChannelPipeline 触发IO事件 * 所有的IO操作都是异步的,所有操作立即返回ChannelFuture,通过ChannelFuture来获取操作结果 * SocketChannel 的parent()方法返回accept它的ServerSocketChannel * */ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { /** * 返回channel 唯一的ID */ ChannelId id(); /** * 每一个Channel都注册到EventLoop,返回Channel关联的EventLoop */ EventLoop eventLoop(); /** * 返回此channel的parent */ Channel parent(); /** * 返回channel对应的配置类ChannelConfig */ ChannelConfig config(); /** * channel 是否open? */ boolean isOpen(); /** * Returns {@code true} if the {@link Channel} is registered with an {@link EventLoop}. * channel 是否被注册到EventLoop(EventExecutor的实现) */ boolean isRegistered(); /** * Return {@code true} if the {@link Channel} is active and so connected. * 返回Channel是否是active?(已经连接Connected) */ boolean isActive(); /** * Return the {@link ChannelMetadata} of the {@link Channel} which describe the nature of the {@link Channel}. */ ChannelMetadata metadata(); /** * * 返回channel 的本地地址(可转换为InetSocketAddress类型获取详细信息) */ SocketAddress localAddress(); /** * 返回channel 的连接的远程地址(可转换为InetSocketAddress类型获取详细信息) */ SocketAddress remoteAddress(); /** * 返回ChannelFuture,当channel被关闭时,ChannelFuture被触发,多次调用返回同一实列 */ ChannelFuture closeFuture(); /** * Returns {@code true} if and only if the I/O thread will perform the * requested write operation immediately. Any write requests made when * this method returns {@code false} are queued until the I/O thread is * ready to process the queued write requests. */ boolean isWritable(); /** * Get how many bytes can be written until {@link #isWritable()} returns {@code false}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0. * 获取当前channel能够写入多少字节 * 当isWritable()==false,返回0 * 理解:写入缓冲区能写多少数据 */ long bytesBeforeUnwritable(); /** * Get how many bytes must be drained from underlying buffers until {@link #isWritable()} returns {@code true}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0. */ long bytesBeforeWritable(); /** * Returns an <em>internal-use-only</em> object that provides unsafe operations. */ Unsafe unsafe(); /** * 返回关联的ChannelPipeline */ ChannelPipeline pipeline(); /** * 返回ByteBufAllocator,用于创建ByteBuf对象 */ ByteBufAllocator alloc(); @Override Channel read(); @Override Channel flush(); /** * Unsafe不应该由用户代码调用。 * Unsafe提供了IO调用方法(从IO线程调用) * invoker()、localAddress()、remoteAddress()、closeForcibly()、register(EventLoop, ChannelPromise)、 * deregister(ChannelPromise)、voidPromise() 可由其它线程调用。 */ interface Unsafe { /** * 返回 RecvByteBufAllocator.Handle, * RecvByteBufAllocator.Handle 用来给接收的数据分配ByteBuf */ RecvByteBufAllocator.Handle recvBufAllocHandle(); /** * 返回本地网络地址 */ SocketAddress localAddress(); /** * 返回远程网络地址 */ SocketAddress remoteAddress(); /** * 注册ChannelPromise里的channel,当注册完成后通知ChannelFuture */ void register(EventLoop eventLoop, ChannelPromise promise); /** * Bind the SocketAddress ,完成后通知ChannelPromise */ void bind(SocketAddress localAddress, ChannelPromise promise); /** * 本地channel(ChannelPromise内部)连接远程地址,完成后通知ChannelPromise * localAddress可空 */ void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); /** * 断开连接(Channel在ChannelPromise内部),完成后通知ChannelPromise */ void disconnect(ChannelPromise promise); /** * 关闭Channel(Channel在ChannelPromise内部),完成后并通知ChannelPromise */ void close(ChannelPromise promise); /** * 立即关闭channel 并不触发任务事件,但当注册失败情况下触发。 */ void closeForcibly(); /** * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the * {@link ChannelPromise} once the operation was complete. * 通过EventLoop注销(Deregister)channel(Channel在ChannelPromise内部),完成后通知ChannelPromise */ void deregister(ChannelPromise promise); /** * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the * {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing. * 调用读操作添充缓冲区(ChannelInboundHandler 在ChannelPipeline 内部) */ void beginRead(); /** * Schedules a write operation. */ void write(Object msg, ChannelPromise promise); /** * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}. */ void flush(); /** * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}. * It will never be notified of a success or error and so is only a placeholder for operations * that take a {@link ChannelPromise} as argument but for which you not want to get notified. */ ChannelPromise voidPromise(); /** * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored. * 返回一个ChannelOutboundBuffer(缓冲区对象) */ ChannelOutboundBuffer outboundBuffer(); } } public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final Channel parent;//构建器参数赋值 private final ChannelId id;//构建器创建一个DefaultChannelId private final Unsafe unsafe;//构建器 创建一个 newUnsafe private final DefaultChannelPipeline pipeline;//构建器 创建 new DefaultChannelPipeline(this); private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false); private final CloseFuture closeFuture = new CloseFuture(this);//一个实例 private volatile SocketAddress localAddress;//localAddress()调用时通过 unsafe().remoteAddress()获取; private volatile SocketAddress remoteAddress;//remoteAddress()调用时通过 unsafe().remoteAddress()获取; private volatile EventLoop eventLoop; //在unsafe.register方法被调时,赋值,值来源于参数 private volatile boolean registered;//在unsafe.register方法被调时,赋值,值来源于注册结果 protected AbstractChannel(Channel parent) { this.parent = parent;//父channel id = newId();//创建一个DefaultChannelId unsafe = newUnsafe();//创建Unsafe pipeline = newChannelPipeline();//创建 new DefaultChannelPipeline(this); } public ChannelFuture bind(SocketAddress localAddress) { return pipeline.bind(localAddress); } public ChannelFuture connect(SocketAddress remoteAddress) { return pipeline.connect(remoteAddress); } public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return pipeline.connect(remoteAddress, localAddress);} /** * 以下网络IO方法都与上面三个类都,功能都委派给pipeline的同名方法执行。 **/ public ChannelFuture disconnect() ; public ChannelFuture close() ; public ChannelFuture deregister(); public Channel flush() ; public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) ; public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) ; public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) ; public ChannelFuture disconnect(ChannelPromise promise); public ChannelFuture close(ChannelPromise promise); public ChannelFuture deregister(ChannelPromise promise); public Channel read() ; public ChannelFuture write(Object msg) ; public ChannelFuture write(Object msg, ChannelPromise promise); public ChannelFuture writeAndFlush(Object msg) ; public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) ; public ChannelPromise newPromise(); public ChannelProgressivePromise newProgressivePromise() ; public ChannelFuture newSucceededFuture(); public ChannelFuture newFailedFuture(Throwable cause) ; /** * * AbstractUnsafe 功能: * 1.触发网络IO事件调用(方法是AbstractChannel子类实现)doBind、doDisconnect、doClose、doBeginRead、doWrite * 2.对ChannelPromise 设置结果:成功或失败 * 3.触发pipeline.fireXXXX();方法 */ protected abstract class AbstractUnsafe implements Unsafe { //io缓存 private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); //用来给接收的数据分配ByteBuf private RecvByteBufAllocator.Handle recvHandle; //是否Flush private boolean inFlush0; //默认未注册 private boolean neverRegistered = true; @Override public RecvByteBufAllocator.Handle recvBufAllocHandle() { if (recvHandle == null) {//从config中获取RecvByteBufAllocator.Handle recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; } @Override public final SocketAddress localAddress() { return localAddress0(); } @Override public final SocketAddress remoteAddress() { return remoteAddress0(); } @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop;//给AbstractChannel.eventLoop赋值 if (eventLoop.inEventLoop()) {//如果是内部线程(NioEventLoop启动的线程),直接调用register0注册 register0(promise); } else { try { eventLoop.execute(new Runnable() {//添加到NioEventLoop的任务队列,由内部线程从队列中取出再执行任务->调用register0 @Override public void run() { register0(promise); } }); } catch (Throwable t) { } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister();//执行注册操作 neverRegistered = false;//改为状态 registered = true;//channel 改变状态 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise);//设置ChannelPromise 为成功 pipeline.fireChannelRegistered();//触发注册事件 // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive();//只触发一次ChannelActive事件 } else if (config().isAutoRead()) {//config 自动读取为true,则自动调用读取IO读取 // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { } } @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) {} if (!wasActive && isActive()) {// invokeLater(new Runnable() { @Override public void run() {//只触发一次ChannelActive事件 pipeline.fireChannelActive(); } }); } safeSetSuccess(promise);//设置ChannelPromise 为成功 } @Override public final void disconnect(final ChannelPromise promise) { boolean wasActive = isActive(); try { doDisconnect();//执行网络断开 } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (wasActive && !isActive()) { invokeLater(new Runnable() { @Override public void run() {//只触发一次ChannelInactive事件 pipeline.fireChannelInactive(); } }); } safeSetSuccess(promise); closeIfClosed(); // doDisconnect() might have closed the channel } @Override public final void close(final ChannelPromise promise) { close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false); } private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) { final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (closeFuture.isDone()) {//closeFuture 只有一个实例,如果已经close了,直接返回 // Closed already. safeSetSuccess(promise);//设置ChannelPromise 为成功 return; } final boolean wasActive = isActive(); this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. try { // Close the channel and fail the queued messages in all cases. doClose0(promise);//关闭 } finally { } if (inFlush0) { invokeLater(new Runnable() { @Override public void run() { fireChannelInactiveAndDeregister(wasActive);//触发deregister、fireChannelInactive } }); } else { fireChannelInactiveAndDeregister(wasActive);//触发deregister、fireChannelInactive } } private void doClose0(ChannelPromise promise) { try { doClose();//执行关闭 closeFuture.setClosed();//设置closeFuture 状态 safeSetSuccess(promise);//设置ChannelPromise 为成功 } catch (Throwable t) { closeFuture.setClosed(); safeSetFailure(promise, t); } } private void fireChannelInactiveAndDeregister(final boolean wasActive) { deregister(voidPromise(), wasActive && !isActive()); } @Override public final void deregister(final ChannelPromise promise) { assertEventLoop(); deregister(promise, false); } private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) { if (!registered) {//没有注册过,直接设置ChannelPromise成功并返回 safeSetSuccess(promise); return; } doDeregister();//调用doDeregister方法 } @Override public final void beginRead() { try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e);//读取时有异常,触发ExceptionCaught事件 } }); close(voidPromise()); } } @Override public final void write(Object msg, ChannelPromise promise) { ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); outboundBuffer.addMessage(msg, size, promise);//写入的数据存入ChannelOutboundBuffer } @Override public final void flush() { ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; outboundBuffer.addFlush();//将未flush过的数据打上状态,以便flush时取出发送 flush0(); } @SuppressWarnings("deprecation") protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; inFlush0 = true; try { doWrite(outboundBuffer); } catch (Throwable t) { //异常处理 } finally { inFlush0 = false; } } private void invokeLater(Runnable task) { try { eventLoop().execute(task);//添加到NioEventLoop 执行队列,由内部线程执行 } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } } } /** * 以下方法为真正调用IO的代码,由子类实现 */ protected abstract SocketAddress localAddress0(); protected abstract SocketAddress remoteAddress0(); protected void doRegister() throws Exception { // NOOP } protected abstract void doBind(SocketAddress localAddress) throws Exception; protected abstract void doDisconnect() throws Exception; protected abstract void doClose() throws Exception; protected void doDeregister() throws Exception { // NOOP } protected abstract void doBeginRead() throws Exception; protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception; protected Object filterOutboundMessage(Object msg) throws Exception { return msg; } /** * 实现部分selector及selectkey相关操作:注册,取消注册,监听channel的读取 * 以及AbstractNioUnsafe 实现了connect方法 */ public abstract class AbstractNioChannel extends AbstractChannel { @Override protected void doRegister() throws Exception {//在selector上注册channel boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this);//attachment==AbstractNioChannel return; } catch (CancelledKeyException e) { } } } @Override protected void doDeregister() throws Exception { eventLoop().cancel(selectionKey());//取消channel所有事件监听 } @Override protected void doBeginRead() throws Exception { final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp);//让selector 监听channel的读取 } } protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { @Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { try { if (doConnect(remoteAddress, localAddress)) {//由子类实现 if (!wasActive && active) { pipeline().fireChannelActive();//只触发一次fireChannelActive事件 } } else { // } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } } } } /** * 实现io read,write */ public abstract class AbstractNioByteChannel extends AbstractNioChannel { protected class NioByteUnsafe extends AbstractNioUnsafe { @Override public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); doReadBytes//由子类实现 allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf);触发pipeline.fireChannelRead事件(对象类型为ByteBuf) byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete();触发pipeline.fireChannelReadComplete事件 } catch (Throwable t) { //ignore } finally { //ignore } } } } /** * 执行写入操作 * 在写入缓中区中ChannelOutboundBuffer 获取需要发送的数据进入网络发送(Write)直到没有发送的事件---》在selector上取消write监听 * 如果由于网络写入缓冲区没有空间,在selector上注册write监听,并写添加任务队列任务,由线程调用flush方法继续发送 */ protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { Object msg = in.current();//flush方法标记过的需要发送的数据 if (msg == null) { // Wrote all messages. clearOpWrite();//如果没有需要写入的数据,则在select 取消write监听 // Directly return here so incompleteWrite(...) is not called. return;//退出 } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes();// boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); //doWriteBytes由子类实现数据写入 if (localFlushedAmount == 0) {//网络写入缓冲区不可以写入了,停止写入,退出 setOpWrite = true;//标记为未完成写入 break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) {//没有数据需要发送了 done = true; break; } } in.progress(flushedAmount);//标记已发送的数据量,以便下次发送时接着发送 if (done) {//没有数据需要发送了 in.remove();//ChannelOutboundBuffer 移除已发送的数据 } else { // Break the loop and so incompleteWrite(...) is called. break; } } else if (msg instanceof FileRegion) { //ignore } else { // Should not reach here. throw new Error(); } } incompleteWrite(setOpWrite); } /** * 未完成写入,在selector上注册写入事件,添加到任务队列执行:flush-->this.doWrite() 继续发送 **/ protected final void incompleteWrite(boolean setOpWrite) { // Did not write completely. if (setOpWrite) { setOpWrite();//在selector上注册写入事件 } else { // Schedule flush again later so other tasks can be picked up in the meantime Runnable flushTask = this.flushTask; if (flushTask == null) { flushTask = this.flushTask = new Runnable() { @Override public void run() { flush(); } }; } eventLoop().execute(flushTask);//线程中调用继续写入 } } } /** * * 调用jdk SocketChannel 实现网络操作,read,write , */ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { @Override public boolean isActive() { SocketChannel ch = javaChannel(); return ch.isOpen() && ch.isConnected(); } @Override protected SocketAddress localAddress0() { return javaChannel().socket().getLocalSocketAddress(); } @Override protected SocketAddress remoteAddress0() { return javaChannel().socket().getRemoteSocketAddress(); } @Override protected void doBind(SocketAddress localAddress) throws Exception { doBind0(localAddress); } private void doBind0(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress); } else { javaChannel().socket().bind(localAddress); } } @Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { doBind0(localAddress); } boolean success = false; try { boolean connected = javaChannel().connect(remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } } @Override protected void doFinishConnect() throws Exception { if (!javaChannel().finishConnect()) { throw new Error(); } } @Override protected void doDisconnect() throws Exception { doClose(); } @Override protected void doClose() throws Exception { super.doClose(); javaChannel().close(); } @Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); } @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); } }
相关推荐
这个“netty-4.1.zip”压缩包包含的是Netty框架的4.1版本源代码,对于Java后端开发者来说,深入学习和理解Netty的源码能极大地提升网络编程的能力。 Netty的核心设计理念是基于Reactor模式,这是一种处理并发I/O...
Channel有多种实现,如NioSocketChannel、NioServerSocketChannel等,对应不同的网络通信方式。 4. **EventLoop和EventLoopGroup**:每个Channel都关联一个EventLoop,负责处理该Channel上的事件。EventLoopGroup是...
基于Maxwell设计的经典280W 4025RPM高效率科尔摩根12极39槽TBM无框力矩电机:生产与学习双重应用案例,基于Maxwell设计的经典280W高转速科尔摩根TBM无框力矩电机:7615系列案例解析与应用实践,基于maxwwell设计的经典280W,4025RPM 内转子 科尔摩根 12极39槽 TBM无框力矩电机,7615系列。 该案例可用于生产,或者学习用,(157) ,maxwell设计; 280W; 4025RPM内转子; 科尔摩根; 12极39槽TBM无框力矩电机; 7615系列; 生产/学习用。,基于Maxwell设计,高功率280W 12极39槽TBM无框力矩电机:生产与学习双用途案例
基于碳交易的微网优化模型的Matlab设计与实现策略分析,基于碳交易的微网优化模型的Matlab设计与实现探讨,考虑碳交易的微网优化模型matlab ,考虑碳交易; 微网优化模型; MATLAB;,基于Matlab的碳交易微网优化模型研究
二级2025模拟试题(答案版)
OpenCV是一个功能强大的计算机视觉库,它提供了多种工具和算法来处理图像和视频数据。在C++中,OpenCV可以用于实现基础的人脸识别功能,包括从摄像头、图片和视频中识别人脸,以及通过PCA(主成分分析)提取图像轮廓。以下是对本资源大体的介绍: 1. 从摄像头中识别人脸:通过使用OpenCV的Haar特征分类器,我们可以实时从摄像头捕获的视频流中检测人脸。这个过程涉及到将视频帧转换为灰度图像,然后使用预训练的Haar级联分类器来识别人脸区域。 2. 从视频中识别出所有人脸和人眼:在视频流中,除了检测人脸,我们还可以进一步识别人眼。这通常涉及到使用额外的Haar级联分类器来定位人眼区域,从而实现对人脸特征的更细致分析。 3. 从图片中检测出人脸:对于静态图片,OpenCV同样能够检测人脸。通过加载图片,转换为灰度图,然后应用Haar级联分类器,我们可以在图片中标记出人脸的位置。 4. PCA提取图像轮廓:PCA是一种统计方法,用于分析和解释数据中的模式。在图像处理中,PCA可以用来提取图像的主要轮廓特征,这对于人脸识别技术中的面部特征提取尤
麻雀搜索算法(SSA)自适应t分布改进版:卓越性能与优化代码注释,适合深度学习。,自适应t分布改进麻雀搜索算法(TSSA)——卓越的学习样本,优化效果出众,麻雀搜索算法(SSA)改进——采用自适应t分布改进麻雀位置(TSSA),优化后明显要优于基础SSA(代码基本每一步都有注释,代码质量极高,非常适合学习) ,TSSA(自适应t分布麻雀位置算法);注释详尽;高质量代码;适合学习;算法改进结果优异;TSSA相比基础SSA。,自适应T分布优化麻雀搜索算法:代码详解与学习首选(TSSA改进版)
锂电池主动均衡Simulink仿真研究:多种均衡策略与电路架构的深度探讨,锂电池主动均衡与多种均衡策略的Simulink仿真研究:buckboost拓扑及多层次电路分析,锂电池主动均衡simulink仿真 四节电池 基于buckboost(升降压)拓扑 (还有传统电感均衡+开关电容均衡+双向反激均衡+双层准谐振均衡+环形均衡器+cuk+耦合电感)被动均衡电阻式均衡 、分层架构式均衡以及分层式电路均衡,多层次电路,充放电。 ,核心关键词: 锂电池; 主动均衡; Simulink仿真; 四节电池; BuckBoost拓扑; 传统电感均衡; 开关电容均衡; 双向反激均衡; 双层准谐振均衡; 环形均衡器; CUK均衡; 耦合电感均衡; 被动均衡; 电阻式均衡; 分层架构式均衡; 多层次电路; 充放电。,锂电池均衡策略研究:Simulink仿真下的多拓扑主动与被动均衡技术
S7-1500和分布式外围系统ET200MP模块数据
内置式永磁同步电机无位置传感器模型:基于滑膜观测器和MTPA技术的深度探究,内置式永磁同步电机基于滑膜观测器和MTPA的无位置传感器模型研究,基于滑膜观测器和MTPA的内置式永磁同步电机无位置传感器模型 ,基于滑膜观测器;MTPA;内置式永磁同步电机;无位置传感器模型,基于滑膜观测与MTPA算法的永磁同步电机无位置传感器模型
centos7操作系统下安装docker,及docker常用命令、在docker中运行nginx示例,包括 1.设置yum的仓库 2.安装 Docker Engine-Community 3.docker使用 4.查看docker进程是否启动成功 5.docker常用命令及nginx示例 6.常见问题
给曙光服务器安装windows2012r2时候找不到磁盘,问厂家工程师要的raid卡驱动,内含主流大多数品牌raid卡驱动
数学建模相关主题资源2
西门子四轴卧式加工中心后处理系统:828D至840D支持,四轴联动制造解决方案,图档处理与试看程序一应俱全。,西门子四轴卧加后处理系统:支持828D至840D系统,四轴联动高精度制造解决方案,西门子四轴卧加后处理,支持828D~840D系统,支持四轴联动,可制制,看清楚联系,可提供图档处理试看程序 ,核心关键词:西门子四轴卧加后处理; 828D~840D系统支持; 四轴联动; 制程; 联系; 图档处理试看程序。,西门子四轴卧加后处理程序,支持多种系统与四轴联动
MATLAB下基于列约束生成法CCG的两阶段鲁棒优化问题求解入门指南:算法验证与经典文献参考,MATLAB下基于列约束生成法CCG的两阶段鲁棒优化问题求解入门指南:算法验证与文献参考,MATLAB代码:基于列约束生成法CCG的两阶段问题求解 关键词:两阶段鲁棒 列约束生成法 CCG算法 参考文档:《Solving two-stage robust optimization problems using a column-and-constraint generation method》 仿真平台:MATLAB YALMIP+CPLEX 主要内容:代码构建了两阶段鲁棒优化模型,并用文档中的相对简单的算例,进行CCG算法的验证,此篇文献是CCG算法或者列约束生成算法的入门级文献,其经典程度不言而喻,几乎每个搞CCG的两阶段鲁棒的人都绕不过此篇文献 ,两阶段鲁棒;列约束生成法;CCG算法;MATLAB;YALMIP+CPLEX;入门级文献。,MATLAB代码实现:基于两阶段鲁棒与列约束生成法CCG的算法验证研究
“生热研究的全面解读:探究参数已配置的Comsol模型中的18650圆柱锂电池表现”,探究已配置参数的COMSOL模型下的锂电池生热现象:18650圆柱锂电池模拟分析,出一个18650圆柱锂电池comsol模型 参数已配置,生热研究 ,出模型; 18650圆柱锂电池; comsol模型; 参数配置; 生热研究,构建18650电池的COMSOL热研究模型
移动端多端运行的知识付费管理系统源码,TP6+Layui+MySQL后端支持,功能丰富,涵盖直播、点播、管理全功能及礼物互动,基于UniApp跨平台开发的移动端知识付费管理系统源码:多端互通、全功能齐备、后端采用TP6与PHP及Layui前端,搭载MySQL数据库与直播、点播、管理、礼物等功能的强大整合。,知识付费管理系统源码,移动端uniApp开发,app h5 小程序一套代码多端运行,后端php(tp6)+layui+MySQL,功能齐全,直播,点播,管理,礼物等等功能应有尽有 ,知识付费;管理系统源码;移动端uniApp开发;多端运行;后端php(tp6);layui;MySQL;直播点播;管理功能;礼物功能,知识付费管理平台:全功能多端运行系统源码(PHP+Layui+MySQL)
基于Python+Django+MySQL的个性化图书推荐系统:协同过滤推荐,智能部署,用户定制功能,基于Python+Django+MySQL的个性化图书推荐系统:协同过滤推荐,智能部署,用户定制功能,Python+Django+Mysql个性化图书推荐系统 图书在线推荐系统 基于用户、项目、内容的协同过滤推荐算法。 帮远程安装部署 一、项目简介 1、开发工具和实现技术 Python3.8,Django4,mysql8,navicat数据库管理工具,html页面,javascript脚本,jquery脚本,bootstrap前端框架,layer弹窗组件、webuploader文件上传组件等。 2、项目功能 前台用户包含:注册、登录、注销、浏览图书、搜索图书、信息修改、密码修改、兴趣喜好标签、图书评分、图书收藏、图书评论、热点推荐、个性化推荐图书等功能; 后台管理员包含:用户管理、图书管理、图书类型管理、评分管理、收藏管理、评论管理、兴趣喜好标签管理、权限管理等。 个性化推荐功能: 无论是否登录,在前台首页展示热点推荐(根据图书被收藏数量降序推荐)。 登录用户,在前台首页展示个性化推荐
STM32企业级锅炉控制器源码分享:真实项目经验,带注释完整源码助你快速掌握实战经验,STM32企业级锅炉控制器源码:真实项目经验,完整注释,助力初学者快速上手,stm32真实企业项目源码 项目要求与网上搜的那些开发板的例程完全不在一个级别,也不是那些凑合性质的项目可以比拟的。 项目是企业级产品的要求开发的,能够让初学者了解真实的企业项目是怎么样的,增加工作经验 企业真实项目网上稀缺,完整源码带注释,适合没有参与工作或者刚学stm32的增加工作经验, 这是一个锅炉的控制器,有流程图和程序协议的介绍。 ,stm32源码;企业级项目;工作经验;锅炉控制器;流程图;程序协议,基于STM32的真实企业级锅炉控制器项目源码
整车性能目标书:涵盖燃油车、混动车及纯电动车型的十六个性能模块目标定义模板与集成开发指南,整车性能目标书:涵盖燃油车、混动车及纯电动车型的十六个性能模块目标定义模板与集成开发指南,整车性能目标书,汽车性能目标书,十六个性能模块目标定义模板,包含燃油车、混动车型及纯电动车型。 对于整车性能的集成开发具有较高的参考价值 ,整车性能目标书;汽车性能目标书;性能模块目标定义模板;燃油车;混动车型;纯电动车型;集成开发;参考价值,《汽车性能模块化目标书:燃油车、混动车及纯电动车的集成开发参考》