- 浏览: 987655 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
引言:
上一篇文章我们看了抽象通道触发OutboundInvoker相关事件方法,先来回顾一下:
通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
今天我们来看抽象nio通道AbstractNioChannel
从上面可以看出,抽象nio通道内部关联一个可选择通道(SelectableChannel)和一个选择key(selectionKey)
来看构造:
从上面可以看出抽象Nio通道构造,主要是初始化通道并配置为非阻塞模式。
下面几个方法,很简单,看一下就行
来看抽象通道几个实际操作方法do*,
先从注册方法开始
从上面来看注册工作主要是,注册可选择通道到通道所在事件循环的选择器中。
//再来看反注册
//读操作
从上面可以看出,开始读操作,实际工作为将读操作事件,添加选择key的兴趣事件集
再来看实际关闭任务操作:
来看其他方法:
从下面一个方法:
可以看出抽象nio通道的内部Unsafe为NioUnsafe
来看NioUnsafe的抽象实现AbstractNioUnsafe,为抽象nio通道的内部类,
从上面可以看出,抽象nioUnsafe为特殊的Unsafe,允许访问底层的选择通道。
选择通道方法返回的实际为抽象nio通道内部的底层可选择通道。
移除读兴趣事件removeReadOp,即从选择key兴趣事件集中,移除读操作事件。
来看抽象NioUnsafe的其他方法
连接方法,我们需要关注为以下片段:
//AbstractNioChannel
从上面可以看出,连接操作,将实际连接操作委托给doConnect,待子类实现,如果连接成功
,则通知异步任务连接成功,如果是第一次连接,则触发通道的激活事件fireChannelActive。
再来看完成连接方法
//AbstractNioChannel
从上面可以看出,完成连接操作,实际工作委托给抽象Nio通道的doFinishConnect方法,待子类实现,完成后更新任务结果,触发通道的激活事件fireChannelActive,如果出现异常,则更新连接任务为异常失败。
再来看刷新操作
回到抽象nio通道,再来看抽象nio通道的其他方法:
我们来看这句:
//ByteBufUtil
//PlatformDependent
//PlatformDependent0
下面两个字节部分都是ByteBufUtil的内部类
//ThreadLocalDirectByteBuf
//ThreadLocalUnsafeDirectByteBuf
总结:
抽象nio通道AbstractNioChannel内部关联一个可选择通道(SelectableChannel)和一个选择key(selectionKey)。抽象Nio通道构造,主要是初始化通道并配置为非阻塞模式。
注册doRegister工作主要是,注册可选择通道到通道所在事件循环的选择器中。反注册doDeregister,委托给事件循环,取消选择key,即从事件循环关联选择器的选择key集合中移除当前选择key。开始读操作doBeginRead,实际工作为将读操作事件,添加选择key的兴趣事件集
抽象nioUnsafe为特殊的Unsafe,允许访问底层的选择通道。选择通道方法返回的实际为抽象nio通道内部的底层可选择通道。移除读兴趣事件removeReadOp,即从选择key兴趣事件集中,移除读操作事件。连接操作,将实际连接操作委托给doConnect,待子类实现,如果连接成功,则通知异步任务连接成功,如果是第一次连接,则触发通道的激活事件fireChannelActive。完成连接操作,实际工作委托给抽象Nio通道的doFinishConnect方法,待子类实现,完成后更新任务结果,触发通道的激活事件fireChannelActive,如果出现异常,则更新连接任务为异常失败。
附:
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
引言:
上一篇文章我们看了抽象通道触发OutboundInvoker相关事件方法,先来回顾一下:
通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
今天我们来看抽象nio通道AbstractNioChannel
package io.netty.channel.nio; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * Abstract base class for {@link Channel} implementations which use a Selector based approach. */ public abstract class AbstractNioChannel extends AbstractChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioChannel.class); //关闭通道异常 private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), AbstractNioChannel.class, "doClose()"); private final SelectableChannel ch;//关联通道 protected final int readInterestOp;//读操作事件 volatile SelectionKey selectionKey;//关联选择key boolean readPending;//是否开始读操作 private final Runnable clearReadPendingRunnable = new Runnable() { @Override public void run() { clearReadPending0(); } }; /** * The future of the current connection attempt. If not null, subsequent * connection attempts will fail. */ private ChannelPromise connectPromise;//异步可写的连接任务 private ScheduledFuture<?> connectTimeoutFuture; private SocketAddress requestedRemoteAddress;//远端socket地址 }
从上面可以看出,抽象nio通道内部关联一个可选择通道(SelectableChannel)和一个选择key(selectionKey)
来看构造:
/** * Create a new instance * * @param parent the parent {@link Channel} by which this instance was created. May be {@code null} 所属通道 * @param ch the underlying {@link SelectableChannel} on which it operates 底层选择通道 * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel} 读操作事件 */ protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { //初始化通道,并配置为非阻塞模式 ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
从上面可以看出抽象Nio通道构造,主要是初始化通道并配置为非阻塞模式。
下面几个方法,很简单,看一下就行
//通道是否打开 @Override public boolean isOpen() { return ch.isOpen(); } //获取底层选择通道 protected SelectableChannel javaChannel() { return ch; } //获取通道所在的事件循环 @Override public NioEventLoop eventLoop() { return (NioEventLoop) super.eventLoop(); }
来看抽象通道几个实际操作方法do*,
先从注册方法开始
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //注册可选择通道到通道所在事件循环的选择器中 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. //如果注册异常,并且没有选择,则执行选择操作,将选择key从选择器的取消key集合中移除 eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
从上面来看注册工作主要是,注册可选择通道到通道所在事件循环的选择器中。
//再来看反注册
@Override protected void doDeregister() throws Exception { //委托给事件循环,取消选择key,即从事件循环关联选择器的选择key集合中移除当前选择key eventLoop().cancel(selectionKey()); }
//读操作
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { //选择key无效,直接返回 return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { //将读操作事件,添加选择key的兴趣事件集 selectionKey.interestOps(interestOps | readInterestOp); } }
从上面可以看出,开始读操作,实际工作为将读操作事件,添加选择key的兴趣事件集
再来看实际关闭任务操作:
@Override protected void doClose() throws Exception { ChannelPromise promise = connectPromise; if (promise != null) { // Use tryFailure() instead of setFailure() to avoid the race against cancel(). promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION); connectPromise = null; } //获取连接超时任务 ScheduledFuture<?> future = connectTimeoutFuture; if (future != null) { //取消任务 future.cancel(false); connectTimeoutFuture = null; } }
来看其他方法:
//获取通道Unsafe @Override public NioUnsafe unsafe() { return (NioUnsafe) super.unsafe(); } /** * Return the current {@link SelectionKey} 获取通道选择key */ protected SelectionKey selectionKey() { assert selectionKey != null; return selectionKey; } /** * @deprecated No longer supported. * No longer supported. 是否正在进行读操作 */ @Deprecated protected boolean isReadPending() { return readPending; } /** * @deprecated Use {@link #clearReadPending()} if appropriate instead. * No longer supported. 设置正在读标志 */ @Deprecated protected void setReadPending(final boolean readPending) { if (isRegistered()) { EventLoop eventLoop = eventLoop(); if (eventLoop.inEventLoop()) { //在当前事件循环,完成实际设置读标志 setReadPending0(readPending); } else { eventLoop.execute(new Runnable() { @Override public void run() { setReadPending0(readPending); } }); } } else { // Best effort if we are not registered yet clear readPending. // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is // not set yet so it would produce an assertion failure. this.readPending = readPending; } } //完成实际设置读标志 private void setReadPending0(boolean readPending) { this.readPending = readPending; if (!readPending) { //如果为不在读数据,委托给Unsafe,从选择key兴趣集中移除读操作事件 ((AbstractNioUnsafe) unsafe()).removeReadOp(); } } /** * Set read pending to {@code false}. 清除读操作标志,即设置标志为false */ protected final void clearReadPending() { if (isRegistered()) { EventLoop eventLoop = eventLoop(); if (eventLoop.inEventLoop()) { //在当前事件循环,完成实际清除 clearReadPending0(); } else { eventLoop.execute(clearReadPendingRunnable); } } else { // Best effort if we are not registered yet clear readPending. This happens during channel initialization. // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is // not set yet so it would produce an assertion failure. readPending = false; } } //完成实际清除 private void clearReadPending0() { readPending = false; //委托给Unsafe,从选择key兴趣集中移除读操作事件 ((AbstractNioUnsafe) unsafe()).removeReadOp(); }
从下面一个方法:
//获取通道Unsafe @Override public NioUnsafe unsafe() { return (NioUnsafe) super.unsafe(); }
可以看出抽象nio通道的内部Unsafe为NioUnsafe
/** * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel} 特殊的Unsafe,允许访问底层的选择通道 */ public interface NioUnsafe extends Unsafe { /** * Return underlying {@link SelectableChannel} 返回底层选择通道 */ SelectableChannel ch(); /** * Finish connect 完成连接 */ void finishConnect(); /** * Read from underlying {@link SelectableChannel} 从底层选择操作,读取数据 */ void read(); //强制刷新 void forceFlush(); }
来看NioUnsafe的抽象实现AbstractNioUnsafe,为抽象nio通道的内部类,
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { //移除读兴趣事件 protected final void removeReadOp() { //获取选择key SelectionKey key = selectionKey(); // Check first if the key is still valid as it may be canceled as part of the deregistration // from the EventLoop // See https://github.com/netty/netty/issues/2104 if (!key.isValid()) { //选择key无效,直接返回 return; } int interestOps = key.interestOps(); if ((interestOps & readInterestOp) != 0) { // only remove readInterestOp if needed //从选择key兴趣事件集中,移除读操作事件 key.interestOps(interestOps & ~readInterestOp); } } //获取选择通道,实际返回的抽象nio通道内部的底层可选择通道 @Override public final SelectableChannel ch() { return javaChannel(); } }
从上面可以看出,抽象nioUnsafe为特殊的Unsafe,允许访问底层的选择通道。
选择通道方法返回的实际为抽象nio通道内部的底层可选择通道。
移除读兴趣事件removeReadOp,即从选择key兴趣事件集中,移除读操作事件。
来看抽象NioUnsafe的其他方法
//连接远端Socket地址,如果需要绑定本地socket地址,连接完成通知异步可写任务promise @Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!promise.setUncancellable() || !ensureOpen(promise)) { //确保任务没有取消,通道打开 return; } try { if (connectPromise != null) { //已连接 // Already a connect in process. throw new ConnectionPendingException(); } boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) { //如果连接成功,则更新任务结果,如果需要,则触发通道的激活事件fireChannelActive fulfillConnectPromise(promise, wasActive); } else { //连接失败,则添加异步连接任务 connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout.连接超时配置 int connectTimeoutMillis = config().getConnectTimeoutMillis(); //更新连接异步任务结果为连接超时,并将任务交个事件循环区调度 if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } //添加任务监听器 promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; //连接任务取消,则关闭任务 close(voidPromise()); } } }); } } catch (Throwable t) { //异常,则更新任务失败,如果需要则关闭通道 promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } }
连接方法,我们需要关注为以下片段:
if (doConnect(remoteAddress, localAddress)) { //如果连接成功,则更新任务结果,如果需要,则触发通道的激活事件fireChannelActive fulfillConnectPromise(promise, wasActive); }
//AbstractNioChannel
/** * Connect to the remote peer,待子类实现 */ protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
//更新任务结果,触发通道的激活事件fireChannelActive private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (promise == null) { // Closed via cancellation and the promise has been notified already. 通道已经关闭,直接返回 return; } // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. // We still need to ensure we call fireChannelActive() in this case. boolean active = isActive(); // trySuccess() will return false if a user cancelled the connection attempt. //更新连接任务成功完成 boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && active) { //触发通道的激活事件fireChannelActive pipeline().fireChannelActive(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { //关闭任务 close(voidPromise()); } }
从上面可以看出,连接操作,将实际连接操作委托给doConnect,待子类实现,如果连接成功
,则通知异步任务连接成功,如果是第一次连接,则触发通道的激活事件fireChannelActive。
再来看完成连接方法
@Override public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); try { boolean wasActive = isActive(); //实际完成连接 doFinishConnect(); //更新任务结果,触发通道的激活事件fireChannelActive fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { //这个包装异常的方法annotateConnectException,在上一篇已看过,即将远端socket地址,添加的异常信息中 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }
//AbstractNioChannel
/** * Finish the connect,待子类实现 */ protected abstract void doFinishConnect() throws Exception;
//更新连接任务为异常失败 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { if (promise == null) { // Closed via cancellation and the promise has been notified already. return; } // Use tryFailure() instead of setFailure() to avoid the race against cancel(). promise.tryFailure(cause); closeIfClosed(); }
从上面可以看出,完成连接操作,实际工作委托给抽象Nio通道的doFinishConnect方法,待子类实现,完成后更新任务结果,触发通道的激活事件fireChannelActive,如果出现异常,则更新连接任务为异常失败。
再来看刷新操作
@Override protected final void flush0() { // Flush immediately only when there's no pending flush. // If there's a pending flush operation, event loop will call forceFlush() later, // and thus there's no need to call it now. if (isFlushPending()) { return; } //委托给父类 super.flush0(); }
@Override public final void forceFlush() { // directly call super.flush0() to force a flush now //委托给父类 super.flush0(); }
private boolean isFlushPending() { SelectionKey selectionKey = selectionKey(); //写操作事件,存在选择key兴趣事件集中 return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; }
回到抽象nio通道,再来看抽象nio通道的其他方法:
//判断事件循环为通道兼容,即判断事件循环是否为Nio事件循环 @Override protected boolean isCompatible(EventLoop loop) { return loop instanceof NioEventLoop; }
/** * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one. * Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high, * but just returns the original {@link ByteBuf}.. 包装原始buf为direct buf,成功则释放原始buf,如果保证成本较高,则返回原始buf */ protected final ByteBuf newDirectBuffer(ByteBuf buf) { //获取buf可读字节数 final int readableBytes = buf.readableBytes(); if (readableBytes == 0) { //释放buf ReferenceCountUtil.safeRelease(buf); return Unpooled.EMPTY_BUFFER; } //获取字节buf分配器 final ByteBufAllocator alloc = alloc(); if (alloc.isDirectBufferPooled()) { //如果分配器为Direct池Buffer类型,则分配字节Direct类型字节buf ByteBuf directBuf = alloc.directBuffer(readableBytes); //将原始buf中的数据,写到新的Direct buf中 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); //释放原始buf ReferenceCountUtil.safeRelease(buf); return directBuf; } //否则,获取线程本地的direct buf final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); if (directBuf != null) { //将原始buf中的数据,写到新的Direct buf中 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); ReferenceCountUtil.safeRelease(buf); return directBuf; } // Allocating and deallocating an unpooled direct buffer is very expensive; give up. //如果分配和回收一个非池类的Direct buf代价比较高,则直接返回原始buf。 return buf; } /** * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder. * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by * this method. Note that this method does not create an off-heap copy if the allocation / deallocation cost is * too high, but just returns the original {@link ByteBuf}.. 此方法与上面方法不同的是,释放的是buf的Holder,主要是保证原始buf能够释放 */ protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) { final int readableBytes = buf.readableBytes(); if (readableBytes == 0) { //释放的是buf的Holder ReferenceCountUtil.safeRelease(holder); return Unpooled.EMPTY_BUFFER; } final ByteBufAllocator alloc = alloc(); if (alloc.isDirectBufferPooled()) { ByteBuf directBuf = alloc.directBuffer(readableBytes); directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); //释放的是buf的Holder ReferenceCountUtil.safeRelease(holder); return directBuf; } final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); if (directBuf != null) { directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); //释放的是buf的Holder ReferenceCountUtil.safeRelease(holder); return directBuf; } // Allocating and deallocating an unpooled direct buffer is very expensive; give up. if (holder != buf) { // Ensure to call holder.release() to give the holder a chance to release other resources than its content. buf.retain();//buf引用计数器自增1 ReferenceCountUtil.safeRelease(holder); } return buf; }
我们来看这句:
//否则,获取线程本地的direct buf final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
//ByteBufUtil
** * Returns a cached thread-local direct buffer, if available. * * @return a cached thread-local direct buffer, if available. {@code null} otherwise. */ public static ByteBuf threadLocalDirectBuffer() { if (THREAD_LOCAL_BUFFER_SIZE <= 0) { //线程本地buffer size小于0,则直接返回 return null; } if (PlatformDependent.hasUnsafe()) { return ThreadLocalUnsafeDirectByteBuf.newInstance(); } else { return ThreadLocalDirectByteBuf.newInstance(); } }
//PlatformDependent
public final class PlatformDependent { private static final boolean HAS_UNSAFE = hasUnsafe0(); /** * Return {@code true} if {@code sun.misc.Unsafe} was found on the classpath and can be used for accelerated * direct memory access. */ public static boolean hasUnsafe() { return HAS_UNSAFE; } private static boolean hasUnsafe0() { if (isAndroid()) { logger.debug("sun.misc.Unsafe: unavailable (Android)"); return false; } if (PlatformDependent0.isExplicitNoUnsafe()) { return false; } try { boolean hasUnsafe = PlatformDependent0.hasUnsafe(); logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable"); return hasUnsafe; } catch (Throwable ignored) { // Probably failed to initialize PlatformDependent0. return false; } } ... }
//PlatformDependent0
/** * The {@link PlatformDependent} operations which requires access to {@code sun.misc.*}. */ final class PlatformDependent0 { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PlatformDependent0.class); private static final long ADDRESS_FIELD_OFFSET; private static final long BYTE_ARRAY_BASE_OFFSET; private static final Constructor<?> DIRECT_BUFFER_CONSTRUCTOR; private static final boolean IS_EXPLICIT_NO_UNSAFE = explicitNoUnsafe0(); private static final Method ALLOCATE_ARRAY_METHOD; private static final int JAVA_VERSION = javaVersion0(); private static final boolean IS_ANDROID = isAndroid0(); private static final Object INTERNAL_UNSAFE; static final Unsafe UNSAFE; static boolean hasUnsafe() { return UNSAFE != null; } ... }
下面两个字节部分都是ByteBufUtil的内部类
//ThreadLocalDirectByteBuf
static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf { private static final Recycler<ThreadLocalDirectByteBuf> RECYCLER = new Recycler<ThreadLocalDirectByteBuf>() { @Override protected ThreadLocalDirectByteBuf newObject(Handle<ThreadLocalDirectByteBuf> handle) { return new ThreadLocalDirectByteBuf(handle); } }; static ThreadLocalDirectByteBuf newInstance() { ThreadLocalDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); return buf; } ... }
//ThreadLocalUnsafeDirectByteBuf
static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf { private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER = new Recycler<ThreadLocalUnsafeDirectByteBuf>() { @Override protected ThreadLocalUnsafeDirectByteBuf newObject(Handle<ThreadLocalUnsafeDirectByteBuf> handle) { return new ThreadLocalUnsafeDirectByteBuf(handle); } }; static ThreadLocalUnsafeDirectByteBuf newInstance() { ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); return buf; } ... }
总结:
抽象nio通道AbstractNioChannel内部关联一个可选择通道(SelectableChannel)和一个选择key(selectionKey)。抽象Nio通道构造,主要是初始化通道并配置为非阻塞模式。
注册doRegister工作主要是,注册可选择通道到通道所在事件循环的选择器中。反注册doDeregister,委托给事件循环,取消选择key,即从事件循环关联选择器的选择key集合中移除当前选择key。开始读操作doBeginRead,实际工作为将读操作事件,添加选择key的兴趣事件集
抽象nioUnsafe为特殊的Unsafe,允许访问底层的选择通道。选择通道方法返回的实际为抽象nio通道内部的底层可选择通道。移除读兴趣事件removeReadOp,即从选择key兴趣事件集中,移除读操作事件。连接操作,将实际连接操作委托给doConnect,待子类实现,如果连接成功,则通知异步任务连接成功,如果是第一次连接,则触发通道的激活事件fireChannelActive。完成连接操作,实际工作委托给抽象Nio通道的doFinishConnect方法,待子类实现,完成后更新任务结果,触发通道的激活事件fireChannelActive,如果出现异常,则更新连接任务为异常失败。
附:
/** * A collection of utility methods that is related with handling {@link ByteBuf}, * such as the generation of hex dump and swapping an integer's byte order. */ public final class ByteBufUtil { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ByteBufUtil.class); private static final FastThreadLocal<CharBuffer> CHAR_BUFFERS = new FastThreadLocal<CharBuffer>() { @Override protected CharBuffer initialValue() throws Exception { return CharBuffer.allocate(1024); } }; //ThreadLocalDirectByteBuf static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf { private static final Recycler<ThreadLocalDirectByteBuf> RECYCLER = new Recycler<ThreadLocalDirectByteBuf>() { @Override protected ThreadLocalDirectByteBuf newObject(Handle<ThreadLocalDirectByteBuf> handle) { return new ThreadLocalDirectByteBuf(handle); } }; static ThreadLocalDirectByteBuf newInstance() { ThreadLocalDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); return buf; } private final Handle<ThreadLocalDirectByteBuf> handle; private ThreadLocalDirectByteBuf(Handle<ThreadLocalDirectByteBuf> handle) { super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); this.handle = handle; } @Override protected void deallocate() { if (capacity() > THREAD_LOCAL_BUFFER_SIZE) { super.deallocate(); } else { clear(); handle.recycle(this); } } } //ThreadLocalUnsafeDirectByteBuf static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf { private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER = new Recycler<ThreadLocalUnsafeDirectByteBuf>() { @Override protected ThreadLocalUnsafeDirectByteBuf newObject(Handle<ThreadLocalUnsafeDirectByteBuf> handle) { return new ThreadLocalUnsafeDirectByteBuf(handle); } }; static ThreadLocalUnsafeDirectByteBuf newInstance() { ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); return buf; } private final Handle<ThreadLocalUnsafeDirectByteBuf> handle; private ThreadLocalUnsafeDirectByteBuf(Handle<ThreadLocalUnsafeDirectByteBuf> handle) { super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); this.handle = handle; } @Override protected void deallocate() { if (capacity() > THREAD_LOCAL_BUFFER_SIZE) { super.deallocate(); } else { clear(); handle.recycle(this); } } } }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1329netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2072netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2462netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1324netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1326netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1604netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1852netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1407netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2848netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2193netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2046netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1090netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1885netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1225netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1212netty 通道接口定义:http:/ ... -
netty 抽象通道后续
2017-09-13 22:40 1318netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2197netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1092netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1867netty 管道线定义-ChannelPipeline:htt ... -
netty 通道接口定义
2017-09-10 15:36 1890netty Inboudn/Outbound通道Invoker ...
相关推荐
Netty基于NIO(非阻塞I/O)模型,它提供了高度抽象的API,简化了网络编程。在Netty中,I/O操作被封装在Channel接口中,而ChannelHandler则用于处理I/O事件和数据。事件循环(EventLoop)是Netty处理事件的关键组件,...
BIO适合简单的服务,NIO适合处理大量并发连接,而Netty则提供了更高级的抽象,适用于复杂、高性能的应用。通过阅读《Socket 之 BIO、NIO、Netty 简单实现》的博客,你可以了解如何在Java中实现这些通信模型,从而...
Java NIO (Non-blocking Input/Output) 是Java平台中用于高效处理I/O操作的一种机制,它与传统的IO模型( Blocking I/O)相比,提供了更高级别的抽象,允许应用程序以非阻塞的方式读写数据,提高了并发性能。Netty是...
Netty是一个基于NIO(Non-blocking I/O)的Java开源框架,用于简化网络编程。它提供了一套完整的解决方案,包括线程模型、传输层、编解码器等,使开发者能够更加专注于业务逻辑而不是底层I/O处理。 #### 2. Netty的...
对于协议解析,Netty提供了零拷贝机制,能有效减少内存复制,提高效率。 除此之外,Netty还提供了强大的心跳检测机制,可以防止因网络延迟或故障导致的连接僵死。其优雅的关闭机制也能确保在系统关闭时,所有正在...
总的来说,这个 Netty 讲义提供了从基础到进阶,再到源码解析的全面教程,对于想要学习或深入理解 Netty 的开发者来说,是一份非常宝贵的资源。通过系统地学习这些材料,读者可以掌握 Netty 的核心原理,并具备构建...
《Netty权威指南第二版》源代码是一份珍贵的学习资源,由知名作者李林锋编写,专注于Java网络编程框架Netty的深入解析。这个压缩包包含的不仅是代码,更是理解和掌握Netty技术的关键。Netty是基于Java NIO(非阻塞I/...
- **通道(Channel)**:在Netty中,通道是连接到某个I/O资源的抽象表示,可以进行读写操作。 - **处理器链(Handler Chain)**:每个通道上都有一个处理器链,消息在处理器之间按顺序传递,进行处理和转发。 2. ...
1. **异步事件驱动**:Netty基于非阻塞I/O模型,利用Java NIO库,实现了高效的网络通信。通过事件循环(EventLoop)和事件处理器(ChannelHandler),Netty能够处理大量并发连接,显著提高系统的吞吐量。 2. **...
- Channel是Netty中网络连接的抽象,负责处理I/O操作,如读写数据、连接和断开连接。 - Pipeline是数据处理链,每个Channel在接收和发送数据时,数据会经过一系列预定义的处理器(ChannelHandler),实现了灵活的...
通道是网络连接的抽象,可以代表TCP连接、UDP套接字或者本地进程间通信。事件处理器则是处理与通道相关的事件,如读取、写入、连接和关闭等。Netty通过责任链模式来组织这些处理器,形成了所谓的“处理器管道...
Netty 的核心组件包括:ByteBuf(字节缓冲区)、Channel(通道)、EventLoop(事件循环)、Pipeline(处理链)以及Handler(处理器)。ByteBuf 提供了一种高效的方式来处理网络数据,避免了 Java 原生字节数组操作中...
Netty使用了NIO(New Input/Output)来支持非阻塞的网络通信,相比于传统的IO(阻塞IO),Netty可以大大提升应用程序的性能和可扩展性。在服务端启动的过程中,Netty通过实现Handler处理器机制和EventLoop事件循环...
根据给定的文件信息,以下是对Netty框架关键知识点的详细解析: ### Introduction Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器与客户端。它提供了一系列的API来简化网络...
Netty中的关键组件包括Bootstrap(引导类)、Channel(通道)、EventLoop(事件循环)和Pipeline(处理链)。Bootstrap负责初始化和配置网络连接,Channel是网络I/O操作的抽象,EventLoop负责监听和处理事件,而...
3. **Channel** 和 **Pipeline**:在 Netty 中,每个连接都对应一个 Channel,它是网络连接的抽象。Pipeline 是一系列 ChannelHandler 的链式结构,负责处理进来的数据和发送出去的数据。每个 ChannelHandler 负责...
《Netty权威指南(第2版)》是深入理解并掌握Netty框架的重要参考资料,它为读者提供了全面、深入的Netty技术解析。Netty是一个高性能、异步事件驱动的网络应用程序框架,广泛用于开发高效的网络应用,如服务器、...
### 2024年Java常见BIO、NIO、AIO、Netty面试题解析 #### 一、基础知识概述 1. **IO概念**: - Java中的I/O(Input/Output)指的是输入输出操作,它以流为基础进行数据的输入输出。所有的数据在Java中都是以流的...
首先,Netty的核心设计理念是基于NIO(非阻塞I/O)和Reactor模式,这种模式可以有效地处理大量并发连接,减少了线程上下文切换带来的开销。Netty提供了丰富的频道处理类和管道模型,使得网络通信变得更加简单和高效...
6. **NIO(非阻塞I/O)的角色**:Netty 使用 Java NIO 库,提供非阻塞I/O操作。NIO 的选择器(Selector)可以监控多个 Channel 的状态,当有数据可读或可写时,会唤醒并处理,从而提高了系统的并发能力。 7. **性能...