`
Donald_Draper
  • 浏览: 988059 次
社区版块
存档分类
最新评论

netty 多线程事件循环组

阅读更多
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
引言:
上一篇文章我们看了多线程事件执行器组,先来回顾一下:
     多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
     获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
本想这篇看一下多线程事件循环组呢,但是其实现了事件循环组,我们这篇先来看一下EventLoopGroup
//MultithreadEventLoopGroup
package io.netty.channel;

import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

/**
 * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 */
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {


下面来看事件循环组EventLoopGroup的定义
package io.netty.channel;

import io.netty.util.concurrent.EventExecutorGroup;

/**
 * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get
 * processed for later selection during the event loop.
 *
 */
public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * Return the next {@link EventLoop} to use
     返回下一个事件循环
     */
    @Override
    EventLoop next();

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
     * will get notified once the registration was complete.
     注册通道到事件循环,返回异步通道注册结果,当注册完成通知结果。
     */
    ChannelFuture register(Channel channel);

    /**
     * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
     * {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
     注册可写异步通道任务结果关联的通道
     */
    ChannelFuture register(ChannelPromise promise);

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
     * will get notified once the registration was complete and also will get returned.
     *注册通道到事件循环,当注册完成,通知异步通道注册结果。
     * @deprecated Use {@link #register(ChannelPromise)} instead.
     */
    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
}

从上面可以看出,事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,
事件循环组主要所做的工作为通道注册。

再来看事件循环EventLoop接口的定义:
//EventLoop
package io.netty.channel;

import io.netty.util.concurrent.OrderedEventExecutor;

/**
 * Will handle all the I/O operations for a {@link Channel} once registered.
 *
 * One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on
 * implementation details and internals.
 一个事件循环实例可以处理多个通道,这个具体要依赖于具体的实现。
 *
 */
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
   //获取事件循环所属的事件循环组
    @Override
    EventLoopGroup parent();
}
//OrderedEventExecutor
package io.netty.util.concurrent;

/**
 * Marker interface for {@link EventExecutor}s that will process all submitted tasks in an ordered / serial fashion.
 标记一个事件执行器顺序、串行的方式处理提交的任务
 */
public interface OrderedEventExecutor extends EventExecutor {
}

从上面可以看出事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。

再来看事件循环抽象实现:

package io.netty.channel;

import io.netty.util.concurrent.AbstractEventExecutor;

/**
 * Skeletal implementation of {@link EventLoop}.
 */
public abstract class AbstractEventLoop extends AbstractEventExecutor implements EventLoop {
    protected AbstractEventLoop() { }
    protected AbstractEventLoop(EventLoopGroup parent) {
        super(parent);
    }
    @Override
    public EventLoopGroup parent() {
        return (EventLoopGroup) super.parent();
    }
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
}

抽象事件循环AbstractEventLoop继承了抽象事件执行器,实现了事件循环接口。

鉴于当这样已经把事件循环和事件循环组看完,那就来看下多线程事件循环组:
package io.netty.channel;

import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

/**
 * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 */
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);

    private static final int DEFAULT_EVENT_LOOP_THREADS;//默认事件循环线程数

    static {
        //默认事件循环线程数为1和可用处理器数的2倍中的最大者
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    //下面的构造函数都是具体可以参考多线程事件执行器组的相应的构造
    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }
    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
     * EventExecutorChooserFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                     Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
    }

    @Override
    protected ThreadFactory newDefaultThreadFactory() {
        //创建的默认线程工厂的线程优先级默认为最大优先级
        return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
    }

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    //创建事务循环,待子类实现
    @Override
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

    @Deprecated
    @Override
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }
}

从上面可以看出,多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,
相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;
默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。


//NioEventLoopGroup
/**
 * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s.
 */
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    ...
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
}

从Nio事件循环组创建事件循环可以看出事件循环为NioEventLoop,这也就是接下来的文章要看的,先列出
Nio事件循环声明继承树。
/**
 * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
 * {@link Selector} and so does the multi-plexing of these in the event loop.
 *
 */
public final class NioEventLoop extends SingleThreadEventLoop {

/**
 * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

/**
 * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {


总结:

     事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,事件循环组主要所做的工作为通道注册。
     事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。抽象事件循环AbstractEventLoop继承了抽象事件执行器AbstractEventExecutor,实现了事件循环接口。
     多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。

附:
在多线程事件循环组的静态语句中,初始化默认事件循环线程数有下面一段:
private static final int DEFAULT_EVENT_LOOP_THREADS;//默认事件循环线程数
static {
    //默认事件循环线程数为1和可用处理器数的2倍中的最大者
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

我们来看一下NettyRuntime

package io.netty.util;

import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.SystemPropertyUtil;

import java.util.Locale;

/**
 * A utility class for wrapping calls to {@link Runtime}.
 运行时包装类
 */
public final class NettyRuntime {
    //可利用处理器holder
    private static final AvailableProcessorsHolder holder = new AvailableProcessorsHolder();
    /**
     * Holder class for available processors to enable testing.
     */
    static class AvailableProcessorsHolder {
        private int availableProcessors;//可利用的处理器数量

        /**
         * Set the number of available processors.
         *设置可利用的处理器数量
         * @param availableProcessors the number of available processors
         * @throws IllegalArgumentException if the specified number of available processors is non-positive
         * @throws IllegalStateException    if the number of available processors is already configured
         */
        synchronized void setAvailableProcessors(final int availableProcessors) {
            ObjectUtil.checkPositive(availableProcessors, "availableProcessors");
            if (this.availableProcessors != 0) {
                final String message = String.format(
                        Locale.ROOT,
                        "availableProcessors is already set to [%d], rejecting [%d]",
                        this.availableProcessors,
                        availableProcessors);
                throw new IllegalStateException(message);
            }
            this.availableProcessors = availableProcessors;
        }

        /**
         * Get the configured number of available processors. The default is {@link Runtime#availableProcessors()}.
         * This can be overridden by setting the system property "io.netty.availableProcessors" or by invoking
         * {@link #setAvailableProcessors(int)} before any calls to this method.
         *
         * @return the configured number of available processors
         */
        @SuppressForbidden(reason = "to obtain default number of available processors")
        synchronized int availableProcessors() {
            if (this.availableProcessors == 0) {
	        //获取可以用的系统可用的处理器数量
                final int availableProcessors =
                        SystemPropertyUtil.getInt(
                                "io.netty.availableProcessors",
                                Runtime.getRuntime().availableProcessors());
                setAvailableProcessors(availableProcessors);
            }
            return this.availableProcessors;
        }
    }

    /**
     * Set the number of available processors.
     *设置可利用的处理器数量
     * @param availableProcessors the number of available processors
     * @throws IllegalArgumentException if the specified number of available processors is non-positive
     * @throws IllegalStateException    if the number of available processors is already configured
     */
    @SuppressWarnings("unused,WeakerAccess") // this method is part of the public API
    public static void setAvailableProcessors(final int availableProcessors) {
        holder.setAvailableProcessors(availableProcessors);
    }

    /**
     * Get the configured number of available processors. The default is {@link Runtime#availableProcessors()}. This
     * can be overridden by setting the system property "io.netty.availableProcessors" or by invoking
     * {@link #setAvailableProcessors(int)} before any calls to this method.
     *获取配置的可用处理器数量,默认的为Runtime#availableProcessors()。在调用此方法前,
     这个值可以被设置io.netty.availableProcessors属性或#setAvailableProcessors(int)重写。
     * @return the configured number of available processors
     */
    public static int availableProcessors() {
        return holder.availableProcessors();
    }

    /**
     * No public constructor to prevent instances from being created.
     */
    private NettyRuntime() {
    }
}

0
0
分享到:
评论

相关推荐

    netty5多线程编程

    总结来说,Netty5的多线程编程是一个复杂的主题,它不仅涉及对Netty框架内部事件驱动和线程模型的理解,也需要掌握Java并发编程的基础知识,包括Java内存模型、多线程安全、锁、volatile关键字以及CAS指令等。...

    使用Netty4实现多线程的消息分发

    1. **EventLoopGroup**:Netty 中的 EventLoopGroup 是一组事件循环(EventLoop)的集合,每个事件循环对应一个独立的工作线程。在服务器端,通常会创建两个 EventLoopGroup,一个用于接收客户端连接(BossGroup),...

    netty技术文档,socket技术 多线程

    5. **线程模型**:Netty采用了EventLoop(事件循环)和EventExecutorGroup(事件执行器组)来管理线程,有效地利用了多核处理器资源。 **Socket技术** Socket是网络通信的接口,允许两个进程通过网络进行通信。在...

    Netty通讯框架-多线程-源代码

    多线程在Netty中的应用主要体现在EventLoopGroup上,这是Netty的事件循环组,它由一组线程组成,负责处理I/O事件。通常,我们会有两个EventLoopGroup,一个用于接收客户端连接(BossGroup),另一个用于处理连接后的...

    Netty简介 Netty线程模型和EventLoop Codec编码与解码 ByteBuf容器

    EventLoop是Netty的核心组件,它是一个单线程执行任务的循环,负责处理I/O事件并分发到对应的处理器。每个EventLoop都包含一个Selector,用于监听多个通道的事件。通过多路复用技术,EventLoop可以高效地处理大量...

    Netty核心精讲之Reactor线程模型源码分析.mp4

    Netty的Reactor模型主要由三部分组成:主线程(BossGroup)、工作线程(WorkerGroup)和事件循环(EventLoop)。 主线程BossGroup通常只有一个,它的职责是接收新的连接请求,一旦接收到新连接,它会将连接交由工作...

    04-Netty线程模型源码剖析1

    此外,Netty利用NIO的多路复用技术,通过一个线程管理多个套接字连接,极大地提高了服务器的并发处理能力。非阻塞I/O使得Netty在等待数据到达时不会阻塞其他任务的执行,进一步提升了效率。 在Netty中,ByteBuf是...

    netty权威指南 第二版 李林锋pdf

    6. **EventLoop与EventLoopGroup**:EventLoop是Netty中的事件循环,负责处理I/O事件和调度任务。EventLoopGroup是一组EventLoop的集合,通常一个线程对应一个EventLoop,负责监听和处理多个Channel。 7. **Netty的...

    Netty4事件处理传播机制,java高级开发工程师要求(csdn)————程序.pdf

    如果当前不在事件循环线程中,会提交一个任务到事件循环,以便在合适的时机执行添加操作和回调。 `ChannelPipeline` 的事件传播机制基于这些处理器链。当一个事件(如连接建立、数据接收等)发生时,它会从头节点...

    Netty源码解读之线程

    本文将深入剖析Netty线程模型的关键组件——`NioEventLoopGroup`及`NioEventLoop`,通过源码分析来理解Netty如何管理线程以及如何处理I/O事件。 #### 二、NioEventLoopGroup与NioEventLoop的关系 在Netty中,`...

    netty快速入门教程3-4集 共12集

    Netty使用EventLoopGroup作为其多线程模型的基础,通常包含一个BossGroup负责接受新的连接,而多个WorkerGroup则处理已连接的通道上的读写事件。每个EventLoop都是一个独立的线程,它们负责处理分配给它们的通道。...

    netty资源包.zip

    11. **Reactor 模式**: Netty 基于单线程或多线程的 Reactor 模型运行,实现了高效的并发处理。 12. **Extensibility**: Netty 的设计使其易于扩展,可以轻松地添加新的协议支持或者自定义处理逻辑。 在开发过程中...

    netty框架图及netty面试知识点解析

    事件循环(EventLoop)是Netty处理事件的关键组件,它负责调度和执行事件处理器。 "粘包与半包"是网络通信中常见的问题。当发送的数据超过缓冲区大小时,可能会导致一个完整的数据包被拆分,或者多个数据包被合并到...

    netty-netty-4.1.69.Final.tar.gz

    7. **线程模型**:Netty的EventLoopGroup负责管理和调度事件循环,确保线程资源的合理使用,避免了多线程同步的复杂性。 8. **零拷贝**:Netty通过直接将数据从网络缓冲区传递到用户缓冲区,或者反之,实现了零拷贝...

    springboot整合netty的demo

    Netty Server端通常会包含一个`ServerBootstrap`,用于设置服务器的各种属性,如事件循环组、通道处理器等。Client端则会使用`Bootstrap`来连接到服务器,同样也需要配置相应的处理器链。 在Server端,我们可以使用...

    Netty架构源码剖析_netty_

    Netty使用单线程的事件循环来处理多个Channel,提高并发性能。 5. **Pipeline**:通道处理管道,由一系列处理器(ChannelHandler)组成。每个处理器负责处理特定类型的事件或数据,可以自定义添加、删除或替换...

    netty实战源码13章

    5. **多线程模型**: Netty 的事件循环(EventLoop)是基于单线程的,但可以有多个 EventLoop 组成一个 EventLoopGroup,这样可以并发处理多个连接。 6. **编解码器**: 在 UDP 应用中,你可能需要自定义编解码器来...

    Netty源码依赖包

    - `EventLoopGroup`:负责管理多个事件循环器(EventLoop),每个事件循环器都有一个独立的线程。 - `Channel`:代表了与一个实体的连接,如TCP连接。它为数据的读写提供了一个统一的接口。 - `ByteBuf`:用于替代...

    netty-4.1_javaNetty_netty_服务器_

    8. **线程模型**:理解 Netty 的多线程模型,包括 NIO 的选择器和工作线程的协同工作。 9. **心跳与保持连接**:了解如何在 Netty 中实现心跳机制,以检测连接是否断开,以及如何维持长连接。 10. **安全性**:...

Global site tag (gtag.js) - Google Analytics