- 浏览: 981441 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
引言:
前一篇文章我们看了通道Outbound缓存区ChannelOutboundBuffer ,先来回顾一下:
通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,一个未刷新的buf链表和一个刷新buf链表。通道写消息时,消息将会被包装成写请求Entry。
添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。
添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。
移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件
从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。
将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。
看完了抽象Unsafe定义和通道Outbound缓冲区,今天我们回到抽象通道,接着抽象通道初始化,继续看抽象通道的其他方法
先把抽象通道的变量声明贴出来及构造,以便理解:
上面抽象通道的变量和构造有单独的文章讲述,这里不再说:
来看其他方法
在上面我们看到好多方法时委托给通道的Channel管道,这是因为:
即通道实际上是一个ChannelOutboundInvoker,当通道相关操作发生时,触发通道相关操作事件。
从上面可以看出,通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
总结:
通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
引言:
前一篇文章我们看了通道Outbound缓存区ChannelOutboundBuffer ,先来回顾一下:
通道Outbound缓存区内部关联一个通道,同时有一个线程本地buf数组,一个未刷新的buf链表和一个刷新buf链表。通道写消息时,消息将会被包装成写请求Entry。
添加消息到通道Outbound缓冲区,首先包装消息为写请求Entry,将写请求Entry添加到未刷新写请求链表上,并更新通道当前待发送的字节数据,如果通道待发送的字节数大于通道写bufsize,则更新通道写状态,并触发ChannelWritabilityChanged事件。触发事件实际操作委托给通道的Channel管道。
添加刷新操作,即遍历未刷新写请求链表,将写请求添加到刷新链表中,如果写请求取消,则更新通道待发送字节数,如果待发送字节数消息,小于通道配置的写buf size,则更新通道可写状态。
移除操作,主要是从刷新写请求链移除链头写请求,并则释放写请求消息,更新写请求任务结果,当前通道待发送字节数和可写状态,并触发相应的事件
从刷新写请求链表,移除writtenBytes个字节数方法removeBytes,自旋,直至从刷新链中移除writtenBytes个字节数,如果链头消息的可读字节数小于writtenBytes,则移除写请求Entry,否则更新writtenBytes,继续从刷新链中的写请求消息中移除writtenBytes个字节数。
将刷新链上的写请求消息,添加到nio buffer数组中方法nioBuffers,主要是将刷新链上的写请求消息包装成direct buf添加到通道Outbound缓存区的nio buf数组中,这个方法主要在NioSocketChannel#doWrite方法重用。方法调用后,#nioBufferCount和#nioBufferSize,将返回当前nio buf数组的长度和可读字节数。
看完了抽象Unsafe定义和通道Outbound缓冲区,今天我们回到抽象通道,接着抽象通道初始化,继续看抽象通道的其他方法
先把抽象通道的变量声明贴出来及构造,以便理解:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class); private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), AbstractUnsafe.class, "flush0()");//flush0方法调用时,通道关闭异常 private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");//确保通道打开方法调用时,通道关闭异常 private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), AbstractUnsafe.class, "close(...)");//close方法调用时,通道关闭异常 private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( new ClosedChannelException(), AbstractUnsafe.class, "write(...)");//write方法调用时,通道关闭异常 private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace( new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");//flush0方法调用时,通道还未连接异常 private final Channel parent;//所属通道 private final ChannelId id;//通道id private final Unsafe unsafe;//硬件底层操作类 private final DefaultChannelPipeline pipeline;//Channel管道 private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);//空异步任务 private final CloseFuture closeFuture = new CloseFuture(this);//异步关闭任务 private volatile SocketAddress localAddress;//本地socket地址 private volatile SocketAddress remoteAddress;//远端socket地址 private volatile EventLoop eventLoop;//通道注册的事件循环 private volatile boolean registered;//是否注册 /** Cache for the string representation of this channel */ private boolean strValActive; private String strVal; ... /** * Creates a new instance. * * @param parent * the parent of this channel. {@code null} if there's no parent. */ protected AbstractChannel(Channel parent) { this.parent = parent; //创建通道id id = newId(); //创建底层操作类unsafe unsafe = newUnsafe(); //新建Channel管道 pipeline = newChannelPipeline(); } /** * Creates a new instance. * * @param parent * the parent of this channel. {@code null} if there's no parent. */ protected AbstractChannel(Channel parent, ChannelId id) { this.parent = parent; this.id = id; unsafe = newUnsafe(); pipeline = newChannelPipeline(); } }
上面抽象通道的变量和构造有单独的文章讲述,这里不再说:
来看其他方法
//判断通道是否可写 @Override public boolean isWritable() { //如果unsafe关联的通道Outbound 缓冲区不为空,且可写返回true ChannelOutboundBuffer buf = unsafe.outboundBuffer(); return buf != null && buf.isWritable(); } //直到通道可写前,通道Outbound 缓冲区的字节数。如果通道不可写,则返回0 @Override public long bytesBeforeUnwritable() { ChannelOutboundBuffer buf = unsafe.outboundBuffer(); // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable. // We should be consistent with that here. return buf != null ? buf.bytesBeforeUnwritable() : 0; } //获取直到通道可写,通道底层buf有多少字节数据需要发送。如果可写返回0 @Override public long bytesBeforeWritable() { ChannelOutboundBuffer buf = unsafe.outboundBuffer(); // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable. // We should be consistent with that here. return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE; } //获取所属通道 @Override public Channel parent() { return parent; } //获取通道内部Channel管道 @Override public ChannelPipeline pipeline() { return pipeline; } //获取通道字节buf分配器 @Override public ByteBufAllocator alloc() { return config().getAllocator(); } //获取通道所在的事件循环 @Override public EventLoop eventLoop() { EventLoop eventLoop = this.eventLoop; if (eventLoop == null) { throw new IllegalStateException("channel not registered to an event loop"); } return eventLoop; } //获取通道本地地址 @Override public SocketAddress localAddress() { SocketAddress localAddress = this.localAddress; if (localAddress == null) { try { this.localAddress = localAddress = unsafe().localAddress(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; } } return localAddress; } /** * @deprecated no use-case for this. */ @Deprecated protected void invalidateLocalAddress() { localAddress = null; } //获取远端socket地址 @Override public SocketAddress remoteAddress() { SocketAddress remoteAddress = this.remoteAddress; if (remoteAddress == null) { try { this.remoteAddress = remoteAddress = unsafe().remoteAddress(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; } } return remoteAddress; } /** * @deprecated no use-case for this. */ @Deprecated protected void invalidateRemoteAddress() { remoteAddress = null; } //判断通道是否注册到事件循环 @Override public boolean isRegistered() { return registered; } //绑定本地socket地址 @Override public ChannelFuture bind(SocketAddress localAddress) { return pipeline.bind(localAddress); } //连接远端socket地址 @Override public ChannelFuture connect(SocketAddress remoteAddress) { return pipeline.connect(remoteAddress); } //上面两个方法的合体 @Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return pipeline.connect(remoteAddress, localAddress); } //断开通道连接 @Override public ChannelFuture disconnect() { return pipeline.disconnect(); } //关闭通道 @Override public ChannelFuture close() { return pipeline.close(); } //从事件循环反注册 @Override public ChannelFuture deregister() { return pipeline.deregister(); } //刷新通道 @Override public Channel flush() { pipeline.flush(); return this; } 下面几个方法与上面不同是,带了一个异步任务结果,在操作完成时,通道任务结果 @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } @Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); } @Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, localAddress, promise); } @Override public ChannelFuture disconnect(ChannelPromise promise) { return pipeline.disconnect(promise); } @Override public ChannelFuture close(ChannelPromise promise) { return pipeline.close(promise); } @Override public ChannelFuture deregister(ChannelPromise promise) { return pipeline.deregister(promise); } //通道读操作 @Override public Channel read() { pipeline.read(); return this; } //写消息 @Override public ChannelFuture write(Object msg) { return pipeline.write(msg); } @Override public ChannelFuture write(Object msg, ChannelPromise promise) { return pipeline.write(msg, promise); } //写并刷新消息 @Override public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); } @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { return pipeline.writeAndFlush(msg, promise); } //创建异步可写任务 @Override public ChannelPromise newPromise() { return pipeline.newPromise(); } //创建异步可写进度任务 @Override public ChannelProgressivePromise newProgressivePromise() { return pipeline.newProgressivePromise(); } //创建已经成功的任务 @Override public ChannelFuture newSucceededFuture() { return pipeline.newSucceededFuture(); } //创建已经失败的任务 @Override public ChannelFuture newFailedFuture(Throwable cause) { return pipeline.newFailedFuture(cause); } //获取关闭异步任务 @Override public ChannelFuture closeFuture() { return closeFuture; } //获取通道unsafe @Override public Unsafe unsafe() { return unsafe; } //创建空异步任务 @Override public final ChannelPromise voidPromise() { return pipeline.voidPromise(); }
在上面我们看到好多方法时委托给通道的Channel管道,这是因为:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
即通道实际上是一个ChannelOutboundInvoker,当通道相关操作发生时,触发通道相关操作事件。
从上面可以看出,通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
总结:
通道的绑定操作、连接,写消息,读操作,刷新操作,反注册、断开连接,关闭通道等操作事件实际调用通道的Channel管道的相关方法,即触发通道相关事件,这些方法是重写了通道OutboundInvoker的相关方法。在抽象Unsafe那篇文章中,我们看到其内部也有绑定、注册,读操作,写操作和关闭操作,这些是通道的实际操作方法。
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1321netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2057netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2446netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1317netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1310netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1596netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1844netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1398netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2834netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2179netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2037netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1079netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1877netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1219netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1202netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 958netty 通道接口定义:http://donald-drap ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2189netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1077netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1857netty 管道线定义-ChannelPipeline:htt ... -
netty 通道接口定义
2017-09-10 15:36 1878netty Inboudn/Outbound通道Invoker ...
相关推荐
在Netty中,通道(Channel)是连接的抽象,它代表了与远程实体的一个连接,可以进行读写操作。而通道处理器链(ChannelPipeline)则允许用户自定义数据在网络中传输的处理流程,这提供了高度的灵活性和可扩展性。 ...
Pipeline(管道)则是一系列处理通道事件的处理器链,每个处理器可以对数据进行解码、编码或者执行业务逻辑。 3. **ByteBuf**:Netty提供了自己的ByteBuf类,作为缓冲区,它比Java的ByteBuffer更易用且高效,支持...
2. **Netty架构**:Netty采用了反应器模式,包含Bootstrap(引导类)、ServerBootstrap(服务器引导类)、Channel(通道)、EventLoop(事件循环)、Pipeline(处理链)等组件,构建了高效的事件驱动模型。...
在Netty中,EventLoop(事件循环)负责处理I/O事件,而Channel(通道)则代表网络连接,它们是进行数据传输的基础。ChannelHandler(通道处理器)是业务逻辑的载体,通过ChannelPipeline(通道管道)组织成处理链,...
Netty 是由 JBoss 组织开源的一个网络通信框架,基于 Java NIO(非阻塞I/O)构建,提供了一套高度抽象和优化的 API,使得开发者可以轻松地处理网络连接的各种复杂情况。Netty 提供了多种传输类型,如 TCP、UDP 以及...
Channel是网络连接的抽象,它可以是TCP、UDP或者任何其他类型的I/O连接。Pipeline则是一个处理链,每个连接都有一个自己的Pipeline,其中包含多个处理器(ChannelHandler),这些处理器按顺序处理进出的数据。 ...
《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...
2. **Channel**:在Netty中,Channel是连接的抽象,它代表了到另一个实体(例如,另一个网络节点)的连接。每个Channel都有自己的生命周期,可以进行读写操作,并且可以注册监听器以响应各种网络事件。 3. **...
Netty基础,用于学习Netty,参考黑马程序员的netty教程
- Netty提供了许多性能优化选项,如自定义ByteBuf分配器、心跳机制、通道空闲检测等。 10. **错误处理与异常安全**: - Netty提供了一套完善的异常处理机制,确保即使在出错时也能优雅地关闭资源。 综上所述,...
通道是Netty中的基本I/O抽象,它可以代表任何类型的连接,如TCP、UDP或者本地进程间通信。事件处理器则负责处理各种I/O事件,如连接建立、数据接收和发送等,通过非阻塞I/O(NIO)实现高效率的数据传输。 在"Netty-...
Netty的核心组件包括Bootstrap(启动器)、ServerBootstrap(服务器启动器)、Channel(通道)、EventLoop(事件循环)和ChannelHandler(处理程序)。通过这些组件,开发者可以方便地创建出可伸缩、低延迟的网络...
- **Channel与Pipeline**:Channel是网络连接的抽象,负责读写数据;Pipeline则是一系列处理I/O事件的处理器链,每个处理器执行特定的处理任务。 - **EventLoop与EventLoopGroup**:EventLoop是执行I/O操作和处理...
1. **异步事件驱动**:Netty基于Java NIO(非阻塞I/O)实现,利用了其事件循环(EventLoop)和通道(Channel)的概念,可以处理大量并发连接,提高了系统的吞吐量。 2. **高效性**:Netty通过减少对象创建和内存...
Netty基于NIO(非阻塞I/O)模型,它提供了高度抽象的API,简化了网络编程。在Netty中,I/O操作被封装在Channel接口中,而ChannelHandler则用于处理I/O事件和数据。事件循环(EventLoop)是Netty处理事件的关键组件,...
《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...
Netty 的核心组件包括:ByteBuf(字节缓冲区)、Channel(通道)、EventLoop(事件循环)、Pipeline(处理链)以及Handler(处理器)。ByteBuf 提供了一种高效的方式来处理网络数据,避免了 Java 原生字节数组操作中...
它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。 Netty is a NIO ...
ChannelHandlerAdapter 4.X版本和5.X版本的差别很大。ChannelRead是属于5.X版本的4.X版本没有这个方法,所以如果要用ChannelRead。可以更换5.X版本的Netty。