`
Donald_Draper
  • 浏览: 978838 次
社区版块
存档分类
最新评论

netty 通道接口定义

阅读更多
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 抽象BootStrap定义:http://donald-draper.iteye.com/blog/2392492
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
netty Bootstrap解析:http://donald-draper.iteye.com/blog/2392593
在前面看服务端和客户端引导配置的时候,我们配置完Nio事件循环组之后配置的是Nio通道,
在服务端和客户端的相关代码如下:
服务端:
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
	//ServerBootstrap,用于配置服务端,一般为ServerSocket通道
    ServerBootstrap serverBoot = new ServerBootstrap(); 
    serverBoot.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class) 


客户端:
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//Bootstrap,用于配置客户端,一般为Socket通道
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)

从代码片段中可以,看出服务端的通道为NioServerSocketChannel,而客户端的为
NioSocketChannel我们先来看一下两种通道的声明定义:
先来看NioServerSocketChannel的声明定义:
//NioServerSocketChannel
/**
 * A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses
 * NIO selector based implementation to accept new connections.
 基于nio选择器的服务端socket通道的实现,用于接受新的连接
 */
public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {


/**
 * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
 */
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {



/**
 * Abstract base class for {@link Channel} implementations which use a Selector based approach.
 */
public abstract class AbstractNioChannel extends AbstractChannel {


/**
 * A skeletal {@link Channel} implementation.
 */
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {


public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {


再来看另外一个分支ServerSocketChannel的声明定义:
/**
 * A TCP/IP {@link ServerChannel} which accepts incoming TCP/IP connections.
 */
public interface ServerSocketChannel extends ServerChannel {

/**
 * A {@link Channel} that accepts an incoming connection attempt and creates
 * its child {@link Channel}s by accepting them.  {@link ServerSocketChannel} is
 * a good example.
 */
public interface ServerChannel extends Channel {



再来看NioSocketChannel的声明定义:
//NioSocketChannel
/**
 * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
 基于nio选择器的socket通道
 */
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {


/**
 * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
 抽象nio字节通道
 */
public abstract class AbstractNioByteChannel extends AbstractNioChannel {



再来看另外一个分支SocketChannel:
/**
 * A TCP/IP socket {@link Channel}.
 */
public interface SocketChannel extends DuplexChannel {


/**
 * A duplex {@link Channel} that has two sides that can be shutdown independently.
 */
public interface DuplexChannel extends Channel {


从上面可以看出Nio服务端socket通道和socket通道都是一种通道,不同的时服务端socket通道是,基于消息的通道,而socket通道是基于字节的通道,不同的是Server Socket通道用于接受socket连接请求,而socket通道用于客户端,连接Server通道,或者Server通道接收客户端的请求,产生与客户端交互的Socket通道。

为了理解NioSocketChannel和NioServerSocketChannel的作用,我们先从通道接口定义来看起:
//Channel
package io.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.AttributeMap;

import java.net.InetSocketAddress;
import java.net.SocketAddress;


/**
 * A nexus to a network socket or a component which is capable of I/O
 * operations such as read, write, connect, and bind.
 * 
 * A channel provides a user:
 * [list]
 * [*]the current state of the channel (e.g. is it open? is it connected?),

 * [*]the {@linkplain ChannelConfig configuration parameters} of the channel (e.g. receive buffer size),

 * [*]the I/O operations that the channel supports (e.g. read, write, connect, and bind), and

 * <li>the {@link ChannelPipeline} which handles all I/O events and requests
 *     associated with the channel.</li>
 * [/list]
 *
 * <h3>All I/O operations are asynchronous.</h3>
 * <p>
 * All I/O operations in Netty are asynchronous.  It means any I/O calls will
 * return immediately with no guarantee that the requested I/O operation has
 * been completed at the end of the call.  Instead, you will be returned with
 * a {@link ChannelFuture} instance which will notify you when the requested I/O
 * operation has succeeded, failed, or canceled.
 *
 * <h3>Channels are hierarchical</h3>
 * <p>
 * A {@link Channel} can have a {@linkplain #parent() parent} depending on
 * how it was created.  For instance, a {@link SocketChannel}, that was accepted
 * by {@link ServerSocketChannel}, will return the {@link ServerSocketChannel}
 * as its parent on {@link #parent()}.
 * <p>
 * The semantics of the hierarchical structure depends on the transport
 * implementation where the {@link Channel} belongs to.  For example, you could
 * write a new {@link Channel} implementation that creates the sub-channels that
 * share one socket connection, as [url=http://beepcore.org/]BEEP[/url] and
 * [url=http://en.wikipedia.org/wiki/Secure_Shell]SSH[/url] do.
 *
 * <h3>Downcast to access transport-specific operations</h3>
 * <p>
 * Some transports exposes additional operations that is specific to the
 * transport.  Down-cast the {@link Channel} to sub-type to invoke such
 * operations.  For example, with the old I/O datagram transport, multicast
 * join / leave operations are provided by {@link DatagramChannel}.
 *
 * <h3>Release resources</h3>
 * <p>
 * It is important to call {@link #close()} or {@link #close(ChannelPromise)} to release all
 * resources once you are done with the {@link Channel}. This ensures all resources are
 * released in a proper way, i.e. filehandles.
 */
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

    /**
     * Returns the globally unique identifier of this {@link Channel}.
     返回通道全局唯一id
     */
    ChannelId id();

    /**
     * Return the {@link EventLoop} this {@link Channel} was registered to.
     返回通道注册的事件循环
     */
    EventLoop eventLoop();

    /**
     * Returns the parent of this channel.
     * 返回所属通道
     * @return the parent channel.
     *         {@code null} if this channel does not have a parent channel.
     */
    Channel parent();

    /**
     * Returns the configuration of this channel.
     返回通道配置
     */
    ChannelConfig config();

    /**
     * Returns {@code true} if the {@link Channel} is open and may get active later
     判断通道是否打开
     */
    boolean isOpen();

    /**
     * Returns {@code true} if the {@link Channel} is registered with an {@link EventLoop}.
     判断通道是否注册到事件循环
     */
    boolean isRegistered();

    /**
     * Return {@code true} if the {@link Channel} is active and so connected.
     判断当前同时是否激活,及连接
     */
    boolean isActive();

    /**
     * Return the {@link ChannelMetadata} of the {@link Channel} which describe the nature of the {@link Channel}.
     返回通道元数据
     */
    ChannelMetadata metadata();

    /**
     * Returns the local address where this channel is bound to.  The returned
     * {@link SocketAddress} is supposed to be down-cast into more concrete
     * type such as {@link InetSocketAddress} to retrieve the detailed
     * information.
     * 返回通道绑定的socket地址
     * @return the local address of this channel.
     *         {@code null} if this channel is not bound.
     */
    SocketAddress localAddress();

    /**
     * Returns the remote address where this channel is connected to.  The
     * returned {@link SocketAddress} is supposed to be down-cast into more
     * concrete type such as {@link InetSocketAddress} to retrieve the detailed
     * information.
     *返回远端socket地址。如果没有连接,返回null。对于报文通道,返回null
     * @return the remote address of this channel.
     *         {@code null} if this channel is not connected.
     *         If this channel is not connected but it can receive messages
     *         from arbitrary remote addresses (e.g. {@link DatagramChannel},
     *         use {@link DatagramPacket#recipient()} to determine
     *         the origination of the received message as this method will
     *         return {@code null}.
     */
    SocketAddress remoteAddress();

    /**
     * Returns the {@link ChannelFuture} which will be notified when this
     * channel is closed.  This method always returns the same future instance.
     返回通道任务结果,当通道关闭,将会通知通道任务结果。此方总是返回同一任务实例
     */
    ChannelFuture closeFuture();

    /**
     * Returns {@code true} if and only if the I/O thread will perform the
     * requested write operation immediately.  Any write requests made when
     * this method returns {@code false} are queued until the I/O thread is
     * ready to process the queued write requests.
     当且仅当IO线程立刻自行写请求操作。当方法返回false时,写请求被放在队列中,直到
     有IO线程准备处理写请求队列
     */
    boolean isWritable();

    /**
     * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
     * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
     在判断是否可写方法返回false前,估算有多少字节需要发送,反之的值,总是非负数,如果#isWritable方法,
     返回false,则此方法返回0。此方法的作用就是判断在IO线程处理写请求队列时,判断有多少字节要发送。
     */
    long bytesBeforeUnwritable();

    /**
     * Get how many bytes must be drained from underlying buffers until {@link #isWritable()} returns {@code true}.
     * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
     此方法与上述方法相反,用于估算在#isWritable方法返回true前,写请求队列中待发送的字节数
     */
    long bytesBeforeWritable();

    /**
     * Returns an [i]internal-use-only[/i] object that provides unsafe operations.
     返回通道内部线程线程安全操作辅助Unsafe
     */
    Unsafe unsafe();

    /**
     * Return the assigned {@link ChannelPipeline}.
     返回内部的Channel管道
     */
    ChannelPipeline pipeline();

    /**
     * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
     返回通道的字节buf分配器
     */
    ByteBufAllocator alloc();
   //下面两个方法时重写了ChannelOutboundInvoker的方法
    @Override
    Channel read();

    @Override
    Channel flush();

    /**
     * [i]Unsafe[/i] operations that should [i]never[/i] be called from user-code. These methods
     * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
     * following methods:
     Unsafe不可以,用户代码直接调用,这些方法仅仅能被实际的transport实现,必须被IO线程调用,除了如下的方法
     * [list]
     *   [*]{@link #localAddress()}

     *   [*]{@link #remoteAddress()}

     *   [*]{@link #closeForcibly()}

     *   [*]{@link #register(EventLoop, ChannelPromise)}

     *   [*]{@link #deregister(ChannelPromise)}

     *   [*]{@link #voidPromise()}

     * [/list]
     */
    interface Unsafe {

        /**
         * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
         * receiving data.
	 返回Unsafe内部的接收字节部分分配器Handle,用于分配接收数据时,存放数据的字节部分。
         */
        RecvByteBufAllocator.Handle recvBufAllocHandle();

        /**
         * Return the {@link SocketAddress} to which is bound local or
         * {@code null} if none.
	 返回绑定socket地址
         */
        SocketAddress localAddress();

        /**
         * Return the {@link SocketAddress} to which is bound remote or
         * {@code null} if none is bound yet.
	 返回远端socket地址,没有绑定,则为null
         */
        SocketAddress remoteAddress();

        /**
         * Register the {@link Channel} of the {@link ChannelPromise} and notify
         * the {@link ChannelFuture} once the registration was complete.
	 注册通道到事件循环,当注册完成时,通知可写任务结果ChannelPromise
         */
        void register(EventLoop eventLoop, ChannelPromise promise);

        /**
         * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
         * it once its done.
	 绑定socket地址,绑定完毕,通知可写任务结果ChannelPromise
         */
        void bind(SocketAddress localAddress, ChannelPromise promise);

        /**
         * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
         * If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
         * pass {@code null} to it.
         *
         * The {@link ChannelPromise} will get notified once the connect operation was complete.
	 连接远端socket地址,如果需要本地socket地址,则传入,没有则传null
         */
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

        /**
         * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
         * operation was complete.
	 断开通道连接,待完成,通知可写任务结果ChannelPromise
         */
        void disconnect(ChannelPromise promise);

        /**
         * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
         * operation was complete.
	 关闭通道,待完成,通知可写任务结果ChannelPromise
         */
        void close(ChannelPromise promise);

        /**
         * Closes the {@link Channel} immediately without firing any events.  Probably only useful
         * when registration attempt failed.
	 在不触发任务事件的情况下,关闭通道
         */
        void closeForcibly();

        /**
         * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
         * {@link ChannelPromise} once the operation was complete.
	 从事件循环,反注册通道,待完成,通知可写任务结果ChannelPromise
         */
        void deregister(ChannelPromise promise);

        /**
         * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
         * {@link ChannelPipeline}.  If there's already a pending read operation, this method does nothing.
	 调度一个读操作,填充字节数据到管道中第一个Inbound处理器的inbound buf中。如果有一个正在读取的操作,则
	 此方法什么都不做

         */
        void beginRead();

        /**
         * Schedules a write operation.
	 调度一下写操作
         */
        void write(Object msg, ChannelPromise promise);

        /**
         * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
	 通过#write,刷新所有写请求操作
         */
        void flush();

        /**
         * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
         * It will never be notified of a success or error and so is only a placeholder for operations
         * that take a {@link ChannelPromise} as argument but for which you not want to get notified.
	 返回一个可以重用和转递Unsafe操作的可选通道任务结果。当操作失败或成功时,不会被通知。
         */
        ChannelPromise voidPromise();

        /**
         * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
        返回一个Outbound buf,用于存放写请求。
	 */
        ChannelOutboundBuffer outboundBuffer();
    }
}

从上面可以看出通道,关联一个事件循环,及通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器;一个字节buf分配器ByteBufAllocator,用于分配字节buf,还有一些获取通道状态的方式,是否注册到事件循环,通道是否打开,是否可写;另外还要获取通道配置ChannelConfig,通道元数据ChannelMetadata的方法;最重要的是,关联一个Unsafe,用于通道的地址绑定,连接操作以及断开,通道的读写,注册到事件循环以及反注册。

再来看ServerSocketChanel接口定义:


//ServerSocketChannel
package io.netty.channel.socket;

import io.netty.channel.ServerChannel;

import java.net.InetSocketAddress;

/**
 * A TCP/IP {@link ServerChannel} which accepts incoming TCP/IP connections.
 */
public interface ServerSocketChannel extends ServerChannel {
    @Override
    ServerSocketChannelConfig config();
    @Override
    InetSocketAddress localAddress();
    @Override
    InetSocketAddress remoteAddress();
}

//ServerChannel
package io.netty.channel;

import io.netty.channel.socket.ServerSocketChannel;

/**
 * A {@link Channel} that accepts an incoming connection attempt and creates
 * its child {@link Channel}s by accepting them.  {@link ServerSocketChannel} is
 * a good example.
 */
public interface ServerChannel extends Channel {
    // This is a tag interface.
}

从上面来看ServerSocketChannel没有什么特殊的,只是标注是一个可以接受客户端连接的通道。
再来看SocketChanel接口定义:

//SocketChannel
package io.netty.channel.socket;

import io.netty.channel.Channel;

import java.net.InetSocketAddress;

/**
 * A TCP/IP socket {@link Channel}.
 */
public interface SocketChannel extends DuplexChannel {
    @Override
    ServerSocketChannel parent();

    @Override
    SocketChannelConfig config();
    @Override
    InetSocketAddress localAddress();
    @Override
    InetSocketAddress remoteAddress();
}

//DuplexChannel
package io.netty.channel.socket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;

import java.net.Socket;

/**
 * A duplex {@link Channel} that has two sides that can be shutdown independently.
 */
public interface DuplexChannel extends Channel {
    /**
     * Returns {@code true} if and only if the remote peer shut down its output so that no more
     * data is received from this channel.  Note that the semantic of this method is different from
     * that of {@link Socket#shutdownInput()} and {@link Socket#isInputShutdown()}.
     当且仅当,远端peer关闭输出流,通道不在有数据要接受时,返回true。需要注意的是,此方法不同于
     Socket#shutdownInput和Socket#isInputShutdown方法。
     */
    boolean isInputShutdown();

    /**
     * @see Socket#shutdownInput()
     关闭通道socket输入流
     */
    ChannelFuture shutdownInput();

    /**
     * Will shutdown the input and notify {@link ChannelPromise}.
     *与上面不同的是,多了一个异步任务结果
     * @see Socket#shutdownInput()
     */
    ChannelFuture shutdownInput(ChannelPromise promise);

    /**
     * @see Socket#isOutputShutdown()
     判断是否关闭socket输出流
     */
    boolean isOutputShutdown();

    /**
     * @see Socket#shutdownOutput()
     关闭socket输出流
     */
    ChannelFuture shutdownOutput();

    /**
     * Will shutdown the output and notify {@link ChannelPromise}.
     *与上面不同的是,多了一个异步任务结果
     * @see Socket#shutdownOutput()
     */
    ChannelFuture shutdownOutput(ChannelPromise promise);

    /**
     * Determine if both the input and output of this channel have been shutdown.
     判断输出流和输入流是否都关闭
     */
    boolean isShutdown();
    /**
     * Will shutdown the input and output sides of this channel.
     * @return will be completed when both shutdown operations complete.
     关闭通道的输入输出流,完成时,并通知异步任务结果ChannelFuture
     */
    ChannelFuture shutdown();
    /**
     * Will shutdown the input and output sides of this channel.
     * @param promise will be completed when both shutdown operations complete.
     * @return will be completed when both shutdown operations complete.
     关闭通道的输入输出流,完成时,并通知异步可写任务结果ChannelPromise
     */
    ChannelFuture shutdown(ChannelPromise promise);
}

从上面看出Socket通道实际上是一个可写可读的流通道。

总结:

通道Channel,关联一个事件循环,及通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器;一个字节buf分配器ByteBufAllocator,用于分配字节buf,还有一些获取通道状态的方式,是否注册到事件循环,通道是否打开,是否可写;另外还要获取通道配置ChannelConfig,通道元数据ChannelMetadata的方法;最重要的是,关联一个Unsafe,用于通道的地址绑定,连接操作以及断开,通道的读写,注册到事件循环以及反注册。

附:
//ChannelId
package io.netty.channel;

import java.io.Serializable;

/**
 * Represents the globally unique identifier of a {@link Channel}.
 * <p>
 * The identifier is generated from various sources listed in the following:
 * [list]
 * [*]MAC address (EUI-48 or EUI-64) or the network adapter, preferably a globally unique one,

 * [*]the current process ID,

 * [*]{@link System#currentTimeMillis()},

 * [*]{@link System#nanoTime()},

 * [*]a random 32-bit integer, and

 * [*]a sequentially incremented 32-bit integer.

 * [/list]
 * 

 * 
 * The global uniqueness of the generated identifier mostly depends on the MAC address and the current process ID,
 * which are auto-detected at the class-loading time in best-effort manner.  If all attempts to acquire them fail,
 * a warning message is logged, and random values will be used instead.  Alternatively, you can specify them manually
 * via system properties:
 * [list]
 * <li>{@code io.netty.machineId} - hexadecimal representation of 48 (or 64) bit integer,
 *     optionally separated by colon or hyphen.</li>
 * [*]{@code io.netty.processId} - an integer between 0 and 65535

 * [/list]
 * 

 */
public interface ChannelId extends Serializable, Comparable<ChannelId> {
    /**
     * Returns the short but globally non-unique string representation of the {@link ChannelId}.
     */
    String asShortText();

    /**
     * Returns the long yet globally unique string representation of the {@link ChannelId}.
     */
    String asLongText();
}


//ChannelConfig
package io.netty.channel;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig;

import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Map;

/**
 * A set of configuration properties of a {@link Channel}.
 * 
 * Please down-cast to more specific configuration type such as
 * {@link SocketChannelConfig} or use {@link #setOptions(Map)} to set the
 * transport-specific properties:
 * <pre>
 * {@link Channel} ch = ...;
 * {@link SocketChannelConfig} cfg = [b]({@link SocketChannelConfig}) ch.getConfig();[/b]
 * cfg.setTcpNoDelay(false);
 * </pre>
 *
 * <h3>Option map</h3>
 *
 * An option map property is a dynamic write-only property which allows
 * the configuration of a {@link Channel} without down-casting its associated
 * {@link ChannelConfig}.  To update an option map, please call {@link #setOptions(Map)}.
 * <p>
 * All {@link ChannelConfig} has the following options:
 *
 * <table border="1" cellspacing="0" cellpadding="6">
 * <tr>
 * <th>Name</th><th>Associated setter method</th>
 * </tr><tr>
 * <td>{@link ChannelOption#CONNECT_TIMEOUT_MILLIS}</td><td>{@link #setConnectTimeoutMillis(int)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#WRITE_SPIN_COUNT}</td><td>{@link #setWriteSpinCount(int)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#WRITE_BUFFER_WATER_MARK}</td><td>{@link #setWriteBufferWaterMark(WriteBufferWaterMark)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#ALLOCATOR}</td><td>{@link #setAllocator(ByteBufAllocator)}</td>
 * </tr><tr>
 * <td>{@link ChannelOption#AUTO_READ}</td><td>{@link #setAutoRead(boolean)}</td>
 * </tr>
 * </table>
 * <p>
 * More options are available in the sub-types of {@link ChannelConfig}.  For
 * example, you can configure the parameters which are specific to a TCP/IP
 * socket as explained in {@link SocketChannelConfig}.
 */
public interface ChannelConfig {

    /**
     * Return all set {@link ChannelOption}'s.
     */
    Map<ChannelOption<?>, Object> getOptions();

    /**
     * Sets the configuration properties from the specified {@link Map}.
     */
    boolean setOptions(Map<ChannelOption<?>, ?> options);

    /**
     * Return the value of the given {@link ChannelOption}
     */
    <T> T getOption(ChannelOption<T> option);

    /**
     * Sets a configuration property with the specified name and value.
     * To override this method properly, you must call the super class:
     * <pre>
     * public boolean setOption(ChannelOption<T> option, T value) {
     *     if (super.setOption(option, value)) {
     *         return true;
     *     }
     *
     *     if (option.equals(additionalOption)) {
     *         ....
     *         return true;
     *     }
     *
     *     return false;
     * }
     * </pre>
     *
     * @return {@code true} if and only if the property has been set
     */
    <T> boolean setOption(ChannelOption<T> option, T value);

    /**
     * Returns the connect timeout of the channel in milliseconds.  If the
     * {@link Channel} does not support connect operation, this property is not
     * used at all, and therefore will be ignored.
     *
     * @return the connect timeout in milliseconds.  {@code 0} if disabled.
     */
    int getConnectTimeoutMillis();

    /**
     * Sets the connect timeout of the channel in milliseconds.  If the
     * {@link Channel} does not support connect operation, this property is not
     * used at all, and therefore will be ignored.
     *
     * @param connectTimeoutMillis the connect timeout in milliseconds.
     *                             {@code 0} to disable.
     */
    ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis);

    /**
     * @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
     * <p>
     * Returns the maximum number of messages to read per read loop.
     * a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     */
    @Deprecated
    int getMaxMessagesPerRead();

    /**
     * @deprecated Use {@link MaxMessagesRecvByteBufAllocator}
     * <p>
     * Sets the maximum number of messages to read per read loop.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     */
    @Deprecated
    ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead);

    /**
     * Returns the maximum loop count for a write operation until
     * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
     * It is similar to what a spin lock is used for in concurrency programming.
     * It improves memory utilization and write throughput depending on
     * the platform that JVM runs on.  The default value is {@code 16}.
     */
    int getWriteSpinCount();

    /**
     * Sets the maximum loop count for a write operation until
     * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
     * It is similar to what a spin lock is used for in concurrency programming.
     * It improves memory utilization and write throughput depending on
     * the platform that JVM runs on.  The default value is {@code 16}.
     *
     * @throws IllegalArgumentException
     *         if the specified value is {@code 0} or less than {@code 0}
     */
    ChannelConfig setWriteSpinCount(int writeSpinCount);

    /**
     * Returns {@link ByteBufAllocator} which is used for the channel
     * to allocate buffers.
     */
    ByteBufAllocator getAllocator();

    /**
     * Set the {@link ByteBufAllocator} which is used for the channel
     * to allocate buffers.
     */
    ChannelConfig setAllocator(ByteBufAllocator allocator);

    /**
     * Returns {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
     */
    <T extends RecvByteBufAllocator> T getRecvByteBufAllocator();

    /**
     * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
     */
    ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator);

    /**
     * Returns {@code true} if and only if {@link ChannelHandlerContext#read()} will be invoked automatically so that
     * a user application doesn't need to call it at all. The default value is {@code true}.
     */
    boolean isAutoRead();

    /**
     * Sets if {@link ChannelHandlerContext#read()} will be invoked automatically so that a user application doesn't
     * need to call it at all. The default value is {@code true}.
     */
    ChannelConfig setAutoRead(boolean autoRead);

    /**
     * @deprecated  Auto close will be removed in a future release.
     *
     * Returns {@code true} if and only if the {@link Channel} will be closed automatically and immediately on
     * write failure.  The default is {@code false}.
     */
    @Deprecated
    boolean isAutoClose();

    /**
     * @deprecated  Auto close will be removed in a future release.
     *
     * Sets whether the {@link Channel} should be closed automatically and immediately on write failure.
     * The default is {@code false}.
     */
    @Deprecated
    ChannelConfig setAutoClose(boolean autoClose);

    /**
     * Returns the high water mark of the write buffer.  If the number of bytes
     * queued in the write buffer exceeds this value, {@link Channel#isWritable()}
     * will start to return {@code false}.
     */
    int getWriteBufferHighWaterMark();

    /**
     * <p>
     * Sets the high water mark of the write buffer.  If the number of bytes
     * queued in the write buffer exceeds this value, {@link Channel#isWritable()}
     * will start to return {@code false}.
     */
    ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);

    /**
     * Returns the low water mark of the write buffer.  Once the number of bytes
     * queued in the write buffer exceeded the
     * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
     * dropped down below this value, {@link Channel#isWritable()} will start to return
     * {@code true} again.
     */
    int getWriteBufferLowWaterMark();

    /**
     * <p>
     * Sets the low water mark of the write buffer.  Once the number of bytes
     * queued in the write buffer exceeded the
     * {@linkplain #setWriteBufferHighWaterMark(int) high water mark} and then
     * dropped down below this value, {@link Channel#isWritable()} will start to return
     * {@code true} again.
     */
    ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);

    /**
     * Returns {@link MessageSizeEstimator} which is used for the channel
     * to detect the size of a message.
     */
    MessageSizeEstimator getMessageSizeEstimator();

    /**
     * Set the {@link MessageSizeEstimator} which is used for the channel
     * to detect the size of a message.
     */
    ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);

    /**
     * Returns the {@link WriteBufferWaterMark} which is used for setting the high and low
     * water mark of the write buffer.
     */
    WriteBufferWaterMark getWriteBufferWaterMark();

    /**
     * Set the {@link WriteBufferWaterMark} which is used for setting the high and low
     * water mark of the write buffer.
     */
    ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark);
}

//ChannelMetadata
package io.netty.channel;

import java.net.SocketAddress;

/**
 * Represents the properties of a {@link Channel} implementation.
 */
public final class ChannelMetadata {

    private final boolean hasDisconnect;
    private final int defaultMaxMessagesPerRead;

    /**
     * Create a new instance
     *
     * @param hasDisconnect     {@code true} if and only if the channel has the {@code disconnect()} operation
     *                          that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)}
     *                          again, such as UDP/IP.
     */
    public ChannelMetadata(boolean hasDisconnect) {
        this(hasDisconnect, 1);
    }

    /**
     * Create a new instance
     *
     * @param hasDisconnect     {@code true} if and only if the channel has the {@code disconnect()} operation
     *                          that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)}
     *                          again, such as UDP/IP.
     * @param defaultMaxMessagesPerRead If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this value will be
     * set for {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. Must be {@code > 0}.
     */
    public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) {
        if (defaultMaxMessagesPerRead <= 0) {
            throw new IllegalArgumentException("defaultMaxMessagesPerRead: " + defaultMaxMessagesPerRead +
                                               " (expected > 0)");
        }
        this.hasDisconnect = hasDisconnect;
        this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead;
    }

    /**
     * Returns {@code true} if and only if the channel has the {@code disconnect()} operation
     * that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)} again,
     * such as UDP/IP.
     */
    public boolean hasDisconnect() {
        return hasDisconnect;
    }

    /**
     * If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this is the default value for
     * {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}.
     */
    public int defaultMaxMessagesPerRead() {
        return defaultMaxMessagesPerRead;
    }
}

//ChannelOutboundBuffer
package io.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/**
 * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
 * outbound write requests.
 * <p>
 * All methods must be called by a transport implementation from an I/O thread, except the following ones:
 * [list]
 * [*]{@link #size()} and {@link #isEmpty()}

 * [*]{@link #isWritable()}

 * [*]{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}

 * [/list]
 * 

 */
public final class ChannelOutboundBuffer {
    // Assuming a 64-bit JVM:
    //  - 16 bytes object header
    //  - 8 reference fields
    //  - 2 long fields
    //  - 2 int fields
    //  - 1 boolean field
    //  - padding
    static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
            SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);

    private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
        @Override
        protected ByteBuffer[] initialValue() throws Exception {
            return new ByteBuffer[1024];
        }
    };

    private final Channel channel;//关联通道

    // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    //
    // The Entry that is the first in the linked-list structure that was flushed
    private Entry flushedEntry;
    // The Entry which is the first unflushed in the linked-list structure
    private Entry unflushedEntry;
    // The Entry which represents the tail of the buffer
    private Entry tailEntry;
    // The number of flushed entries that are not written yet
    private int flushed;

    private int nioBufferCount;
    private long nioBufferSize;

    private boolean inFail;

    private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

    @SuppressWarnings("UnusedDeclaration")
    private volatile long totalPendingSize;

    private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

    @SuppressWarnings("UnusedDeclaration")
    private volatile int unwritable;

    private volatile Runnable fireChannelWritabilityChangedTask;

    ChannelOutboundBuffer(AbstractChannel channel) {
        this.channel = channel;
    }
    ...
}
0
0
分享到:
评论

相关推荐

    JAVA netty 获取串口数据并且下发数据

    Netty的ChannelHandlerContext则用来管理与通道相关的上下文信息,如发送和接收数据。 在Netty中,我们定义一个ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter来处理串口的输入和输出事件。例如,...

    netty整合SpringMVC实现下载

    - Netty 使用非阻塞 I/O(NIO)模型,通过事件循环(EventLoop)和通道(Channel)处理网络连接,提高了处理大量并发连接的能力。 - 事件驱动架构:Netty 通过 ChannelHandler 处理各种网络事件,如连接建立、数据...

    Netty 实现scoket 主动推送数据到服务和服务端实现方式

    客户端发送16进制给服务端,并行实现socket通道活动状态和断开重新连接的功能, 监听接口是否存在数据,如果存在socket客户端发送给socket服务端的实现 随着物联网的发展,随之出现了各种传感器监测数据的实时发送,...

    Spring+Netty+WebSocket实例

    4. **Spring与Netty的集成**:通过Spring的`WebSocketMessageBrokerConfigurer`接口,将Spring的消息传递系统与Netty的WebSocket服务器连接起来。这通常涉及设置WebSocket消息的订阅者和发布者。 5. **客户端支持**...

    基于java netty的udp客户端声呐数据对接

    首先,我们需要定义一个UDPChannelHandler,这是Netty中的处理器接口,负责处理接收到的声呐数据和发送出去的命令。我们可以重写其channelRead方法来处理接收到的数据,例如解码JSON格式的声呐信息。JSON...

    java应用netty服务端和客户端

    3. **ServerChannel**:服务器端的通道接口,如NioServerSocketChannel,用于监听和接受新的连接。 4. **ChannelInitializer**:用于添加服务器端的处理器链,比如解码器、编码器、业务逻辑处理器等。 5. **...

    netty资料.rar

    1. **异步事件驱动**:Netty 使用了非阻塞I/O模型,基于Java NIO(非阻塞输入/输出)库,通过事件循环(EventLoop)和通道(Channel)机制,实现了高并发性能,降低了系统的资源消耗。 2. **高效的数据处理**:...

    Netty实现前置系统

    在系统B中,我们可以利用Netty的ChannelHandler接口来定义业务逻辑,处理来自系统A的短连接请求,同时保持与系统C的长连接。ChannelHandlerContext对象用于在ChannelHandler之间传递消息,实现事件驱动的通信。 ...

    Netty In Action中文版.pdf

    - **定义**: Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器与客户端。 - **目标用户**: 适合Java网络编程新手及有经验的开发者。 - **核心优势**: 提供了简单易用的API,将...

    learning-netty

    定义了通道的基本操作。 #### 类AbstractChannel 这是一个抽象类,实现了`Channel`接口,提供了部分通用的行为。 #### ChannelFactory 用于创建新的`Channel`实例。 #### ChannelUnsafe 提供了一些不安全的操作,...

    Netty权威指南.rar

    4. **Pipeline**:Netty的事件处理模型基于事件通道(Event Pipeline),它是一个链式处理结构,每个节点都是一个ChannelHandler,数据在管道中流动时会被逐个处理,实现了职责分离和解耦。 5. **NIO基础**:Netty...

    netty及时通讯通讯DEMO

    开发者可以通过实现ChannelInboundHandler和ChannelOutboundHandler接口,定义自己的业务逻辑。 5. **Pipeline**: Pipeline是一个处理链,每个ChannelHandler都在链上占据一个位置。数据在网络中传输时,会经过...

    Java_NIO框架Netty教程.pdf

    通过上述简单的服务端和客户端示例,我们可以看出Netty框架在服务器和客户端的代码结构上保持了一致性,都利用了ChannelHandler来处理事件,并且Netty通过一系列的接口和抽象类来提供灵活的编程模型。 Netty还提供...

    netty通讯示例

    `META-INF`目录通常存储元数据,如服务定义和服务提供者接口(SPI)信息;`WEB-INF`目录则包含Web应用的配置文件(如`web.xml`)、库(JAR文件)和其他不直接暴露给用户的资源。 在学习这个示例时,你需要关注以下...

    c++客户端和java(Netty)服务器端tcp通讯

    利用Netty,我们可以轻松实现TCP服务器,创建ServerBootstrap,配置通道处理器pipeline,监听客户端连接,并处理接收到的数据。 Netty中的ChannelHandlerContext类是事件处理的核心,它提供了发送和接收数据的方法...

    ssm+netty的源码

    MyBatis允许开发者在XML或注解中定义SQL语句,通过Mapper接口与Java对象进行交互。这种设计模式提高了数据库操作的灵活性和效率。 Netty是一个高性能、异步事件驱动的网络应用框架,主要用于快速开发可维护的高性能...

    netty聊天室实例

    - 在处理链中,`ChannelHandlerContext`是上下文对象,用于触发I/O事件、获取引用到下一个处理器、关闭通道等。 6. **多路复用与NIO选择器**: - Netty利用Java NIO的多路复用器(Selector)来监听和调度多个连接...

    netty学习之ServerChannel

    3. **ChannelHandler**:Netty中的`ChannelHandler`接口是处理I/O事件的核心组件。ServerChannel会将接收到的客户端连接转换为新的`Channel`,每个新建立的连接都有自己的事件处理管道。你可以通过`ChannelPipeline`...

    netty与socket交互实例

    此外,Netty的ChannelHandler接口允许我们定义业务逻辑,而EventLoopGroup则负责线程管理和事件调度。 对比之下,Socket是基于TCP/IP协议族的低级通信接口,它提供了进程间通信的基础。在Java中,Socket类代表一个...

Global site tag (gtag.js) - Google Analytics