- 浏览: 978838 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 抽象BootStrap定义:http://donald-draper.iteye.com/blog/2392492
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
netty Bootstrap解析:http://donald-draper.iteye.com/blog/2392593
在前面看服务端和客户端引导配置的时候,我们配置完Nio事件循环组之后配置的是Nio通道,
在服务端和客户端的相关代码如下:
服务端:
客户端:
从代码片段中可以,看出服务端的通道为NioServerSocketChannel,而客户端的为
NioSocketChannel我们先来看一下两种通道的声明定义:
先来看NioServerSocketChannel的声明定义:
//NioServerSocketChannel
再来看另外一个分支ServerSocketChannel的声明定义:
再来看NioSocketChannel的声明定义:
//NioSocketChannel
从上面可以看出Nio服务端socket通道和socket通道都是一种通道,不同的时服务端socket通道是,基于消息的通道,而socket通道是基于字节的通道,不同的是Server Socket通道用于接受socket连接请求,而socket通道用于客户端,连接Server通道,或者Server通道接收客户端的请求,产生与客户端交互的Socket通道。
为了理解NioSocketChannel和NioServerSocketChannel的作用,我们先从通道接口定义来看起:
//Channel
从上面可以看出通道,关联一个事件循环,及通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器;一个字节buf分配器ByteBufAllocator,用于分配字节buf,还有一些获取通道状态的方式,是否注册到事件循环,通道是否打开,是否可写;另外还要获取通道配置ChannelConfig,通道元数据ChannelMetadata的方法;最重要的是,关联一个Unsafe,用于通道的地址绑定,连接操作以及断开,通道的读写,注册到事件循环以及反注册。
再来看ServerSocketChanel接口定义:
//ServerSocketChannel
//ServerChannel
从上面来看ServerSocketChannel没有什么特殊的,只是标注是一个可以接受客户端连接的通道。
再来看SocketChanel接口定义:
//SocketChannel
//DuplexChannel
从上面看出Socket通道实际上是一个可写可读的流通道。
总结:
通道Channel,关联一个事件循环,及通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器;一个字节buf分配器ByteBufAllocator,用于分配字节buf,还有一些获取通道状态的方式,是否注册到事件循环,通道是否打开,是否可写;另外还要获取通道配置ChannelConfig,通道元数据ChannelMetadata的方法;最重要的是,关联一个Unsafe,用于通道的地址绑定,连接操作以及断开,通道的读写,注册到事件循环以及反注册。
附:
//ChannelId
//ChannelConfig
//ChannelMetadata
//ChannelOutboundBuffer
netty 抽象BootStrap定义:http://donald-draper.iteye.com/blog/2392492
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
netty Bootstrap解析:http://donald-draper.iteye.com/blog/2392593
在前面看服务端和客户端引导配置的时候,我们配置完Nio事件循环组之后配置的是Nio通道,
在服务端和客户端的相关代码如下:
服务端:
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //ServerBootstrap,用于配置服务端,一般为ServerSocket通道 ServerBootstrap serverBoot = new ServerBootstrap(); serverBoot.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)
客户端:
EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Bootstrap,用于配置客户端,一般为Socket通道 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class)
从代码片段中可以,看出服务端的通道为NioServerSocketChannel,而客户端的为
NioSocketChannel我们先来看一下两种通道的声明定义:
先来看NioServerSocketChannel的声明定义:
//NioServerSocketChannel
/** * A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses * NIO selector based implementation to accept new connections. 基于nio选择器的服务端socket通道的实现,用于接受新的连接 */ public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
/** * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages. */ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
/** * Abstract base class for {@link Channel} implementations which use a Selector based approach. */ public abstract class AbstractNioChannel extends AbstractChannel {
/** * A skeletal {@link Channel} implementation. */ public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
再来看另外一个分支ServerSocketChannel的声明定义:
/** * A TCP/IP {@link ServerChannel} which accepts incoming TCP/IP connections. */ public interface ServerSocketChannel extends ServerChannel { /** * A {@link Channel} that accepts an incoming connection attempt and creates * its child {@link Channel}s by accepting them. {@link ServerSocketChannel} is * a good example. */ public interface ServerChannel extends Channel {
再来看NioSocketChannel的声明定义:
//NioSocketChannel
/** * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. 基于nio选择器的socket通道 */ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
/** * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes. 抽象nio字节通道 */ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
再来看另外一个分支SocketChannel: /** * A TCP/IP socket {@link Channel}. */ public interface SocketChannel extends DuplexChannel {
/** * A duplex {@link Channel} that has two sides that can be shutdown independently. */ public interface DuplexChannel extends Channel {
从上面可以看出Nio服务端socket通道和socket通道都是一种通道,不同的时服务端socket通道是,基于消息的通道,而socket通道是基于字节的通道,不同的是Server Socket通道用于接受socket连接请求,而socket通道用于客户端,连接Server通道,或者Server通道接收客户端的请求,产生与客户端交互的Socket通道。
为了理解NioSocketChannel和NioServerSocketChannel的作用,我们先从通道接口定义来看起:
//Channel
package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.util.AttributeMap; import java.net.InetSocketAddress; import java.net.SocketAddress; /** * A nexus to a network socket or a component which is capable of I/O * operations such as read, write, connect, and bind. * * A channel provides a user: * [list] * [*]the current state of the channel (e.g. is it open? is it connected?), * [*]the {@linkplain ChannelConfig configuration parameters} of the channel (e.g. receive buffer size), * [*]the I/O operations that the channel supports (e.g. read, write, connect, and bind), and * <li>the {@link ChannelPipeline} which handles all I/O events and requests * associated with the channel.</li> * [/list] * * <h3>All I/O operations are asynchronous.</h3> * <p> * All I/O operations in Netty are asynchronous. It means any I/O calls will * return immediately with no guarantee that the requested I/O operation has * been completed at the end of the call. Instead, you will be returned with * a {@link ChannelFuture} instance which will notify you when the requested I/O * operation has succeeded, failed, or canceled. * * <h3>Channels are hierarchical</h3> * <p> * A {@link Channel} can have a {@linkplain #parent() parent} depending on * how it was created. For instance, a {@link SocketChannel}, that was accepted * by {@link ServerSocketChannel}, will return the {@link ServerSocketChannel} * as its parent on {@link #parent()}. * <p> * The semantics of the hierarchical structure depends on the transport * implementation where the {@link Channel} belongs to. For example, you could * write a new {@link Channel} implementation that creates the sub-channels that * share one socket connection, as [url=http://beepcore.org/]BEEP[/url] and * [url=http://en.wikipedia.org/wiki/Secure_Shell]SSH[/url] do. * * <h3>Downcast to access transport-specific operations</h3> * <p> * Some transports exposes additional operations that is specific to the * transport. Down-cast the {@link Channel} to sub-type to invoke such * operations. For example, with the old I/O datagram transport, multicast * join / leave operations are provided by {@link DatagramChannel}. * * <h3>Release resources</h3> * <p> * It is important to call {@link #close()} or {@link #close(ChannelPromise)} to release all * resources once you are done with the {@link Channel}. This ensures all resources are * released in a proper way, i.e. filehandles. */ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { /** * Returns the globally unique identifier of this {@link Channel}. 返回通道全局唯一id */ ChannelId id(); /** * Return the {@link EventLoop} this {@link Channel} was registered to. 返回通道注册的事件循环 */ EventLoop eventLoop(); /** * Returns the parent of this channel. * 返回所属通道 * @return the parent channel. * {@code null} if this channel does not have a parent channel. */ Channel parent(); /** * Returns the configuration of this channel. 返回通道配置 */ ChannelConfig config(); /** * Returns {@code true} if the {@link Channel} is open and may get active later 判断通道是否打开 */ boolean isOpen(); /** * Returns {@code true} if the {@link Channel} is registered with an {@link EventLoop}. 判断通道是否注册到事件循环 */ boolean isRegistered(); /** * Return {@code true} if the {@link Channel} is active and so connected. 判断当前同时是否激活,及连接 */ boolean isActive(); /** * Return the {@link ChannelMetadata} of the {@link Channel} which describe the nature of the {@link Channel}. 返回通道元数据 */ ChannelMetadata metadata(); /** * Returns the local address where this channel is bound to. The returned * {@link SocketAddress} is supposed to be down-cast into more concrete * type such as {@link InetSocketAddress} to retrieve the detailed * information. * 返回通道绑定的socket地址 * @return the local address of this channel. * {@code null} if this channel is not bound. */ SocketAddress localAddress(); /** * Returns the remote address where this channel is connected to. The * returned {@link SocketAddress} is supposed to be down-cast into more * concrete type such as {@link InetSocketAddress} to retrieve the detailed * information. *返回远端socket地址。如果没有连接,返回null。对于报文通道,返回null * @return the remote address of this channel. * {@code null} if this channel is not connected. * If this channel is not connected but it can receive messages * from arbitrary remote addresses (e.g. {@link DatagramChannel}, * use {@link DatagramPacket#recipient()} to determine * the origination of the received message as this method will * return {@code null}. */ SocketAddress remoteAddress(); /** * Returns the {@link ChannelFuture} which will be notified when this * channel is closed. This method always returns the same future instance. 返回通道任务结果,当通道关闭,将会通知通道任务结果。此方总是返回同一任务实例 */ ChannelFuture closeFuture(); /** * Returns {@code true} if and only if the I/O thread will perform the * requested write operation immediately. Any write requests made when * this method returns {@code false} are queued until the I/O thread is * ready to process the queued write requests. 当且仅当IO线程立刻自行写请求操作。当方法返回false时,写请求被放在队列中,直到 有IO线程准备处理写请求队列 */ boolean isWritable(); /** * Get how many bytes can be written until {@link #isWritable()} returns {@code false}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0. 在判断是否可写方法返回false前,估算有多少字节需要发送,反之的值,总是非负数,如果#isWritable方法, 返回false,则此方法返回0。此方法的作用就是判断在IO线程处理写请求队列时,判断有多少字节要发送。 */ long bytesBeforeUnwritable(); /** * Get how many bytes must be drained from underlying buffers until {@link #isWritable()} returns {@code true}. * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0. 此方法与上述方法相反,用于估算在#isWritable方法返回true前,写请求队列中待发送的字节数 */ long bytesBeforeWritable(); /** * Returns an [i]internal-use-only[/i] object that provides unsafe operations. 返回通道内部线程线程安全操作辅助Unsafe */ Unsafe unsafe(); /** * Return the assigned {@link ChannelPipeline}. 返回内部的Channel管道 */ ChannelPipeline pipeline(); /** * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s. 返回通道的字节buf分配器 */ ByteBufAllocator alloc(); //下面两个方法时重写了ChannelOutboundInvoker的方法 @Override Channel read(); @Override Channel flush(); /** * [i]Unsafe[/i] operations that should [i]never[/i] be called from user-code. These methods * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the * following methods: Unsafe不可以,用户代码直接调用,这些方法仅仅能被实际的transport实现,必须被IO线程调用,除了如下的方法 * [list] * [*]{@link #localAddress()} * [*]{@link #remoteAddress()} * [*]{@link #closeForcibly()} * [*]{@link #register(EventLoop, ChannelPromise)} * [*]{@link #deregister(ChannelPromise)} * [*]{@link #voidPromise()} * [/list] */ interface Unsafe { /** * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when * receiving data. 返回Unsafe内部的接收字节部分分配器Handle,用于分配接收数据时,存放数据的字节部分。 */ RecvByteBufAllocator.Handle recvBufAllocHandle(); /** * Return the {@link SocketAddress} to which is bound local or * {@code null} if none. 返回绑定socket地址 */ SocketAddress localAddress(); /** * Return the {@link SocketAddress} to which is bound remote or * {@code null} if none is bound yet. 返回远端socket地址,没有绑定,则为null */ SocketAddress remoteAddress(); /** * Register the {@link Channel} of the {@link ChannelPromise} and notify * the {@link ChannelFuture} once the registration was complete. 注册通道到事件循环,当注册完成时,通知可写任务结果ChannelPromise */ void register(EventLoop eventLoop, ChannelPromise promise); /** * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify * it once its done. 绑定socket地址,绑定完毕,通知可写任务结果ChannelPromise */ void bind(SocketAddress localAddress, ChannelPromise promise); /** * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}. * If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just * pass {@code null} to it. * * The {@link ChannelPromise} will get notified once the connect operation was complete. 连接远端socket地址,如果需要本地socket地址,则传入,没有则传null */ void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); /** * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the * operation was complete. 断开通道连接,待完成,通知可写任务结果ChannelPromise */ void disconnect(ChannelPromise promise); /** * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the * operation was complete. 关闭通道,待完成,通知可写任务结果ChannelPromise */ void close(ChannelPromise promise); /** * Closes the {@link Channel} immediately without firing any events. Probably only useful * when registration attempt failed. 在不触发任务事件的情况下,关闭通道 */ void closeForcibly(); /** * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the * {@link ChannelPromise} once the operation was complete. 从事件循环,反注册通道,待完成,通知可写任务结果ChannelPromise */ void deregister(ChannelPromise promise); /** * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the * {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing. 调度一个读操作,填充字节数据到管道中第一个Inbound处理器的inbound buf中。如果有一个正在读取的操作,则 此方法什么都不做 */ void beginRead(); /** * Schedules a write operation. 调度一下写操作 */ void write(Object msg, ChannelPromise promise); /** * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}. 通过#write,刷新所有写请求操作 */ void flush(); /** * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}. * It will never be notified of a success or error and so is only a placeholder for operations * that take a {@link ChannelPromise} as argument but for which you not want to get notified. 返回一个可以重用和转递Unsafe操作的可选通道任务结果。当操作失败或成功时,不会被通知。 */ ChannelPromise voidPromise(); /** * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored. 返回一个Outbound buf,用于存放写请求。 */ ChannelOutboundBuffer outboundBuffer(); } }
从上面可以看出通道,关联一个事件循环,及通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器;一个字节buf分配器ByteBufAllocator,用于分配字节buf,还有一些获取通道状态的方式,是否注册到事件循环,通道是否打开,是否可写;另外还要获取通道配置ChannelConfig,通道元数据ChannelMetadata的方法;最重要的是,关联一个Unsafe,用于通道的地址绑定,连接操作以及断开,通道的读写,注册到事件循环以及反注册。
再来看ServerSocketChanel接口定义:
//ServerSocketChannel
package io.netty.channel.socket; import io.netty.channel.ServerChannel; import java.net.InetSocketAddress; /** * A TCP/IP {@link ServerChannel} which accepts incoming TCP/IP connections. */ public interface ServerSocketChannel extends ServerChannel { @Override ServerSocketChannelConfig config(); @Override InetSocketAddress localAddress(); @Override InetSocketAddress remoteAddress(); }
//ServerChannel
package io.netty.channel; import io.netty.channel.socket.ServerSocketChannel; /** * A {@link Channel} that accepts an incoming connection attempt and creates * its child {@link Channel}s by accepting them. {@link ServerSocketChannel} is * a good example. */ public interface ServerChannel extends Channel { // This is a tag interface. }
从上面来看ServerSocketChannel没有什么特殊的,只是标注是一个可以接受客户端连接的通道。
再来看SocketChanel接口定义:
//SocketChannel
package io.netty.channel.socket; import io.netty.channel.Channel; import java.net.InetSocketAddress; /** * A TCP/IP socket {@link Channel}. */ public interface SocketChannel extends DuplexChannel { @Override ServerSocketChannel parent(); @Override SocketChannelConfig config(); @Override InetSocketAddress localAddress(); @Override InetSocketAddress remoteAddress(); }
//DuplexChannel
package io.netty.channel.socket; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; import java.net.Socket; /** * A duplex {@link Channel} that has two sides that can be shutdown independently. */ public interface DuplexChannel extends Channel { /** * Returns {@code true} if and only if the remote peer shut down its output so that no more * data is received from this channel. Note that the semantic of this method is different from * that of {@link Socket#shutdownInput()} and {@link Socket#isInputShutdown()}. 当且仅当,远端peer关闭输出流,通道不在有数据要接受时,返回true。需要注意的是,此方法不同于 Socket#shutdownInput和Socket#isInputShutdown方法。 */ boolean isInputShutdown(); /** * @see Socket#shutdownInput() 关闭通道socket输入流 */ ChannelFuture shutdownInput(); /** * Will shutdown the input and notify {@link ChannelPromise}. *与上面不同的是,多了一个异步任务结果 * @see Socket#shutdownInput() */ ChannelFuture shutdownInput(ChannelPromise promise); /** * @see Socket#isOutputShutdown() 判断是否关闭socket输出流 */ boolean isOutputShutdown(); /** * @see Socket#shutdownOutput() 关闭socket输出流 */ ChannelFuture shutdownOutput(); /** * Will shutdown the output and notify {@link ChannelPromise}. *与上面不同的是,多了一个异步任务结果 * @see Socket#shutdownOutput() */ ChannelFuture shutdownOutput(ChannelPromise promise); /** * Determine if both the input and output of this channel have been shutdown. 判断输出流和输入流是否都关闭 */ boolean isShutdown(); /** * Will shutdown the input and output sides of this channel. * @return will be completed when both shutdown operations complete. 关闭通道的输入输出流,完成时,并通知异步任务结果ChannelFuture */ ChannelFuture shutdown(); /** * Will shutdown the input and output sides of this channel. * @param promise will be completed when both shutdown operations complete. * @return will be completed when both shutdown operations complete. 关闭通道的输入输出流,完成时,并通知异步可写任务结果ChannelPromise */ ChannelFuture shutdown(ChannelPromise promise); }
从上面看出Socket通道实际上是一个可写可读的流通道。
总结:
通道Channel,关联一个事件循环,及通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器;一个字节buf分配器ByteBufAllocator,用于分配字节buf,还有一些获取通道状态的方式,是否注册到事件循环,通道是否打开,是否可写;另外还要获取通道配置ChannelConfig,通道元数据ChannelMetadata的方法;最重要的是,关联一个Unsafe,用于通道的地址绑定,连接操作以及断开,通道的读写,注册到事件循环以及反注册。
附:
//ChannelId
package io.netty.channel; import java.io.Serializable; /** * Represents the globally unique identifier of a {@link Channel}. * <p> * The identifier is generated from various sources listed in the following: * [list] * [*]MAC address (EUI-48 or EUI-64) or the network adapter, preferably a globally unique one, * [*]the current process ID, * [*]{@link System#currentTimeMillis()}, * [*]{@link System#nanoTime()}, * [*]a random 32-bit integer, and * [*]a sequentially incremented 32-bit integer. * [/list] * * * The global uniqueness of the generated identifier mostly depends on the MAC address and the current process ID, * which are auto-detected at the class-loading time in best-effort manner. If all attempts to acquire them fail, * a warning message is logged, and random values will be used instead. Alternatively, you can specify them manually * via system properties: * [list] * <li>{@code io.netty.machineId} - hexadecimal representation of 48 (or 64) bit integer, * optionally separated by colon or hyphen.</li> * [*]{@code io.netty.processId} - an integer between 0 and 65535 * [/list] * */ public interface ChannelId extends Serializable, Comparable<ChannelId> { /** * Returns the short but globally non-unique string representation of the {@link ChannelId}. */ String asShortText(); /** * Returns the long yet globally unique string representation of the {@link ChannelId}. */ String asLongText(); }
//ChannelConfig
package io.netty.channel; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.socket.SocketChannelConfig; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.util.Map; /** * A set of configuration properties of a {@link Channel}. * * Please down-cast to more specific configuration type such as * {@link SocketChannelConfig} or use {@link #setOptions(Map)} to set the * transport-specific properties: * <pre> * {@link Channel} ch = ...; * {@link SocketChannelConfig} cfg = [b]({@link SocketChannelConfig}) ch.getConfig();[/b] * cfg.setTcpNoDelay(false); * </pre> * * <h3>Option map</h3> * * An option map property is a dynamic write-only property which allows * the configuration of a {@link Channel} without down-casting its associated * {@link ChannelConfig}. To update an option map, please call {@link #setOptions(Map)}. * <p> * All {@link ChannelConfig} has the following options: * * <table border="1" cellspacing="0" cellpadding="6"> * <tr> * <th>Name</th><th>Associated setter method</th> * </tr><tr> * <td>{@link ChannelOption#CONNECT_TIMEOUT_MILLIS}</td><td>{@link #setConnectTimeoutMillis(int)}</td> * </tr><tr> * <td>{@link ChannelOption#WRITE_SPIN_COUNT}</td><td>{@link #setWriteSpinCount(int)}</td> * </tr><tr> * <td>{@link ChannelOption#WRITE_BUFFER_WATER_MARK}</td><td>{@link #setWriteBufferWaterMark(WriteBufferWaterMark)}</td> * </tr><tr> * <td>{@link ChannelOption#ALLOCATOR}</td><td>{@link #setAllocator(ByteBufAllocator)}</td> * </tr><tr> * <td>{@link ChannelOption#AUTO_READ}</td><td>{@link #setAutoRead(boolean)}</td> * </tr> * </table> * <p> * More options are available in the sub-types of {@link ChannelConfig}. For * example, you can configure the parameters which are specific to a TCP/IP * socket as explained in {@link SocketChannelConfig}. */ public interface ChannelConfig { /** * Return all set {@link ChannelOption}'s. */ Map<ChannelOption<?>, Object> getOptions(); /** * Sets the configuration properties from the specified {@link Map}. */ boolean setOptions(Map<ChannelOption<?>, ?> options); /** * Return the value of the given {@link ChannelOption} */ <T> T getOption(ChannelOption<T> option); /** * Sets a configuration property with the specified name and value. * To override this method properly, you must call the super class: * <pre> * public boolean setOption(ChannelOption<T> option, T value) { * if (super.setOption(option, value)) { * return true; * } * * if (option.equals(additionalOption)) { * .... * return true; * } * * return false; * } * </pre> * * @return {@code true} if and only if the property has been set */ <T> boolean setOption(ChannelOption<T> option, T value); /** * Returns the connect timeout of the channel in milliseconds. If the * {@link Channel} does not support connect operation, this property is not * used at all, and therefore will be ignored. * * @return the connect timeout in milliseconds. {@code 0} if disabled. */ int getConnectTimeoutMillis(); /** * Sets the connect timeout of the channel in milliseconds. If the * {@link Channel} does not support connect operation, this property is not * used at all, and therefore will be ignored. * * @param connectTimeoutMillis the connect timeout in milliseconds. * {@code 0} to disable. */ ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis); /** * @deprecated Use {@link MaxMessagesRecvByteBufAllocator} * <p> * Returns the maximum number of messages to read per read loop. * a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event. * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages. */ @Deprecated int getMaxMessagesPerRead(); /** * @deprecated Use {@link MaxMessagesRecvByteBufAllocator} * <p> * Sets the maximum number of messages to read per read loop. * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages. */ @Deprecated ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead); /** * Returns the maximum loop count for a write operation until * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. * It is similar to what a spin lock is used for in concurrency programming. * It improves memory utilization and write throughput depending on * the platform that JVM runs on. The default value is {@code 16}. */ int getWriteSpinCount(); /** * Sets the maximum loop count for a write operation until * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. * It is similar to what a spin lock is used for in concurrency programming. * It improves memory utilization and write throughput depending on * the platform that JVM runs on. The default value is {@code 16}. * * @throws IllegalArgumentException * if the specified value is {@code 0} or less than {@code 0} */ ChannelConfig setWriteSpinCount(int writeSpinCount); /** * Returns {@link ByteBufAllocator} which is used for the channel * to allocate buffers. */ ByteBufAllocator getAllocator(); /** * Set the {@link ByteBufAllocator} which is used for the channel * to allocate buffers. */ ChannelConfig setAllocator(ByteBufAllocator allocator); /** * Returns {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers. */ <T extends RecvByteBufAllocator> T getRecvByteBufAllocator(); /** * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers. */ ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator); /** * Returns {@code true} if and only if {@link ChannelHandlerContext#read()} will be invoked automatically so that * a user application doesn't need to call it at all. The default value is {@code true}. */ boolean isAutoRead(); /** * Sets if {@link ChannelHandlerContext#read()} will be invoked automatically so that a user application doesn't * need to call it at all. The default value is {@code true}. */ ChannelConfig setAutoRead(boolean autoRead); /** * @deprecated Auto close will be removed in a future release. * * Returns {@code true} if and only if the {@link Channel} will be closed automatically and immediately on * write failure. The default is {@code false}. */ @Deprecated boolean isAutoClose(); /** * @deprecated Auto close will be removed in a future release. * * Sets whether the {@link Channel} should be closed automatically and immediately on write failure. * The default is {@code false}. */ @Deprecated ChannelConfig setAutoClose(boolean autoClose); /** * Returns the high water mark of the write buffer. If the number of bytes * queued in the write buffer exceeds this value, {@link Channel#isWritable()} * will start to return {@code false}. */ int getWriteBufferHighWaterMark(); /** * <p> * Sets the high water mark of the write buffer. If the number of bytes * queued in the write buffer exceeds this value, {@link Channel#isWritable()} * will start to return {@code false}. */ ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark); /** * Returns the low water mark of the write buffer. Once the number of bytes * queued in the write buffer exceeded the * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then * dropped down below this value, {@link Channel#isWritable()} will start to return * {@code true} again. */ int getWriteBufferLowWaterMark(); /** * <p> * Sets the low water mark of the write buffer. Once the number of bytes * queued in the write buffer exceeded the * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then * dropped down below this value, {@link Channel#isWritable()} will start to return * {@code true} again. */ ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark); /** * Returns {@link MessageSizeEstimator} which is used for the channel * to detect the size of a message. */ MessageSizeEstimator getMessageSizeEstimator(); /** * Set the {@link MessageSizeEstimator} which is used for the channel * to detect the size of a message. */ ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator); /** * Returns the {@link WriteBufferWaterMark} which is used for setting the high and low * water mark of the write buffer. */ WriteBufferWaterMark getWriteBufferWaterMark(); /** * Set the {@link WriteBufferWaterMark} which is used for setting the high and low * water mark of the write buffer. */ ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark); }
//ChannelMetadata
package io.netty.channel; import java.net.SocketAddress; /** * Represents the properties of a {@link Channel} implementation. */ public final class ChannelMetadata { private final boolean hasDisconnect; private final int defaultMaxMessagesPerRead; /** * Create a new instance * * @param hasDisconnect {@code true} if and only if the channel has the {@code disconnect()} operation * that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)} * again, such as UDP/IP. */ public ChannelMetadata(boolean hasDisconnect) { this(hasDisconnect, 1); } /** * Create a new instance * * @param hasDisconnect {@code true} if and only if the channel has the {@code disconnect()} operation * that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)} * again, such as UDP/IP. * @param defaultMaxMessagesPerRead If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this value will be * set for {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. Must be {@code > 0}. */ public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) { if (defaultMaxMessagesPerRead <= 0) { throw new IllegalArgumentException("defaultMaxMessagesPerRead: " + defaultMaxMessagesPerRead + " (expected > 0)"); } this.hasDisconnect = hasDisconnect; this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead; } /** * Returns {@code true} if and only if the channel has the {@code disconnect()} operation * that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)} again, * such as UDP/IP. */ public boolean hasDisconnect() { return hasDisconnect; } /** * If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this is the default value for * {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. */ public int defaultMaxMessagesPerRead() { return defaultMaxMessagesPerRead; } }
//ChannelOutboundBuffer
package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufHolder; import io.netty.buffer.Unpooled; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PromiseNotificationUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.Arrays; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; /** * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * outbound write requests. * <p> * All methods must be called by a transport implementation from an I/O thread, except the following ones: * [list] * [*]{@link #size()} and {@link #isEmpty()} * [*]{@link #isWritable()} * [*]{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)} * [/list] * */ public final class ChannelOutboundBuffer { // Assuming a 64-bit JVM: // - 16 bytes object header // - 8 reference fields // - 2 long fields // - 2 int fields // - 1 boolean field // - padding static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD = SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96); private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class); private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() { @Override protected ByteBuffer[] initialValue() throws Exception { return new ByteBuffer[1024]; } }; private final Channel channel;//关联通道 // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) // // The Entry that is the first in the linked-list structure that was flushed private Entry flushedEntry; // The Entry which is the first unflushed in the linked-list structure private Entry unflushedEntry; // The Entry which represents the tail of the buffer private Entry tailEntry; // The number of flushed entries that are not written yet private int flushed; private int nioBufferCount; private long nioBufferSize; private boolean inFail; private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize"); @SuppressWarnings("UnusedDeclaration") private volatile long totalPendingSize; private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); @SuppressWarnings("UnusedDeclaration") private volatile int unwritable; private volatile Runnable fireChannelWritabilityChangedTask; ChannelOutboundBuffer(AbstractChannel channel) { this.channel = channel; } ... }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1312netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2050netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2436netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1310netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1304netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1591netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1841netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1393netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2823netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2174netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2033netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1076netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1875netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1215netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1200netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 954netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1306netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2187netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1071netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1850netty 管道线定义-ChannelPipeline:htt ...
相关推荐
Netty的ChannelHandlerContext则用来管理与通道相关的上下文信息,如发送和接收数据。 在Netty中,我们定义一个ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter来处理串口的输入和输出事件。例如,...
- Netty 使用非阻塞 I/O(NIO)模型,通过事件循环(EventLoop)和通道(Channel)处理网络连接,提高了处理大量并发连接的能力。 - 事件驱动架构:Netty 通过 ChannelHandler 处理各种网络事件,如连接建立、数据...
客户端发送16进制给服务端,并行实现socket通道活动状态和断开重新连接的功能, 监听接口是否存在数据,如果存在socket客户端发送给socket服务端的实现 随着物联网的发展,随之出现了各种传感器监测数据的实时发送,...
4. **Spring与Netty的集成**:通过Spring的`WebSocketMessageBrokerConfigurer`接口,将Spring的消息传递系统与Netty的WebSocket服务器连接起来。这通常涉及设置WebSocket消息的订阅者和发布者。 5. **客户端支持**...
首先,我们需要定义一个UDPChannelHandler,这是Netty中的处理器接口,负责处理接收到的声呐数据和发送出去的命令。我们可以重写其channelRead方法来处理接收到的数据,例如解码JSON格式的声呐信息。JSON...
3. **ServerChannel**:服务器端的通道接口,如NioServerSocketChannel,用于监听和接受新的连接。 4. **ChannelInitializer**:用于添加服务器端的处理器链,比如解码器、编码器、业务逻辑处理器等。 5. **...
1. **异步事件驱动**:Netty 使用了非阻塞I/O模型,基于Java NIO(非阻塞输入/输出)库,通过事件循环(EventLoop)和通道(Channel)机制,实现了高并发性能,降低了系统的资源消耗。 2. **高效的数据处理**:...
在系统B中,我们可以利用Netty的ChannelHandler接口来定义业务逻辑,处理来自系统A的短连接请求,同时保持与系统C的长连接。ChannelHandlerContext对象用于在ChannelHandler之间传递消息,实现事件驱动的通信。 ...
- **定义**: Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器与客户端。 - **目标用户**: 适合Java网络编程新手及有经验的开发者。 - **核心优势**: 提供了简单易用的API,将...
定义了通道的基本操作。 #### 类AbstractChannel 这是一个抽象类,实现了`Channel`接口,提供了部分通用的行为。 #### ChannelFactory 用于创建新的`Channel`实例。 #### ChannelUnsafe 提供了一些不安全的操作,...
4. **Pipeline**:Netty的事件处理模型基于事件通道(Event Pipeline),它是一个链式处理结构,每个节点都是一个ChannelHandler,数据在管道中流动时会被逐个处理,实现了职责分离和解耦。 5. **NIO基础**:Netty...
开发者可以通过实现ChannelInboundHandler和ChannelOutboundHandler接口,定义自己的业务逻辑。 5. **Pipeline**: Pipeline是一个处理链,每个ChannelHandler都在链上占据一个位置。数据在网络中传输时,会经过...
通过上述简单的服务端和客户端示例,我们可以看出Netty框架在服务器和客户端的代码结构上保持了一致性,都利用了ChannelHandler来处理事件,并且Netty通过一系列的接口和抽象类来提供灵活的编程模型。 Netty还提供...
`META-INF`目录通常存储元数据,如服务定义和服务提供者接口(SPI)信息;`WEB-INF`目录则包含Web应用的配置文件(如`web.xml`)、库(JAR文件)和其他不直接暴露给用户的资源。 在学习这个示例时,你需要关注以下...
利用Netty,我们可以轻松实现TCP服务器,创建ServerBootstrap,配置通道处理器pipeline,监听客户端连接,并处理接收到的数据。 Netty中的ChannelHandlerContext类是事件处理的核心,它提供了发送和接收数据的方法...
MyBatis允许开发者在XML或注解中定义SQL语句,通过Mapper接口与Java对象进行交互。这种设计模式提高了数据库操作的灵活性和效率。 Netty是一个高性能、异步事件驱动的网络应用框架,主要用于快速开发可维护的高性能...
- 在处理链中,`ChannelHandlerContext`是上下文对象,用于触发I/O事件、获取引用到下一个处理器、关闭通道等。 6. **多路复用与NIO选择器**: - Netty利用Java NIO的多路复用器(Selector)来监听和调度多个连接...
3. **ChannelHandler**:Netty中的`ChannelHandler`接口是处理I/O事件的核心组件。ServerChannel会将接收到的客户端连接转换为新的`Channel`,每个新建立的连接都有自己的事件处理管道。你可以通过`ChannelPipeline`...
此外,Netty的ChannelHandler接口允许我们定义业务逻辑,而EventLoopGroup则负责线程管理和事件调度。 对比之下,Socket是基于TCP/IP协议族的低级通信接口,它提供了进程间通信的基础。在Java中,Socket类代表一个...