`
Donald_Draper
  • 浏览: 987655 次
社区版块
存档分类
最新评论

netty 抽象nio通道解析

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
引言:
上一篇文章我们看了抽象通道触发OutboundInvoker相关事件方法,先来回顾一下:
    通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
今天我们来看抽象nio通道AbstractNioChannel
package io.netty.channel.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Abstract base class for {@link Channel} implementations which use a Selector based approach.
 */
public abstract class AbstractNioChannel extends AbstractChannel {

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(AbstractNioChannel.class);
    //关闭通道异常
    private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractNioChannel.class, "doClose()");

    private final SelectableChannel ch;//关联通道
    protected final int readInterestOp;//读操作事件
    volatile SelectionKey selectionKey;//关联选择key
    boolean readPending;//是否开始读操作
    private final Runnable clearReadPendingRunnable = new Runnable() {
        @Override
        public void run() {
            clearReadPending0();
        }
    };

    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise;//异步可写的连接任务
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;//远端socket地址
}

从上面可以看出,抽象nio通道内部关联一个可选择通道(SelectableChannel)和一个选择key(selectionKey)


来看构造:
/**
 * Create a new instance
 *
 * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
 所属通道
 * @param ch                the underlying {@link SelectableChannel} on which it operates
 底层选择通道
 * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
 读操作事件
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        //初始化通道,并配置为非阻塞模式
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

从上面可以看出抽象Nio通道构造,主要是初始化通道并配置为非阻塞模式。
下面几个方法,很简单,看一下就行
//通道是否打开
@Override
public boolean isOpen() {
    return ch.isOpen();
}
//获取底层选择通道
protected SelectableChannel javaChannel() {
    return ch;
}
//获取通道所在的事件循环
@Override
public NioEventLoop eventLoop() {
    return (NioEventLoop) super.eventLoop();
}


来看抽象通道几个实际操作方法do*,
先从注册方法开始
 @Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
	   //注册可选择通道到通道所在事件循环的选择器中
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
		//如果注册异常,并且没有选择,则执行选择操作,将选择key从选择器的取消key集合中移除
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

从上面来看注册工作主要是,注册可选择通道到通道所在事件循环的选择器中。

//再来看反注册
@Override
protected void doDeregister() throws Exception {
    //委托给事件循环,取消选择key,即从事件循环关联选择器的选择key集合中移除当前选择key
    eventLoop().cancel(selectionKey());
}


//读操作
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        //选择key无效,直接返回
        return;
    }

    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        //将读操作事件,添加选择key的兴趣事件集
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

从上面可以看出,开始读操作,实际工作为将读操作事件,添加选择key的兴趣事件集

再来看实际关闭任务操作:
@Override
protected void doClose() throws Exception {
    ChannelPromise promise = connectPromise;
    if (promise != null) {
        // Use tryFailure() instead of setFailure() to avoid the race against cancel().
        promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
        connectPromise = null;
    }
    //获取连接超时任务
    ScheduledFuture<?> future = connectTimeoutFuture;
    if (future != null) {
        //取消任务
        future.cancel(false);
        connectTimeoutFuture = null;
    }
}

来看其他方法:
//获取通道Unsafe
@Override
public NioUnsafe unsafe() {
    return (NioUnsafe) super.unsafe();
}
/**
 * Return the current {@link SelectionKey}
 获取通道选择key
 */
protected SelectionKey selectionKey() {
    assert selectionKey != null;
    return selectionKey;
}

/**
 * @deprecated No longer supported.
 * No longer supported.
 是否正在进行读操作
 */
@Deprecated
protected boolean isReadPending() {
    return readPending;
}

/**
 * @deprecated Use {@link #clearReadPending()} if appropriate instead.
 * No longer supported.
 设置正在读标志
 */
@Deprecated
protected void setReadPending(final boolean readPending) {
    if (isRegistered()) {
        EventLoop eventLoop = eventLoop();
        if (eventLoop.inEventLoop()) {
	    //在当前事件循环,完成实际设置读标志
            setReadPending0(readPending);
        } else {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    setReadPending0(readPending);
                }
            });
        }
    } else {
        // Best effort if we are not registered yet clear readPending.
        // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
        // not set yet so it would produce an assertion failure.
        this.readPending = readPending;
    }
}
//完成实际设置读标志
private void setReadPending0(boolean readPending) {
    this.readPending = readPending;
    if (!readPending) {
         //如果为不在读数据,委托给Unsafe,从选择key兴趣集中移除读操作事件
        ((AbstractNioUnsafe) unsafe()).removeReadOp();
    }
}


/**
 * Set read pending to {@code false}.
 清除读操作标志,即设置标志为false
 */
protected final void clearReadPending() {
    if (isRegistered()) {
        EventLoop eventLoop = eventLoop();
        if (eventLoop.inEventLoop()) {
	   //在当前事件循环,完成实际清除
            clearReadPending0();
        } else {
            eventLoop.execute(clearReadPendingRunnable);
        }
    } else {
        // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
        // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
        // not set yet so it would produce an assertion failure.
        readPending = false;
    }
}
//完成实际清除
private void clearReadPending0() {
    readPending = false;
    //委托给Unsafe,从选择key兴趣集中移除读操作事件
    ((AbstractNioUnsafe) unsafe()).removeReadOp();
}


从下面一个方法:
//获取通道Unsafe
@Override
public NioUnsafe unsafe() {
    return (NioUnsafe) super.unsafe();
}

可以看出抽象nio通道的内部Unsafe为NioUnsafe

/**
 * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
 特殊的Unsafe,允许访问底层的选择通道
 */
public interface NioUnsafe extends Unsafe {
    /**
     * Return underlying {@link SelectableChannel}
     返回底层选择通道
     */
    SelectableChannel ch();

    /**
     * Finish connect
     完成连接
     */
    void finishConnect();

    /**
     * Read from underlying {@link SelectableChannel}
     从底层选择操作,读取数据
     */
    void read();
    //强制刷新
    void forceFlush();
}


来看NioUnsafe的抽象实现AbstractNioUnsafe,为抽象nio通道的内部类,
 protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
        //移除读兴趣事件
        protected final void removeReadOp() {
	    //获取选择key
            SelectionKey key = selectionKey();
            // Check first if the key is still valid as it may be canceled as part of the deregistration
            // from the EventLoop
            // See https://github.com/netty/netty/issues/2104
            if (!key.isValid()) {
	        //选择key无效,直接返回
                return;
            }
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) != 0) {
                // only remove readInterestOp if needed
		//从选择key兴趣事件集中,移除读操作事件
                key.interestOps(interestOps & ~readInterestOp);
            }
        }
        //获取选择通道,实际返回的抽象nio通道内部的底层可选择通道
        @Override
        public final SelectableChannel ch() {
            return javaChannel();
        }
}

从上面可以看出,抽象nioUnsafe为特殊的Unsafe,允许访问底层的选择通道。
选择通道方法返回的实际为抽象nio通道内部的底层可选择通道。
移除读兴趣事件removeReadOp,即从选择key兴趣事件集中,移除读操作事件。

来看抽象NioUnsafe的其他方法
//连接远端Socket地址,如果需要绑定本地socket地址,连接完成通知异步可写任务promise
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        //确保任务没有取消,通道打开
        return;
    }

    try {
        if (connectPromise != null) {
	    //已连接
            // Already a connect in process.
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();
        if (doConnect(remoteAddress, localAddress)) {
	   //如果连接成功,则更新任务结果,如果需要,则触发通道的激活事件fireChannelActive
            fulfillConnectPromise(promise, wasActive);
        } else {
	    //连接失败,则添加异步连接任务
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.连接超时配置
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
	    //更新连接异步任务结果为连接超时,并将任务交个事件循环区调度
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            //添加任务监听器
            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
			//连接任务取消,则关闭任务
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
       //异常,则更新任务失败,如果需要则关闭通道
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

连接方法,我们需要关注为以下片段:

if (doConnect(remoteAddress, localAddress)) {
   //如果连接成功,则更新任务结果,如果需要,则触发通道的激活事件fireChannelActive
    fulfillConnectPromise(promise, wasActive);
} 


//AbstractNioChannel
/**
 * Connect to the remote peer,待子类实现
 */
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;



//更新任务结果,触发通道的激活事件fireChannelActive
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        // Closed via cancellation and the promise has been notified already.
	通道已经关闭,直接返回
        return;
    }

    // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
    // We still need to ensure we call fireChannelActive() in this case.
    boolean active = isActive();

    // trySuccess() will return false if a user cancelled the connection attempt.
    //更新连接任务成功完成
    boolean promiseSet = promise.trySuccess();

    // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
    // because what happened is what happened.
    if (!wasActive && active) {
        //触发通道的激活事件fireChannelActive
        pipeline().fireChannelActive();
    }
    // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
    if (!promiseSet) {
        //关闭任务
        close(voidPromise());
    }
}

从上面可以看出,连接操作,将实际连接操作委托给doConnect,待子类实现,如果连接成功
,则通知异步任务连接成功,如果是第一次连接,则触发通道的激活事件fireChannelActive。

再来看完成连接方法
@Override
public final void finishConnect() {
    // Note this method is invoked by the event loop only if the connection attempt was
    // neither cancelled nor timed out.

    assert eventLoop().inEventLoop();

    try {
        boolean wasActive = isActive();
	//实际完成连接
        doFinishConnect();
	//更新任务结果,触发通道的激活事件fireChannelActive
        fulfillConnectPromise(connectPromise, wasActive);
    } catch (Throwable t) {
         //这个包装异常的方法annotateConnectException,在上一篇已看过,即将远端socket地址,添加的异常信息中
        fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    } finally {
        // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
        // See https://github.com/netty/netty/issues/1770
        if (connectTimeoutFuture != null) {
            connectTimeoutFuture.cancel(false);
        }
        connectPromise = null;
    }
}

//AbstractNioChannel
/**
 * Finish the connect,待子类实现
 */
protected abstract void doFinishConnect() throws Exception;



//更新连接任务为异常失败
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
    if (promise == null) {
        // Closed via cancellation and the promise has been notified already.
        return;
    }
    // Use tryFailure() instead of setFailure() to avoid the race against cancel().
    promise.tryFailure(cause);
    closeIfClosed();
}

从上面可以看出,完成连接操作,实际工作委托给抽象Nio通道的doFinishConnect方法,待子类实现,完成后更新任务结果,触发通道的激活事件fireChannelActive,如果出现异常,则更新连接任务为异常失败。

再来看刷新操作
@Override
protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    if (isFlushPending()) {
        return;
    }
    //委托给父类
    super.flush0();
}


@Override
public final void forceFlush() {
    // directly call super.flush0() to force a flush now
     //委托给父类
    super.flush0();
}

private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    //写操作事件,存在选择key兴趣事件集中
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}


回到抽象nio通道,再来看抽象nio通道的其他方法:
//判断事件循环为通道兼容,即判断事件循环是否为Nio事件循环
@Override
protected boolean isCompatible(EventLoop loop) {
    return loop instanceof NioEventLoop;
}


/**
 * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
 * Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
 * but just returns the original {@link ByteBuf}..
 包装原始buf为direct buf,成功则释放原始buf,如果保证成本较高,则返回原始buf
 */
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
    //获取buf可读字节数
    final int readableBytes = buf.readableBytes();
    if (readableBytes == 0) {
        //释放buf
        ReferenceCountUtil.safeRelease(buf);
        return Unpooled.EMPTY_BUFFER;
    }
    //获取字节buf分配器
    final ByteBufAllocator alloc = alloc();
    if (alloc.isDirectBufferPooled()) {
        //如果分配器为Direct池Buffer类型,则分配字节Direct类型字节buf
        ByteBuf directBuf = alloc.directBuffer(readableBytes);
	//将原始buf中的数据,写到新的Direct buf中
        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
	//释放原始buf
        ReferenceCountUtil.safeRelease(buf);
        return directBuf;
    }
    //否则,获取线程本地的direct buf
    final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
    if (directBuf != null) {
       //将原始buf中的数据,写到新的Direct buf中
        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
        ReferenceCountUtil.safeRelease(buf);
        return directBuf;
    }
    // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
   //如果分配和回收一个非池类的Direct buf代价比较高,则直接返回原始buf。
    return buf;
}


/**
 * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
 * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
 * this method.  Note that this method does not create an off-heap copy if the allocation / deallocation cost is
 * too high, but just returns the original {@link ByteBuf}..
 此方法与上面方法不同的是,释放的是buf的Holder,主要是保证原始buf能够释放
 */
protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
    final int readableBytes = buf.readableBytes();
    if (readableBytes == 0) {
        //释放的是buf的Holder
        ReferenceCountUtil.safeRelease(holder);
        return Unpooled.EMPTY_BUFFER;
    }

    final ByteBufAllocator alloc = alloc();
    if (alloc.isDirectBufferPooled()) {
        ByteBuf directBuf = alloc.directBuffer(readableBytes);
        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
	//释放的是buf的Holder
        ReferenceCountUtil.safeRelease(holder);
        return directBuf;
    }

    final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
    if (directBuf != null) {
        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
	//释放的是buf的Holder
        ReferenceCountUtil.safeRelease(holder);
        return directBuf;
    }

    // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
    if (holder != buf) {
        // Ensure to call holder.release() to give the holder a chance to release other resources than its content.
        buf.retain();//buf引用计数器自增1
        ReferenceCountUtil.safeRelease(holder);
    }
    return buf;
}


我们来看这句:
//否则,获取线程本地的direct buf
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();


//ByteBufUtil
**
 * Returns a cached thread-local direct buffer, if available.
 *
 * @return a cached thread-local direct buffer, if available.  {@code null} otherwise.
 */
public static ByteBuf threadLocalDirectBuffer() {
    if (THREAD_LOCAL_BUFFER_SIZE <= 0) {
       //线程本地buffer size小于0,则直接返回
        return null;
    }
    if (PlatformDependent.hasUnsafe()) {
        return ThreadLocalUnsafeDirectByteBuf.newInstance();
    } else {
        return ThreadLocalDirectByteBuf.newInstance();
    }
}

//PlatformDependent
public final class PlatformDependent {
private static final boolean HAS_UNSAFE = hasUnsafe0();
 /**
     * Return {@code true} if {@code sun.misc.Unsafe} was found on the classpath and can be used for accelerated
     * direct memory access.
     */
    public static boolean hasUnsafe() {
        return HAS_UNSAFE;
    }
    private static boolean hasUnsafe0() {
        if (isAndroid()) {
            logger.debug("sun.misc.Unsafe: unavailable (Android)");
            return false;
        }
        if (PlatformDependent0.isExplicitNoUnsafe()) {
            return false;
        }
        try {
            boolean hasUnsafe = PlatformDependent0.hasUnsafe();
            logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable");
            return hasUnsafe;
        } catch (Throwable ignored) {
            // Probably failed to initialize PlatformDependent0.
            return false;
        }
    }
...
}

//PlatformDependent0
/**
 * The {@link PlatformDependent} operations which requires access to {@code sun.misc.*}.
 */
final class PlatformDependent0 {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PlatformDependent0.class);
    private static final long ADDRESS_FIELD_OFFSET;
    private static final long BYTE_ARRAY_BASE_OFFSET;
    private static final Constructor<?> DIRECT_BUFFER_CONSTRUCTOR;
    private static final boolean IS_EXPLICIT_NO_UNSAFE = explicitNoUnsafe0();
    private static final Method ALLOCATE_ARRAY_METHOD;
    private static final int JAVA_VERSION = javaVersion0();
    private static final boolean IS_ANDROID = isAndroid0();
    private static final Object INTERNAL_UNSAFE;
    static final Unsafe UNSAFE;
    static boolean hasUnsafe() {
        return UNSAFE != null;
    }
    ...
}

下面两个字节部分都是ByteBufUtil的内部类
//ThreadLocalDirectByteBuf
static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf {
    
        private static final Recycler<ThreadLocalDirectByteBuf> RECYCLER = new Recycler<ThreadLocalDirectByteBuf>() {
            @Override
            protected ThreadLocalDirectByteBuf newObject(Handle<ThreadLocalDirectByteBuf> handle) {
                return new ThreadLocalDirectByteBuf(handle);
            }
        };
    
        static ThreadLocalDirectByteBuf newInstance() {
            ThreadLocalDirectByteBuf buf = RECYCLER.get();
            buf.setRefCnt(1);
            return buf;
        }
	...
}


//ThreadLocalUnsafeDirectByteBuf
static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
	private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER =
		new Recycler<ThreadLocalUnsafeDirectByteBuf>() {
		    @Override
		    protected ThreadLocalUnsafeDirectByteBuf newObject(Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
			return new ThreadLocalUnsafeDirectByteBuf(handle);
		    }
		};

	static ThreadLocalUnsafeDirectByteBuf newInstance() {
	    ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get();
	    buf.setRefCnt(1);
	    return buf;
	}
	...
}

总结:

抽象nio通道AbstractNioChannel内部关联一个可选择通道(SelectableChannel)和一个选择key(selectionKey)。抽象Nio通道构造,主要是初始化通道并配置为非阻塞模式。

注册doRegister工作主要是,注册可选择通道到通道所在事件循环的选择器中。反注册doDeregister,委托给事件循环,取消选择key,即从事件循环关联选择器的选择key集合中移除当前选择key。开始读操作doBeginRead,实际工作为将读操作事件,添加选择key的兴趣事件集

抽象nioUnsafe为特殊的Unsafe,允许访问底层的选择通道。选择通道方法返回的实际为抽象nio通道内部的底层可选择通道。移除读兴趣事件removeReadOp,即从选择key兴趣事件集中,移除读操作事件。连接操作,将实际连接操作委托给doConnect,待子类实现,如果连接成功,则通知异步任务连接成功,如果是第一次连接,则触发通道的激活事件fireChannelActive。完成连接操作,实际工作委托给抽象Nio通道的doFinishConnect方法,待子类实现,完成后更新任务结果,触发通道的激活事件fireChannelActive,如果出现异常,则更新连接任务为异常失败。



附:
/**
 * A collection of utility methods that is related with handling {@link ByteBuf},
 * such as the generation of hex dump and swapping an integer's byte order.
 */
public final class ByteBufUtil {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ByteBufUtil.class);
    private static final FastThreadLocal<CharBuffer> CHAR_BUFFERS = new FastThreadLocal<CharBuffer>() {
        @Override
        protected CharBuffer initialValue() throws Exception {
            return CharBuffer.allocate(1024);
        }
    };

    //ThreadLocalDirectByteBuf
    static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf {
    
        private static final Recycler<ThreadLocalDirectByteBuf> RECYCLER = new Recycler<ThreadLocalDirectByteBuf>() {
            @Override
            protected ThreadLocalDirectByteBuf newObject(Handle<ThreadLocalDirectByteBuf> handle) {
                return new ThreadLocalDirectByteBuf(handle);
            }
        };
    
        static ThreadLocalDirectByteBuf newInstance() {
            ThreadLocalDirectByteBuf buf = RECYCLER.get();
            buf.setRefCnt(1);
            return buf;
        }
    
        private final Handle<ThreadLocalDirectByteBuf> handle;
    
        private ThreadLocalDirectByteBuf(Handle<ThreadLocalDirectByteBuf> handle) {
            super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
            this.handle = handle;
        }
    
        @Override
        protected void deallocate() {
            if (capacity() > THREAD_LOCAL_BUFFER_SIZE) {
                super.deallocate();
            } else {
                clear();
                handle.recycle(this);
            }
        }
    }
    //ThreadLocalUnsafeDirectByteBuf
    static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {

        private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER =
                new Recycler<ThreadLocalUnsafeDirectByteBuf>() {
                    @Override
                    protected ThreadLocalUnsafeDirectByteBuf newObject(Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
                        return new ThreadLocalUnsafeDirectByteBuf(handle);
                    }
                };

        static ThreadLocalUnsafeDirectByteBuf newInstance() {
            ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get();
            buf.setRefCnt(1);
            return buf;
        }

        private final Handle<ThreadLocalUnsafeDirectByteBuf> handle;

        private ThreadLocalUnsafeDirectByteBuf(Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
            super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
            this.handle = handle;
        }

        @Override
        protected void deallocate() {
            if (capacity() > THREAD_LOCAL_BUFFER_SIZE) {
                super.deallocate();
            } else {
                clear();
                handle.recycle(this);
            }
        }
    }
}
0
0
分享到:
评论

相关推荐

    netty框架图及netty面试知识点解析

    Netty基于NIO(非阻塞I/O)模型,它提供了高度抽象的API,简化了网络编程。在Netty中,I/O操作被封装在Channel接口中,而ChannelHandler则用于处理I/O事件和数据。事件循环(EventLoop)是Netty处理事件的关键组件,...

    Socket 之 BIO、NIO、Netty 简单实现

    BIO适合简单的服务,NIO适合处理大量并发连接,而Netty则提供了更高级的抽象,适用于复杂、高性能的应用。通过阅读《Socket 之 BIO、NIO、Netty 简单实现》的博客,你可以了解如何在Java中实现这些通信模型,从而...

    Java-NIO-Netty框架学习

    Java NIO (Non-blocking Input/Output) 是Java平台中用于高效处理I/O操作的一种机制,它与传统的IO模型( Blocking I/O)相比,提供了更高级别的抽象,允许应用程序以非阻塞的方式读写数据,提高了并发性能。Netty是...

    netty源码解析视频.txt

    Netty是一个基于NIO(Non-blocking I/O)的Java开源框架,用于简化网络编程。它提供了一套完整的解决方案,包括线程模型、传输层、编解码器等,使开发者能够更加专注于业务逻辑而不是底层I/O处理。 #### 2. Netty的...

    深入浅出Netty_netty_

    对于协议解析,Netty提供了零拷贝机制,能有效减少内存复制,提高效率。 除此之外,Netty还提供了强大的心跳检测机制,可以防止因网络延迟或故障导致的连接僵死。其优雅的关闭机制也能确保在系统关闭时,所有正在...

    Netty-讲义.zip

    总的来说,这个 Netty 讲义提供了从基础到进阶,再到源码解析的全面教程,对于想要学习或深入理解 Netty 的开发者来说,是一份非常宝贵的资源。通过系统地学习这些材料,读者可以掌握 Netty 的核心原理,并具备构建...

    Netty权威指南第二版源代码

    《Netty权威指南第二版》源代码是一份珍贵的学习资源,由知名作者李林锋编写,专注于Java网络编程框架Netty的深入解析。这个压缩包包含的不仅是代码,更是理解和掌握Netty技术的关键。Netty是基于Java NIO(非阻塞I/...

    Netty权威指南 第二版

    - **通道(Channel)**:在Netty中,通道是连接到某个I/O资源的抽象表示,可以进行读写操作。 - **处理器链(Handler Chain)**:每个通道上都有一个处理器链,消息在处理器之间按顺序传递,进行处理和转发。 2. ...

    netty4.0源码,netty例子,netty api文档

    1. **异步事件驱动**:Netty基于非阻塞I/O模型,利用Java NIO库,实现了高效的网络通信。通过事件循环(EventLoop)和事件处理器(ChannelHandler),Netty能够处理大量并发连接,显著提高系统的吞吐量。 2. **...

    Netty权威指南 第2版 带书签目录 完整版

    - Channel是Netty中网络连接的抽象,负责处理I/O操作,如读写数据、连接和断开连接。 - Pipeline是数据处理链,每个Channel在接收和发送数据时,数据会经过一系列预定义的处理器(ChannelHandler),实现了灵活的...

    Netty使用初步

    通道是网络连接的抽象,可以代表TCP连接、UDP套接字或者本地进程间通信。事件处理器则是处理与通道相关的事件,如读取、写入、连接和关闭等。Netty通过责任链模式来组织这些处理器,形成了所谓的“处理器管道...

    Netty权威指南-Netty源码

    Netty 的核心组件包括:ByteBuf(字节缓冲区)、Channel(通道)、EventLoop(事件循环)、Pipeline(处理链)以及Handler(处理器)。ByteBuf 提供了一种高效的方式来处理网络数据,避免了 Java 原生字节数组操作中...

    netty源码分析之服务端启动全解析

    Netty使用了NIO(New Input/Output)来支持非阻塞的网络通信,相比于传统的IO(阻塞IO),Netty可以大大提升应用程序的性能和可扩展性。在服务端启动的过程中,Netty通过实现Handler处理器机制和EventLoop事件循环...

    learning-netty

    根据给定的文件信息,以下是对Netty框架关键知识点的详细解析: ### Introduction Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器与客户端。它提供了一系列的API来简化网络...

    淘宝netty例子以及原理

    Netty中的关键组件包括Bootstrap(引导类)、Channel(通道)、EventLoop(事件循环)和Pipeline(处理链)。Bootstrap负责初始化和配置网络连接,Channel是网络I/O操作的抽象,EventLoop负责监听和处理事件,而...

    netty4-demos

    3. **Channel** 和 **Pipeline**:在 Netty 中,每个连接都对应一个 Channel,它是网络连接的抽象。Pipeline 是一系列 ChannelHandler 的链式结构,负责处理进来的数据和发送出去的数据。每个 ChannelHandler 负责...

    Netty权威指南 (第2版)

    《Netty权威指南(第2版)》是深入理解并掌握Netty框架的重要参考资料,它为读者提供了全面、深入的Netty技术解析。Netty是一个高性能、异步事件驱动的网络应用程序框架,广泛用于开发高效的网络应用,如服务器、...

    2024年Java常见的-BIO,NIO,AIO,Netty面试题

    ### 2024年Java常见BIO、NIO、AIO、Netty面试题解析 #### 一、基础知识概述 1. **IO概念**: - Java中的I/O(Input/Output)指的是输入输出操作,它以流为基础进行数据的输入输出。所有的数据在Java中都是以流的...

    netty服务端、客户端代码

    首先,Netty的核心设计理念是基于NIO(非阻塞I/O)和Reactor模式,这种模式可以有效地处理大量并发连接,减少了线程上下文切换带来的开销。Netty提供了丰富的频道处理类和管道模型,使得网络通信变得更加简单和高效...

    netty文件传输

    6. **NIO(非阻塞I/O)的角色**:Netty 使用 Java NIO 库,提供非阻塞I/O操作。NIO 的选择器(Selector)可以监控多个 Channel 的状态,当有数据可读或可写时,会唤醒并处理,从而提高了系统的并发能力。 7. **性能...

Global site tag (gtag.js) - Google Analytics