`
Donald_Draper
  • 浏览: 987644 次
社区版块
存档分类
最新评论

netty 通道配置接口定义

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
netty NioServerSocketChannel解析:http://donald-draper.iteye.com/blog/2393443
引言
上一篇文章,我们看了nio服务端socket通道,先来回顾一下:
nio服务端socket通道NioServerSocketChannel内部有两个变量,一个为选择器提供者SelectorProvider,一个为通道配置ServerSocketChannelConfig。

通道实际绑定socket地址,首先判断jdk版本信息,如果jdk版本大于1.7 则使用通道bind方法,绑定socket地址,否则为通道关联Socket的bind方法。

doReadMessages方法,实际为当接受客户端的连接请求时,创建一个与客户端交互的socket通道,并添加到读操作结果集中,实际为socket通道集。并将socket通道集交给ServerBootStrap的引导配置监听器ServerBootstrapAcceptor处理,Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。 客户端连接服务端时,首先向服务端发送连接请求数据,服务端接受到连接请求时,创建一个与客户端交互的socket通道。

由于服务端通道用于接受客户端的请求,所有不支持连接,写消息,消息过滤等等操作。

上一篇文章中我们遗留了一点,即nio服务端通道配置,今天我们就来看一通道配置:
//Nio服务端通道配置, 为NioServerSocketChannel的内部类,这个我们单独列一篇文章来说
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
    private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
        super(channel, javaSocket);
    }

    @Override
    protected void autoReadCleared() {
        clearReadPending();
    }
}

//DefaultServerSocketChannelConfig
/**
 * The default {@link ServerSocketChannelConfig} implementation.
 */
public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
                                              implements ServerSocketChannelConfig {


//ServerSocketChannelConfig
public interface ServerSocketChannelConfig extends ChannelConfig {


//DefaultChannelConfig
/**
 * The default {@link ChannelConfig} implementation.
 */
public class DefaultChannelConfig implements ChannelConfig {

我们先来看通道配置接口ChannelConfig的定义:
package io.netty.channel;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig;

import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Map;

/**
 * A set of configuration properties of a {@link Channel}.
 通道配置属性集
 * 
 * Please down-cast to more specific configuration type such as
 * {@link SocketChannelConfig} or use {@link #setOptions(Map)} to set the
 * transport-specific properties:
 可以强制转化为特殊的配置,不如socket通道配置,设置transport属性如下:
 * <pre>
 * {@link Channel} ch = ...;
 * {@link SocketChannelConfig} cfg = [b]({@link SocketChannelConfig}) ch.getConfig();[/b]
 * cfg.setTcpNoDelay(false);
 * </pre>
 *
 * <h3>Option map</h3>
 *
 * An option map property is a dynamic write-only property which allows
 * the configuration of a {@link Channel} without down-casting its associated
 * {@link ChannelConfig}.  To update an option map, please call {@link #setOptions(Map)}.
 选项Map是一个动态只写的集合,允许在没有转化为通道配置的情况下,配置选项:
 * <p>
 * All {@link ChannelConfig} has the following options:
 *所有通道配置选项
 * <table border="1" cellspacing="0" cellpadding="6">
 * <tr>
 * <th>Name</th><th>Associated setter method</th>
 * </tr><tr>
 * <td>{@link ChannelOption#CONNECT_TIMEOUT_MILLIS}</td><td>{@link #setConnectTimeoutMillis(int)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#WRITE_SPIN_COUNT}</td><td>{@link #setWriteSpinCount(int)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#WRITE_BUFFER_WATER_MARK}</td><td>{@link #setWriteBufferWaterMark(WriteBufferWaterMark)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#ALLOCATOR}</td><td>{@link #setAllocator(ByteBufAllocator)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#AUTO_READ}</td><td>{@link #setAutoRead(boolean)}</td>
 * </tr>
 * </table>
 * <p>
 * More options are available in the sub-types of {@link ChannelConfig}.  For
 * example, you can configure the parameters which are specific to a TCP/IP
 * socket as explained in {@link SocketChannelConfig}.
 */
public interface ChannelConfig {

    /**
     * Return all set {@link ChannelOption}'s.
     获取所有通道选项
     */
    Map<ChannelOption<?>, Object> getOptions();

    /**
     * Sets the configuration properties from the specified {@link Map}.
     配置通道选项
     */
    boolean setOptions(Map<ChannelOption<?>, ?> options);

    /**
     * Return the value of the given {@link ChannelOption}
     获取给定通道选项的值
     */
    <T> T getOption(ChannelOption<T> option);

    /**
     * Sets a configuration property with the specified name and value.
     * To override this method properly, you must call the super class:
     * <pre>
     * public boolean setOption(ChannelOption<T> option, T value) {
     *     if (super.setOption(option, value)) {
     *         return true;
     *     }
     *
     *     if (option.equals(additionalOption)) {
     *         ....
     *         return true;
     *     }
     *
     *     return false;
     * }
     * </pre>
     *配置给定通道选项的值
     * @return {@code true} if and only if the property has been set
     */
    <T> boolean setOption(ChannelOption<T> option, T value);

    /**
     * Returns the connect timeout of the channel in milliseconds.  If the
     * {@link Channel} does not support connect operation, this property is not
     * used at all, and therefore will be ignored.
     *获取通道连接超时时间
     * @return the connect timeout in milliseconds.  {@code 0} if disabled.
     */
    int getConnectTimeoutMillis();

    /**
     * Sets the connect timeout of the channel in milliseconds.  If the
     * {@link Channel} does not support connect operation, this property is not
     * used at all, and therefore will be ignored.
     *设置通道连接超时时间
     * @param connectTimeoutMillis the connect timeout in milliseconds.
     *                             {@code 0} to disable.
     */
    ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);

    /**
     * @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
     * <p>
     * Returns the maximum number of messages to read per read loop.
     * a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     这个方法已经被丢弃,返回每次通道处理器读操作事件,允许读取的最大消息数量,
     如果值大于1,则每个事件循环尝试多次读操作,以产生多个消息。
     */
    @Deprecated
    int getMaxMessagesPerRead();

    /**
     * @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
     设置每次读取允许读取的消息数,当前方法已丢弃,请使用MaxMessagesRecvByteBufAllocator
     * <p>
     * Sets the maximum number of messages to read per read loop.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     */
    @Deprecated
    ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

    /**
     * Returns the maximum loop count for a write operation until
     * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
     * It is similar to what a spin lock is used for in concurrency programming.
     * It improves memory utilization and write throughput depending on
     * the platform that JVM runs on.  The default value is {@code 16}.
     返回直到可写字节通道写操作返回非零值时,允许循环写的次数。相当于并发编程中的
     自旋锁。能否改善内存使用率和写吞吐量依赖句具体的JVM平台。默认值为16
     */
    int getWriteSpinCount();

    /**
     * Sets the maximum loop count for a write operation until
     * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
     * It is similar to what a spin lock is used for in concurrency programming.
     * It improves memory utilization and write throughput depending on
     * the platform that JVM runs on.  The default value is {@code 16}.
     *设置写操作最大自旋数
     * @throws IllegalArgumentException
     *         if the specified value is {@code 0} or less than {@code 0}
     */
    ChannelConfig setWriteSpinCount(int writeSpinCount);
    //获取设置字节分配器
    /**
     * Returns {@link ByteBufAllocator} which is used for the channel
     * to allocate buffers.
     */
    ByteBufAllocator getAllocator();

    /**
     * Set the {@link ByteBufAllocator} which is used for the channel
     * to allocate buffers.
     */
    ChannelConfig setAllocator(ByteBufAllocator allocator);
    //设置获取接受字节分配器
    /**
     * Returns {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
     */
    <T extends RecvByteBufAllocator> T getRecvByteBufAllocator();

    /**
     * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
     */
    ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);
    //获取设置,通道上下文是否自动调用#read操作,默认为true
    /**
     * Returns {@code true} if and only if {@link ChannelHandlerContext#read()} will be invoked automatically so that
     * a user application doesn't need to call it at all. The default value is {@code true}.
     */
    boolean isAutoRead();

    /**
     * Sets if {@link ChannelHandlerContext#read()} will be invoked automatically so that a user application doesn't
     * need to call it at all. The default value is {@code true}.
     */
    ChannelConfig setAutoRead(boolean autoRead);
    //是否自动关闭,已丢弃
    /**
     * @deprecated  Auto close will be removed in a future release.
     *
     * Returns {@code true} if and only if the {@link Channel} will be closed automatically and immediately on
     * write failure.  The default is {@code false}.
     */
    @Deprecated
    boolean isAutoClose();

    /**
     * @deprecated  Auto close will be removed in a future release.
     *
     * Sets whether the {@link Channel} should be closed automatically and immediately on write failure.
     * The default is {@code false}.
     */
    @Deprecated
    ChannelConfig setAutoClose(boolean autoClose);
    //返回写buffer的高位掩码,如果写buf中缓存的字节数大于高位掩码值,则Channel#isWritabl方法返回false
    /**
     * Returns the high water mark of the write buffer.  If the number of bytes
     * queued in the write buffer exceeds this value, {@link Channel#isWritable()}
     * will start to return {@code false}.
     */
    int getWriteBufferHighWaterMark();

    /**
     * <p>
     * Sets the high water mark of the write buffer.  If the number of bytes
     * queued in the write buffer exceeds this value, {@link Channel#isWritable()}
     * will start to return {@code false}.
     */
    ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
    //返回写buffer的低位掩码,如果写buf中缓存的字节数大于高位掩码值,则Channel#isWritabl方法返回true
    /**
     * Returns the low water mark of the write buffer.  Once the number of bytes
     * queued in the write buffer exceeded the
     * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
     * dropped down below this value, {@link Channel#isWritable()} will start to return
     * {@code true} again.
     */
    int getWriteBufferLowWaterMark();

    /**
     * <p>
     * Sets the low water mark of the write buffer.  Once the number of bytes
     * queued in the write buffer exceeded the
     * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
     * dropped down below this value, {@link Channel#isWritable()} will start to return
     * {@code true} again.
     */
    ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
    //消息size估算器
    /**
     * Returns {@link MessageSizeEstimator} which is used for the channel
     * to detect the size of a message.
     */
    MessageSizeEstimator getMessageSizeEstimator();

    /**
     * Set the {@link MessageSizeEstimator} which is used for the channel
     * to detect the size of a message.
     */
    ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
    //获取写buf高位掩码
    /**
     * Returns the {@link WriteBufferWaterMark} which is used for setting the high and low
     * water mark of the write buffer.
     */
    WriteBufferWaterMark getWriteBufferWaterMark();

    /**
     * Set the {@link WriteBufferWaterMark} which is used for setting the high and low
     * water mark of the write buffer.
     */
    ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
}

从上面来看,通道配置接口,主要配置通道的字节buf分配器,接受buf分配器,消息size估算器,和通道选项。

socket通道配置和server socket通道配置,很简单,我们不再讲解,简单看一下:


//ServerSocketChannelConfig

package io.netty.channel.socket;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;

import java.net.ServerSocket;
import java.net.StandardSocketOptions;

/**
 * A {@link ChannelConfig} for a {@link ServerSocketChannel}.
 *
 * <h3>Available options</h3>
 *
 * In addition to the options provided by {@link ChannelConfig},
 * {@link ServerSocketChannelConfig} allows the following options in the
 * option map:
 *
 * <table border="1" cellspacing="0" cellpadding="6">
 * <tr>
 * <th>Name</th><th>Associated setter method</th>
 * </tr><tr>
 * <td>{@code "backlog"}</td><td>{@link #setBacklog(int)}</td>
 * </tr><tr>
 * <td>{@code "reuseAddress"}</td><td>{@link #setReuseAddress(boolean)}</td>
 * </tr><tr>
 * <td>{@code "receiveBufferSize"}</td><td>{@link #setReceiveBufferSize(int)}</td>
 * </tr>
 * </table>
 */
public interface ServerSocketChannelConfig extends ChannelConfig {

    /**
     * Gets the backlog value to specify when the channel binds to a local
     * address.
     */
    int getBacklog();

    /**
     * Sets the backlog value to specify when the channel binds to a local
     * address.
     */
    ServerSocketChannelConfig setBacklog(int backlog);

    /**
     * Gets the {@link StandardSocketOptions#SO_REUSEADDR} option.
     */
    boolean isReuseAddress();

    /**
     * Sets the {@link StandardSocketOptions#SO_REUSEADDR} option.
     */
    ServerSocketChannelConfig setReuseAddress(boolean reuseAddress);

    /**
     * Gets the {@link StandardSocketOptions#SO_RCVBUF} option.
     */
    int getReceiveBufferSize();

    /**
     * Gets the {@link StandardSocketOptions#SO_SNDBUF} option.
     */
    ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize);

    /**
     * Sets the performance preferences as specified in
     * {@link ServerSocket#setPerformancePreferences(int, int, int)}.
     */
    ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth);

    @Override
    ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);

    @Override
    @Deprecated
    ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

    @Override
    ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount);

    @Override
    ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator);

    @Override
    ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);

    @Override
    ServerSocketChannelConfig setAutoRead(boolean autoRead);

    @Override
    ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);

    @Override
    ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);

    @Override
    ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);

    @Override
    ServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);

}



//SocketChannelConfig


package io.netty.channel.socket;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;

import java.net.Socket;
import java.net.StandardSocketOptions;

/**
 * A {@link ChannelConfig} for a {@link SocketChannel}.
 *
 * <h3>Available options</h3>
 *
 * In addition to the options provided by {@link ChannelConfig},
 * {@link SocketChannelConfig} allows the following options in the option map:
 *
 * <table border="1" cellspacing="0" cellpadding="6">
 * <tr>
 * <th>Name</th><th>Associated setter method</th>
 * </tr><tr>
 * <td>{@link ChannelOption#SO_KEEPALIVE}</td><td>{@link #setKeepAlive(boolean)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#SO_REUSEADDR}</td><td>{@link #setReuseAddress(boolean)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#SO_LINGER}</td><td>{@link #setSoLinger(int)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#TCP_NODELAY}</td><td>{@link #setTcpNoDelay(boolean)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#SO_RCVBUF}</td><td>{@link #setReceiveBufferSize(int)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#SO_SNDBUF}</td><td>{@link #setSendBufferSize(int)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#IP_TOS}</td><td>{@link #setTrafficClass(int)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#ALLOW_HALF_CLOSURE}</td><td>{@link #setAllowHalfClosure(boolean)}</td>
 * </tr>
 * </table>
 */
public interface SocketChannelConfig extends ChannelConfig {

    /**
     * Gets the {@link StandardSocketOptions#TCP_NODELAY} option.  Please note that the default value of this option
     * is {@code true} unlike the operating system default ({@code false}). However, for some buggy platforms, such as
     * Android, that shows erratic behavior with Nagle's algorithm disabled, the default value remains to be
     * {@code false}.
     */
    boolean isTcpNoDelay();

    /**
     * Sets the {@link StandardSocketOptions#TCP_NODELAY} option.  Please note that the default value of this option
     * is {@code true} unlike the operating system default ({@code false}). However, for some buggy platforms, such as
     * Android, that shows erratic behavior with Nagle's algorithm disabled, the default value remains to be
     * {@code false}.
     */
    SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay);

    /**
     * Gets the {@link StandardSocketOptions#SO_LINGER} option.
     */
    int getSoLinger();

    /**
     * Sets the {@link StandardSocketOptions#SO_LINGER} option.
     */
    SocketChannelConfig setSoLinger(int soLinger);

    /**
     * Gets the {@link StandardSocketOptions#SO_SNDBUF} option.
     */
    int getSendBufferSize();

    /**
     * Sets the {@link StandardSocketOptions#SO_SNDBUF} option.
     */
    SocketChannelConfig setSendBufferSize(int sendBufferSize);

    /**
     * Gets the {@link StandardSocketOptions#SO_RCVBUF} option.
     */
    int getReceiveBufferSize();

    /**
     * Sets the {@link StandardSocketOptions#SO_RCVBUF} option.
     */
    SocketChannelConfig setReceiveBufferSize(int receiveBufferSize);

    /**
     * Gets the {@link StandardSocketOptions#SO_KEEPALIVE} option.
     */
    boolean isKeepAlive();

    /**
     * Sets the {@link StandardSocketOptions#SO_KEEPALIVE} option.
     */
    SocketChannelConfig setKeepAlive(boolean keepAlive);

    /**
     * Gets the {@link StandardSocketOptions#IP_TOS} option.
     */
    int getTrafficClass();

    /**
     * Sets the {@link StandardSocketOptions#IP_TOS} option.
     */
    SocketChannelConfig setTrafficClass(int trafficClass);

    /**
     * Gets the {@link StandardSocketOptions#SO_REUSEADDR} option.
     */
    boolean isReuseAddress();

    /**
     * Sets the {@link StandardSocketOptions#SO_REUSEADDR} option.
     */
    SocketChannelConfig setReuseAddress(boolean reuseAddress);

    /**
     * Sets the performance preferences as specified in
     * {@link Socket#setPerformancePreferences(int, int, int)}.
     */
    SocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth);

    /**
     * Returns {@code true} if and only if the channel should not close itself when its remote
     * peer shuts down output to make the connection half-closed.  If {@code false}, the connection
     * is closed automatically when the remote peer shuts down output.
     */
    boolean isAllowHalfClosure();

    /**
     * Sets whether the channel should not close itself when its remote peer shuts down output to
     * make the connection half-closed.  If {@code true} the connection is not closed when the
     * remote peer shuts down output. Instead,
     * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}
     * is invoked with a {@link ChannelInputShutdownEvent} object. If {@code false}, the connection
     * is closed automatically.
     设置当远端peer关闭输出流,使连接半关闭时,通道是否应该关闭。如果是true,表示当远端peer关闭输出流,
     不应该关闭通道
     */
    SocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure);

    @Override
    SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);

    @Override
    @Deprecated
    SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

    @Override
    SocketChannelConfig setWriteSpinCount(int writeSpinCount);

    @Override
    SocketChannelConfig setAllocator(ByteBufAllocator allocator);

    @Override
    SocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);

    @Override
    SocketChannelConfig setAutoRead(boolean autoRead);

    @Override
    SocketChannelConfig setAutoClose(boolean autoClose);

    @Override
    SocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);

    @Override
    SocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);

}



总结:

通道配置接口,主要配置通道的字节buf分配器,接受buf分配器,消息size估算器,和通道选项。通通配置有两类分别为Socket通道和ServerSocket通道配置,大部分配置与Socket和SeverSocket的基本相同




附:
//ByteBufAllocator
package io.netty.buffer;

/**
 * Implementations are responsible to allocate buffers. Implementations of this interface are expected to be
 * thread-safe.
 字节buf分配器,主要负责分配buf,接口的实现必须是线程安全的
 */
public interface ByteBufAllocator {

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

    /**
     * Allocate a {@link ByteBuf}. If it is a direct or heap buffer
     * depends on the actual implementation.
     分配一个字节buf,具体buf是direct还是heap依赖于具体的实现
     */
    ByteBuf buffer();

    /**
     * Allocate a {@link ByteBuf} with the given initial capacity.
     * If it is a direct or heap buffer depends on the actual implementation.
    分配一个给定初始化容量的字节buf,具体buf是direct还是heap依赖于具体的实现
     */
    ByteBuf buffer(int initialCapacity);

    /**
     * Allocate a {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity. If it is a direct or heap buffer depends on the actual
     * implementation.
     不上面不同的是,限制了最大容量
     */
    ByteBuf buffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     分配字节buf,优先为direct类型的buf
     */
    ByteBuf ioBuffer();

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     与上面不同的是,多一个初始化容量参数
     */
    ByteBuf ioBuffer(int initialCapacity);

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     与上面不同的是,限制了最大容量
     */
    ByteBuf ioBuffer(int initialCapacity, int maxCapacity);
    //分配一个heap buf
    /**
     * Allocate a heap {@link ByteBuf}.
     */
    ByteBuf heapBuffer();

    /**
     * Allocate a heap {@link ByteBuf} with the given initial capacity.
     */
    ByteBuf heapBuffer(int initialCapacity);

    /**
     * Allocate a heap {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity.
     */
    ByteBuf heapBuffer(int initialCapacity, int maxCapacity);
    //分配一个direct buf
    /**
     * Allocate a direct {@link ByteBuf}.
     */
    ByteBuf directBuffer();

    /**
     * Allocate a direct {@link ByteBuf} with the given initial capacity.
     */
    ByteBuf directBuffer(int initialCapacity);

    /**
     * Allocate a direct {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity.
     */
    ByteBuf directBuffer(int initialCapacity, int maxCapacity);
    //分配复合buf,具体为direct还是heap,依赖于具体的实现
    /**
     * Allocate a {@link CompositeByteBuf}.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    CompositeByteBuf compositeBuffer();

    /**
     * Allocate a {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    CompositeByteBuf compositeBuffer(int maxNumComponents);
    //分配heap类型的复合buf
    /**
     * Allocate a heap {@link CompositeByteBuf}.
     */
    CompositeByteBuf compositeHeapBuffer();

    /**
     * Allocate a heap {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     */
    CompositeByteBuf compositeHeapBuffer(int maxNumComponents);
    //分配direct类型的复合buf
    /**
     * Allocate a direct {@link CompositeByteBuf}.
     */
    CompositeByteBuf compositeDirectBuffer();

    /**
     * Allocate a direct {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     */
    CompositeByteBuf compositeDirectBuffer(int maxNumComponents);

    /**
     * Returns {@code true} if direct {@link ByteBuf}'s are pooled
     判断buffer是否为direct类型的池buf
     */
    boolean isDirectBufferPooled();

    /**
     * Calculate the new capacity of a {@link ByteBuf} that is used when a {@link ByteBuf} needs to expand by the
     * {@code minNewCapacity} with {@code maxCapacity} as upper-bound.
     当buf在新的最小容量为minNewCapacity,最大容量为情况maxCapacity下,需要扩展时,计算buf的新容量
     */
    int calculateNewCapacity(int minNewCapacity, int maxCapacity);
 }

从上面来看字节buf分配器,主要定义了分配buf的方法,bufffer*方法分配的buffer具体类型依赖于具体的实现,可能为heap,可能为direct类型;ioBuffer*方法分配的buf,优先为direct类型;directBuffer*方法,分配的是direct类型的buf;heapBuffer*分配的是heap类型的buf,compositeBuffer方法,分配的是复合buf,具体可能为heap,可能为direct类型,
依赖于具体的实现;compositeDirectBuffer*分配的为复合direct  buf;compositeHeapBuffer*方法分配的为复合heap buf。

再来看默认的字节buf分配器
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;


public final class ByteBufUtil {
 static {
        String allocType = SystemPropertyUtil.get(
                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();
        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }
        DEFAULT_ALLOCATOR = alloc;

        THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024);
        logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);

        MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024);
        logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
    }
}

从字节buf工具类,可以看出,如果系统属性io.netty.allocator.type,配置的为unpooled,
则默认的buf为UnpooledByteBufAllocator,否则为PooledByteBufAllocator,对于Android平台,默认为UnpooledByteBufAllocator。

//UnpooledByteBufAllocator
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {

    private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric();
    private final boolean disableLeakDetector;

    /**
     * Default instance which uses leak-detection for direct buffers.
     */
    public static final UnpooledByteBufAllocator DEFAULT =
            new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
	    ...
}

//PooledByteBufAllocator
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
 public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
	   ...
}


//WriteBufferWaterMark
package io.netty.channel;

/**
 * WriteBufferWaterMark is used to set low water mark and high water mark for the write buffer.
 * <p>
 * If the number of bytes queued in the write buffer exceeds the
 * {@linkplain #high high water mark}, {@link Channel#isWritable()}
 * will start to return {@code false}.
 * <p>
 * If the number of bytes queued in the write buffer exceeds the
 * {@linkplain #high high water mark} and then
 * dropped down below the {@linkplain #low low water mark},
 * {@link Channel#isWritable()} will start to return
 * {@code true} again.
 */
public final class WriteBufferWaterMark {
    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;//默认低位掩码
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;//默认高位掩码
    public static final WriteBufferWaterMark DEFAULT =
            new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);
    private final int low;
    private final int high;
    /**
     * Create a new instance.
     *
     * @param low low water mark for write buffer.
     * @param high high water mark for write buffer
     */
    public WriteBufferWaterMark(int low, int high) {
        this(low, high, true);
    }
    /**
     * This constructor is needed to keep backward-compatibility.
     */
    WriteBufferWaterMark(int low, int high, boolean validate) {
        if (validate) {
            if (low < 0) {
                throw new IllegalArgumentException("write buffer's low water mark must be >= 0");
            }
            if (high < low) {
                throw new IllegalArgumentException(
                        "write buffer's high water mark cannot be less than " +
                                " low water mark (" + low + "): " +
                                high);
            }
        }
        this.low = low;
        this.high = high;
    }

    /**
     * Returns the low water mark for the write buffer.
     */
    public int low() {
        return low;
    }
    /**
     * Returns the high water mark for the write buffer.
     */
    public int high() {
        return high;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder(55)
            .append("WriteBufferWaterMark(low: ")
            .append(low)
            .append(", high: ")
            .append(high)
            .append(")");
        return builder.toString();
    }
}

再来看MaxMessagesRecvByteBufAllocator:
package io.netty.channel;

/**
 * {@link RecvByteBufAllocator} that limits the number of read operations that will be attempted when a read operation
 * is attempted by the event loop.
 在事件循环中,尝试读操作是,限制读操作的尝试次数
 */
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {
    /**
     * Returns the maximum number of messages to read per read loop.
     * a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     获取尝试读操作次数
     */
    int maxMessagesPerRead();

    /**
     设置最大尝试读操作次数
     * Sets the maximum number of messages to read per read loop.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     */
    MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
}

//RecvByteBufAllocator
package io.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;
import io.netty.util.internal.UnstableApi;

import static io.netty.util.internal.ObjectUtil.checkNotNull;

/**
 * Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough
 * not to waste its space.
 */
public interface RecvByteBufAllocator {
    /**
     * Creates a new handle.  The handle provides the actual operations and keeps the internal information which is
     * required for predicting an optimal buffer capacity.
     创建一个新的Handle,提供实际的操作和维护buf的容量等信息
     */
    Handle newHandle();

    /**
     * @deprecated Use {@link ExtendedHandle}.
     */
    @Deprecated
    interface Handle {
        /**
         * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
         * enough not to waste its space.
	 创建一个接收字节buf,可以存放从读取的inbound数据,并且不会浪费空间
         */
        ByteBuf allocate(ByteBufAllocator alloc);

        /**
         * Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
         * capacity.
	 返回分配buf的容量
         */
        int guess();

        /**
         * Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next
         * read loop.
	 重置所有累计和下一次循环需要读取的字节数计数器
         * <p>
         * This may be used by {@link #continueReading()} to determine if the read operation should complete.
	 用于#continueReading方法,决定读操作是否完成
         * 

         * This is only ever a hint and may be ignored by the implementation.
         * @param config The channel configuration which may impact this object's behavior.
         */
        void reset(ChannelConfig config);

        /**
         * Increment the number of messages that have been read for the current read loop.
         * @param numMessages The amount to increment by.
	 增加当前读循环读取的消息数
         */
        void incMessagesRead(int numMessages);

        /**
         * Set the bytes that have been read for the last read operation.
	 设置上一次读操作,读取的字节数
         * This may be used to increment the number of bytes that have been read.
	 用于增加读字节数计数器
         * @param bytes The number of bytes from the previous read operation. This may be negative if an read error
         * occurs. If a negative value is seen it is expected to be return on the next call to
         * {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally
         * to this class and is not required to be enforced in {@link #continueReading()}.
	 上次读操作,读取的字节数。如果读错误发生,可以为一个负数,负数表示不需要继续读操作
         */
        void lastBytesRead(int bytes);

        /**
         * Get the amount of bytes for the previous read operation.
	 获取先前读操作,读取的数据
         * @return The amount of bytes for the previous read operation.
         */
        int lastBytesRead();

        /**
         * Set how many bytes the read operation will (or did) attempt to read.
         * @param bytes How many bytes the read operation will (or did) attempt to read.
	 设置读操作将会或已经读取多少字节数据
         */
        void attemptedBytesRead(int bytes);

        /**
         * Get how many bytes the read operation will (or did) attempt to read.
	 获取读操作将会或已经读取多少字节数据
         * @return How many bytes the read operation will (or did) attempt to read.
         */
        int attemptedBytesRead();

        /**
         * Determine if the current read loop should should continue.
	 决定一个读循环是否应该继续
         * @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete.
         */
        boolean continueReading();

        /**
         * The read has completed.
         */
        void readComplete();
    }
    //拓展Handle
    @SuppressWarnings("deprecation")
    @UnstableApi
    interface ExtendedHandle extends Handle {
        /**
         * Same as {@link Handle#continueReading()} except "more data" is determined by the supplier parameter.
         * @param maybeMoreDataSupplier A supplier that determines if there maybe more data to read.
	 由于参数maybeMoreDataSupplier决定是否需要继续读取数据。
         */
        boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier);
    }

    /**
     * A {@link Handle} which delegates all call to some other {@link Handle}.
     Hanlder代理
     */
    class DelegatingHandle implements Handle {
        private final Handle delegate;//代理Handle

        public DelegatingHandle(Handle delegate) {
            this.delegate = checkNotNull(delegate, "delegate");
        }

        /**
         * Get the {@link Handle} which all methods will be delegated to.
         * @return the {@link Handle} which all methods will be delegated to.
         */
        protected final Handle delegate() {
            return delegate;
        }

        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return delegate.allocate(alloc);
        }

        @Override
        public int guess() {
            return delegate.guess();
        }

        @Override
        public void reset(ChannelConfig config) {
            delegate.reset(config);
        }

        @Override
        public void incMessagesRead(int numMessages) {
            delegate.incMessagesRead(numMessages);
        }

        @Override
        public void lastBytesRead(int bytes) {
            delegate.lastBytesRead(bytes);
        }

        @Override
        public int lastBytesRead() {
            return delegate.lastBytesRead();
        }

        @Override
        public boolean continueReading() {
            return delegate.continueReading();
        }

        @Override
        public int attemptedBytesRead() {
            return delegate.attemptedBytesRead();
        }

        @Override
        public void attemptedBytesRead(int bytes) {
            delegate.attemptedBytesRead(bytes);
        }

        @Override
        public void readComplete() {
            delegate.readComplete();
        }
    }
}



//UncheckedBooleanSupplier
package io.netty.util;

/**
 * Represents a supplier of {@code boolean}-valued results which doesn't throw any checked exceptions.
 */
public interface UncheckedBooleanSupplier extends BooleanSupplier {
    /**
     * Gets a boolean value.
     * @return a boolean value.
     */
    @Override
    boolean get();

    /**
     * A supplier which always returns {@code false} and never throws.
     */
    UncheckedBooleanSupplier FALSE_SUPPLIER = new UncheckedBooleanSupplier() {
        @Override
        public boolean get() {
            return false;
        }
    };

    /**
     * A supplier which always returns {@code true} and never throws.
     */
    UncheckedBooleanSupplier TRUE_SUPPLIER = new UncheckedBooleanSupplier() {
        @Override
        public boolean get() {
            return true;
        }
    };
}

//MessageSizeEstimator

package io.netty.channel;

/**
 * Responsible to estimate size of a message. The size represent how much memory the message will ca. reserve in
 * memory.
 */
public interface MessageSizeEstimator {

    /**
     * Creates a new handle. The handle provides the actual operations.
     */
    Handle newHandle();

    interface Handle {

        /**
         * Calculate the size of the given message.
         *计算给定消息的大小
         * @param msg       The message for which the size should be calculated
         * @return size     The size in bytes. The returned size must be >= 0
         */
        int size(Object msg);
    }
}


0
1
分享到:
评论

相关推荐

    learning-netty

    定义了通道的基本操作。 #### 类AbstractChannel 这是一个抽象类,实现了`Channel`接口,提供了部分通用的行为。 #### ChannelFactory 用于创建新的`Channel`实例。 #### ChannelUnsafe 提供了一些不安全的操作,...

    JAVA netty 获取串口数据并且下发数据

    Netty的ChannelHandlerContext则用来管理与通道相关的上下文信息,如发送和接收数据。 在Netty中,我们定义一个ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter来处理串口的输入和输出事件。例如,...

    Netty实现前置系统

    在系统B中,我们可以利用Netty的ChannelHandler接口来定义业务逻辑,处理来自系统A的短连接请求,同时保持与系统C的长连接。ChannelHandlerContext对象用于在ChannelHandler之间传递消息,实现事件驱动的通信。 ...

    Spring+Netty+WebSocket实例

    3. **Netty服务器**:创建一个Netty服务器,配置WebSocket服务器通道处理器,处理WebSocket的连接请求和数据帧。 4. **Spring与Netty的集成**:通过Spring的`WebSocketMessageBrokerConfigurer`接口,将Spring的...

    netty整合SpringMVC实现下载

    - Netty 使用非阻塞 I/O(NIO)模型,通过事件循环(EventLoop)和通道(Channel)处理网络连接,提高了处理大量并发连接的能力。 - 事件驱动架构:Netty 通过 ChannelHandler 处理各种网络事件,如连接建立、数据...

    基于java netty的udp客户端声呐数据对接

    首先,我们需要定义一个UDPChannelHandler,这是Netty中的处理器接口,负责处理接收到的声呐数据和发送出去的命令。我们可以重写其channelRead方法来处理接收到的数据,例如解码JSON格式的声呐信息。JSON...

    netty资料.rar

    1. **异步事件驱动**:Netty 使用了非阻塞I/O模型,基于Java NIO(非阻塞输入/输出)库,通过事件循环(EventLoop)和通道(Channel)机制,实现了高并发性能,降低了系统的资源消耗。 2. **高效的数据处理**:...

    java应用netty服务端和客户端

    3. **ServerChannel**:服务器端的通道接口,如NioServerSocketChannel,用于监听和接受新的连接。 4. **ChannelInitializer**:用于添加服务器端的处理器链,比如解码器、编码器、业务逻辑处理器等。 5. **...

    netty与socket交互实例

    此外,Netty的ChannelHandler接口允许我们定义业务逻辑,而EventLoopGroup则负责线程管理和事件调度。 对比之下,Socket是基于TCP/IP协议族的低级通信接口,它提供了进程间通信的基础。在Java中,Socket类代表一个...

    spring-netty.zip

    2. **构建Netty服务器**: 初始化Netty的ServerBootstrap,配置Bootstrap的Group、ChildGroup、通道处理器链等。 3. **创建UDP ChannelHandler**: 实现自定义的`ChannelInboundHandler`,在`channelRead`方法中处理...

    netty及时通讯通讯DEMO

    开发者可以通过实现ChannelInboundHandler和ChannelOutboundHandler接口,定义自己的业务逻辑。 5. **Pipeline**: Pipeline是一个处理链,每个ChannelHandler都在链上占据一个位置。数据在网络中传输时,会经过...

    Java_NIO框架Netty教程.pdf

    通过上述简单的服务端和客户端示例,我们可以看出Netty框架在服务器和客户端的代码结构上保持了一致性,都利用了ChannelHandler来处理事件,并且Netty通过一系列的接口和抽象类来提供灵活的编程模型。 Netty还提供...

    springboot+netty 实现简单的一对一聊天

    - 创建一个`WebSocketConfig`类,继承自`WebSocketMessageBrokerConfigurer`,并实现其方法,如`enableSimpleBroker`、`registerStompEndpoints`等,以定义STOMP(Simple Text Oriented Message Protocol)消息代理...

    netty聊天室实例

    - 在处理链中,`ChannelHandlerContext`是上下文对象,用于触发I/O事件、获取引用到下一个处理器、关闭通道等。 6. **多路复用与NIO选择器**: - Netty利用Java NIO的多路复用器(Selector)来监听和调度多个连接...

    c++客户端和java(Netty)服务器端tcp通讯

    利用Netty,我们可以轻松实现TCP服务器,创建ServerBootstrap,配置通道处理器pipeline,监听客户端连接,并处理接收到的数据。 Netty中的ChannelHandlerContext类是事件处理的核心,它提供了发送和接收数据的方法...

    netty学习之ServerChannel

    这个类提供了一种配置服务器端点的方式,包括通道类型、管道工厂、线程池等。通过`bind()`方法,我们可以将ServerBootstrap绑定到指定的IP地址和端口上,从而启动监听服务。 2. **ChannelFuture**:`bind()`方法...

    Netty4 使用

    1. **异步事件驱动**:Netty 使用非阻塞I/O模型,通过EventLoop(事件循环)和Channel(通道)处理网络I/O操作,极大地提高了系统的并发能力。 2. **高效的缓冲区**:Netty 提供了ByteBuf,这是一个高性能的字节...

    netty通讯示例

    `META-INF`目录通常存储元数据,如服务定义和服务提供者接口(SPI)信息;`WEB-INF`目录则包含Web应用的配置文件(如`web.xml`)、库(JAR文件)和其他不直接暴露给用户的资源。 在学习这个示例时,你需要关注以下...

    基于netty的nio使用demo源码

    在Netty中,通道是一个接口,它提供了读写数据的方法,比如read()和write()。 2. **缓冲区(Buffer)**:缓冲区是NIO的核心,用于存储数据。在Netty中,有多种类型的缓冲区,如ByteBuffer、CharBuffer、...

    Java Netty基于对象数据传输Demo

    服务器端会创建一个`ServerBootstrap`,配置线程池、通道工厂、管道等,然后绑定一个端口开始监听。客户端使用`Bootstrap`,配置好对应的参数,连接到服务器。在服务器和客户端的管道中,都需要添加`ObjectDecoder`...

Global site tag (gtag.js) - Google Analytics