`
Donald_Draper
  • 浏览: 981129 次
社区版块
存档分类
最新评论

netty nio事件循环组

阅读更多
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
netty 单线程事件执行器初始化:http://donald-draper.iteye.com/blog/2391895
netty 单线程事件执行器执行任务与graceful方式关闭:http://donald-draper.iteye.com/blog/2392051
netty 单线程事件循环:http://donald-draper.iteye.com/blog/2392067
netty nio事件循环初始化:http://donald-draper.iteye.com/blog/2392161
netty nio事件循环后续:http://donald-draper.iteye.com/blog/2392264
引言:
前面的文章我们看了Nio事件循环,先来回顾一下:
    Nio事件循环内部有一个取消选择key计数器清理间隔CLEANUP_INTERVAL,用于当取消的选择key达到256个时,重置取消选择key计数器cancelledKeys(int),并重新进行选择操作;选择器自动重构阈值SELECTOR_AUTO_REBUILD_THRESHOLD,默认选择操作发生512次,用于控制当选择器发生多少次选择操作时,重构选择器;选择器状态判断器selectNowSupplier,用于获取Nio事件循环内部选择器的选择操作结果;同时有一个选择器selector,未包装过的选择器unwrappedSelector和一个选择器提供者provider,一个选择key就绪集合selectedKeys(SelectedSelectionKeySet);当选择器的选择操作阻塞时,wakenUp(AtomicBoolean)属性决定是否应该break选择操作过程;一个Nio处理Io事件的时间占比ioRatio(int),默认为50,即IO事件处理时间和其他事件处理时间各占Nio事件循环一半;一个选择策略selectStrategy用于控制选择循环,如果返回结果为-1,则下一步应该阻塞选择操作,如果返回结果为-2,则下一步应该调回IO事件循环,处理IO事件,而不是继续执行选择操作,返回值大于0,表示需要有工作要做,即注册到选择器的选择通道有IO事件就绪。
    Nio事件循环初始化,主要是将Nio事件循环组和事件执行器及任务拒绝策略传给父类单线程事件循环(单线程事件执行器),同时打开一个选择器。
    打开选择器过程,委托给选择器提供者打开一个选择器,如果需要优化选择器的,在当前线程访问控制选择下,加载选择器实现类,不初始化,如果从系统类加载器加载的选择key实现类不是Class实例,或不是裸选择器类型,不进行选择器key集合优化,及选择器为选择器提供者打开的裸选择器;否则在当前线程相同访问控制权限下,获取系统选择器实现类的,选择器就绪key集合selectedKeysField及其代理publicSelectedKeysField,设置选择器就绪key集合selectedKeysField及其代理publicSelectedKeysField访问控制权限,将系统选择器的就绪key集合selectedKeysField及其代理publicSelectedKeysField设值为selectedKeySet(SelectedSelectionKeySet),并将选择器selector包装为SelectedSelectionKeySetSelector。
    Nio事件循环启动后,首先选择策略根据选择器结果提供者和任务队列是否有任务生成下一步的操作策略,如果选择操作结果返回为SelectStrategy.CONTINUE,则跳出当前事件循环,如果为SelectStrategy.SELECT,则执行选择操作,并阻塞下一次选择操作,如果需要唤醒选择器,则唤醒;然后重置取消选择key计数器cancelledKeys为0,置是否需要重新选择属性needsToSelectAgain为false,然后处理选择器就绪的选择key,在根据当前IO事件处理时间百分比ioRatio,决定是执行任务队列所有任务还是超时执行任务队列任务,如果ioRatio小于100,则为超时执行任务队列中任务;最后检查事件循环是否正在关闭,是则反注册选择器中的选择通道,并关闭,确定事件循环关闭;上述整个过程在事件循环运行期间,不断地重复。在事件循环的每次处理过程中,在最后都要检查意见事件循环是否关闭,如果正在关闭,则关闭注册到选择器的所有通道,并确保事件循环关闭。
    选择操作的过程为,首先重置选择操作计数器为0,计算选择操作延时时间;如果延时时间已过去0.5毫秒,且选择操作计数器当前为0,即第一次执行选择操作,执行立刻执行选择操作,更新选择操作计数器,并跳出当前选择操作过程;如果任务队列中有任务,且wakeUp属性为false,并更新为true成功,则立刻执行选择操作,更新选择操作计数器,跳出当前选择操作过程;如果上面两种情况都不是,则执行超时选择操作,如果有选择key就绪,或原始wakeUp属性为true,或当前wakeUp属性为true,或任务队列有任务,或调度任务队列有调度任务,则跳出当前选择操作;如果选择器重构阈值大于0,且当前选择操作计数器的值大于阈值,则重新构造选择器,即创建新的选择器将原始选择器关联的选择key,注册到新的选择器中。
    默认的处理选择器就绪选择key集合过程,为遍历选择key集合,处理就绪的选择key,首先选择key当前必须有效,再判断选择key通道事件循环是否是当前循环,否则直接返回,是则判断就绪key的就绪事件是连接请求事件,写事件还是读事件,如果事件连接操作,则委托通道的Unsafe完成通道连接,并移除连接事件;如果是如果是写事件,则委托通道的Unsafe刷新写请求队列,释放内存;果是读事件,则委托给通道的Unsafe的read方法;如果在处理就绪选择key的过程,需要重新执行选择操作,则立刻执行,并更新当前就绪选择key集合及其迭代器。
    Nio事件循环内部有一个选择器,所有注册到选择器的通道都在一个事件循环中,Nio事件循环是单线程事件循环,即单线程事件执行器,在处理选择器的就绪选择key时,当且仅当,就绪选择key关联通道所在的事件循环为当前事件循环时,才出来就绪选择key关联通道的就绪IO事件,从而保证通道的读写等操作线程安全。
    Nio事件循环实际的工作就是执行选择操作,并处理选择器的就绪选择key,Nio事件循环与Mina的IoProcessor有点相似,都可以看着一个线程执行器,执行通道的IO事件操作,而不同的是Nio管理的是选择器Selector,而Mina的IoProcessor管理的会话IoSession。

今天我们来看一下Nio事件循环组NioEventLoopGroup:
package io.netty.channel.nio;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.concurrent.RejectedExecutionHandlers;

import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

/**
 * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s.
 nio事件循环组实际为一个多线程事件循环组,用于管理基于通道的选择器
 */
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    /**
     * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
     * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     创建一个默认线程数,默认线程工厂的选择器提供者nio事件循环组,如果没有配置io.netty.eventLoopThreads系统属性的话,
     默认线程数为处理器的2倍,see#MultithreadEventLoopGroup
     */
    public NioEventLoopGroup() {
        this(0);
    }

    /**
     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     创建指定线程数量的事件循环组
     */
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    //这些构造都和简单,不一一讲了see##MultithreadEventLoopGroup讲解
    /**
     * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }

    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }

    /**
     * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
     * {@link SelectorProvider}.
     */
    public NioEventLoopGroup(
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
        this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
        final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory,
                             final RejectedExecutionHandler rejectedExecutionHandler) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
    }
    /**
     * Sets the percentage of the desired amount of time spent for I/O in the child event loops.  The default value is
     * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
     设置nio事件循环组的IO事件处理时间百分比
     */
    public void setIoRatio(int ioRatio) {
        //遍历事件循环组,设置nio事件循环的IO事件处理时间百分比
        for (EventExecutor e: this) {
            ((NioEventLoop) e).setIoRatio(ioRatio);
        }
    }

    /**
     * Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
     * around the  infamous epoll 100% CPU bug.
     重构nio事件循环组选择器
     */
    public void rebuildSelectors() {
        //遍历事件循环组,重构nio事件循环的选择器
        for (EventExecutor e: this) {
            ((NioEventLoop) e).rebuildSelector();
        }
    }
    //创建Nio事件循环
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
}

从上面可以看出,nio事件循环组实际为一个多线程事件循环组,主要用于管理nio事件循环;从设置nio事件循环组的IO事件处理时间百分比
和重构nio事件循环组选择器方法,可以看出事件循环组继承迭代器的原因;nio事件循环组可以统一设置组内的nio事件循环的IO事件处理时间百分比,
而nio事件可以动态变更自己的IO事件处理时间百分比,重构选择器也有这么点意思。
nio事件循环组是多线程的,而nio事件循环时单线程的,这个与Mina的IoProcessor和processor的关系有点像。
public interface EventLoopGroup extends EventExecutorGroup {

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {




我们来看构造:
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory,
                         final RejectedExecutionHandler rejectedExecutionHandler) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}

//MultithreadEventLoopGroup
/**
 * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
 * EventExecutorChooserFactory, Object...)
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}

参数args为选择器提供者,选择策略工厂,和拒绝执行策略,传递给父类,我们往下看,看这些参数给予了谁?
//MultithreadEventExecutorGroup
 
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
       ...
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
	        //关键在这,创建事件执行器
                children[i] = newChild(executor, args);
                success = true;
         }
	...
}
/**
 * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
 * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
 *待子类扩展
 */
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

回到nio事件循环组:
有一个newChild方法,返回的是nio事件循环,即单线程事件循环(单线程事件执行器)

//创建Nio事件循环
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

到这里,我们看到了上是的args参数,事件传给了Nio事件循环。

下面来看一下选择策略工厂:
 public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

//DefaultSelectStrategyFactory
public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
    public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();

    private DefaultSelectStrategyFactory() { }

    @Override
    public SelectStrategy newSelectStrategy() {
        return DefaultSelectStrategy.INSTANCE;
    }
}

//DefaultSelectStrategy
package io.netty.channel;

import io.netty.util.IntSupplier;

/**
 * Default select strategy.
 */
final class DefaultSelectStrategy implements SelectStrategy {
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

    private DefaultSelectStrategy() { }
    //获取选择器当前状态
    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        //如果有任务则返回,选择器选择后的结果值,否则返回为SelectStrategy.SELECT
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
}

从上面可以看出,默认选择策略DefaultSelectStrategy,策略方法主要根据任务队列是否有任务,来确定第一步的工作,
如果没有任务,则下一步执行选择操作,否则返回选择器选择操作后的结果值,不执行选操作,然后处理操作就绪选择key和
任务队列中的任务。
总结:
nio事件循环组实际为一个多线程事件循环组,主要用于管理nio事件循环;从设置nio事件循环组的IO事件处理时间百分比和重构nio事件循环组选择器方法,可以看出事件循环组继承迭代器的原因;nio事件循环组可以统一设置组内的nio事件循环的IO事件处理时间百分比,而nio事件可以动态变更自己的IO事件处理时间百分比,重构选择器也有这么点意思。nio事件循环组是多线程的,而nio事件循环时单线程的,这个与Mina的IoProcessor和processor的关系有点像。不同的是nio事件循环面向的是选择器Selector,而processor面向的是会话IoSession。
0
0
分享到:
评论

相关推荐

    Java高并发编程代码(Netty NIO 实例)

    Netty是在Java NIO的基础上进行了封装和优化,提供了一套完整的事件驱动、异步网络通信框架。它简化了网络编程的复杂性,如连接管理、数据传输、异常处理等,使得开发者可以专注于业务逻辑,而不用过多关注底层细节...

    Java_NIO框架Netty教程.pdf

    1. **创建ServerBootstrap**:这是启动服务器的起点,可以配置线程池、事件循环组、通道处理器等。 2. **绑定监听端口**:通过ServerBootstrap的bind方法指定监听的端口,服务器开始监听客户端连接。 3. **定义...

    Java-NIO-Netty框架学习

    1. **异步事件驱动**:Netty采用事件驱动模型,通过事件循环(EventLoop)和事件处理器(ChannelHandler)处理I/O事件,使得在高并发场景下性能卓越。 2. **零拷贝**:Netty支持零拷贝技术,减少了内存到内存的数据...

    BIO、NIO、AIO、Netty 、TCP全网最全解析!Netty中提供了哪些线程模型?

    在Netty中,使用NIO或AIO实现的TCP连接,可以结合其线程模型,如EventLoopGroup(事件循环组)和ChannelHandler(通道处理器)等组件,实现高效、可扩展的网络通信。例如,BossGroup处理新的连接请求,WorkerGroup...

    Java网络编程 NIO Netty

    1. 异步事件驱动:基于Reactor模式,通过事件循环(EventLoop)处理I/O事件,减少了线程上下文切换的开销。 2. 高效的缓冲区:Netty自定义了ByteBuf,相比Java NIO的ByteBuffer,提供了更多便捷的操作方法,如合并、...

    Socket 之 BIO、NIO、Netty 简单实现

    Netty的核心组件包括:Bootstrap(启动引导类)、ServerBootstrap(服务器启动引导类)、Channel(网络连接通道)、Pipeline(处理链)和EventLoop(事件循环)。 **RPC(Remote Procedure Call)与Socket** RPC是...

    基于netty编写的socket服务端

    Netty的`Bootstrap`类是用来初始化和配置服务器的启动参数,如绑定的IP地址和端口号、事件循环组、处理器链等。例如,我们可以通过以下代码创建一个简单的Netty服务器: ```java EventLoopGroup bossGroup = new ...

    Netty权威指南 PDF 完整版

    EventLoop是Netty的事件循环,负责调度和执行任务。 Netty的异步模型是其效率的关键。通过使用NIO的非阻塞I/O,Netty能够在单个线程中处理多个连接,极大地提高了并发能力。同时,Netty的Buffer类提供了一种高效的...

    netty需要的包

    2. **EventLoop(事件循环)**: Netty 的事件循环模型是其性能的关键。每个EventLoop负责处理一组连接,执行I/O操作和执行用户定义的任务。 3. **Channel(通道)**: 通道是Netty中I/O操作的载体,可以读取和写入...

    netty的视频90集

    - 异步模型:Netty采用NIO(非阻塞I/O)模型,通过事件驱动方式实现高并发处理,提高系统性能。 - ByteBuf:Netty自定义的高效字节缓冲区,用于更有效地处理网络数据。 2. **Netty Channel**: - Channel接口:...

    Netty案例集锦(并发编程篇).pdf

    6. **线程模型**:Netty提供了EventLoopGroup(事件循环组),用于管理和调度EventLoop,保证并发处理多个连接,并确保每个连接在一个独立的线程中处理,避免了线程切换的开销。 7. **Netty与并发编程**:Netty通过...

    Netty大纲-同步netty专栏

    - **启动流程**:了解Netty服务器启动过程,包括事件循环组和事件循环的创建。 - **EventLoop剖析**:深入EventLoop的工作原理,它是Netty高并发性能的关键。 - **accept和read流程**:解析接受新连接和处理读取...

    Netty In Action 中文版

    2. **Netty架构**:介绍Netty的事件驱动、异步非阻塞架构,包括处理器链(Channel Pipeline)、事件循环(Event Loop)和多线程模型,理解这些核心组件如何协同工作以实现高效的网络通信。 3. **Netty组件**:深入...

    netty实战源码13章

    Netty 的事件驱动模型基于 NIO(非阻塞 I/O),使得它能够高效地处理大量并发连接。 在描述中提到的 "netty 实战源码 13 章",可能是指一系列关于 Netty 使用和实现的教程,覆盖了从基础到进阶的多个主题。这通常会...

    netty开发工具包

    6. **线程模型**:Netty 使用多路复用器(Selector)和事件循环组(EventLoopGroup)进行线程管理,使得系统能处理大量并发连接。 7. **协议支持**:Netty 内置了对多种常见网络协议的支持,如HTTP、WebSocket、FTP...

    Netty源码依赖包

    - `NioEventLoopGroup`:针对NIO的事件循环组实现。 - `EpollEventLoopGroup`:专门针对Linux系统的EPOLL机制的事件循环组实现。 - `ServerBootstrap` 和 `Bootstrap`:分别用于创建服务端和客户端的引导类。 ##...

    netty框架 jar包

    而Netty采用非阻塞I/O(NIO)和事件驱动模型,通过一个或少数几个线程就能处理成千上万的并发连接,极大地提高了系统的并行性和效率。 Netty的事件驱动模型基于“事件循环”(Event Loop)和“管道”(Channel ...

    基于Java NIO的网络服务器Netty生产实例.zip

    Netty是一个高性能、异步事件驱动的网络应用框架,它极大地简化了基于Java NIO进行网络编程的复杂性。在“基于Java NIO的网络服务器Netty生产实例.zip”压缩包中,可能包含了关于如何使用Netty构建实际生产环境中的...

    Netty 3中文版

    - **EventLoop**: EventLoop是Netty的事件循环,负责处理I/O事件和执行任务,保证了高并发下的性能表现。 - **ByteBuf**: ByteBuf是Netty的字节缓冲区,替代了Java NIO的ByteBuffer,提供了更高效且安全的数据操作...

Global site tag (gtag.js) - Google Analytics