  • 浏览: 996143 次

netty 单线程事件执行器初始化

netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
      抽象调度事件执行器AbstractScheduledEventExecutor,内部有一个调度任务队列scheduledTaskQueue(PriorityQueue),用于存储待调度的任务。抽象调度事件执行器无论是调度任务线程,周期性任务,还是间歇性任务,先将任务包装成调度任务ScheduledFutureTask,然后委托给#schedule(final ScheduledFutureTask<V> task)方法,#schedule方法首先判断线程是否在当前事务循环,如果在,则添加调度任务到调度任务队列,否则直接创建一个线程,完成添加调度任务到调度任务队列工作;移除调度任务的思想与调度任务相同,只不过执行移除操作。

package io.netty.util.concurrent;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

 * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    private static final InternalLogger logger =
    private static final int ST_NOT_STARTED = 1;//就绪
    private static final int ST_STARTED = 2;//开始
    private static final int ST_SHUTTING_DOWN = 3;//正在关闭
    private static final int ST_SHUTDOWN = 4;//已关闭
    private static final int ST_TERMINATED = 5;//终止
    private static final Runnable WAKEUP_TASK = new Runnable() {
        public void run() {
            // Do nothing.
    private static final Runnable NOOP_TASK = new Runnable() {
        public void run() {
            // Do nothing.
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
                    SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
    private final Queue<Runnable> taskQueue;

    private volatile Thread thread;//当前事件执行器线程
    private volatile ThreadProperties threadProperties;
    private final Executor executor;
    private volatile boolean interrupted;//是否中断

    private final Semaphore threadLock = new Semaphore(0);//事件执行器关闭信号量
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();//关闭Hooks任务
    private final boolean addTaskWakesUp;
    private final int maxPendingTasks;//最大执行器任务
    private final RejectedExecutionHandler rejectedExecutionHandler;//任务拒绝策略

    private long lastExecutionTime;//上次执行器时间

    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;

    private volatile long gracefulShutdownQuietPeriod;//关闭间隔QuietPeriod
    private volatile long gracefulShutdownTimeout;//关闭超时时间
    private long gracefulShutdownStartTime;//关闭开始时间
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);


 * Create a new instance
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);

 * Create a new instance
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
 * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory,
        boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);

 * Create a new instance
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param executor          the {@link Executor} which will be used for executing
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());

 * Create a new instance
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param executor          the {@link Executor} which will be used for executing
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
 * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");

 * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
 * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
 * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
 * implementation that does not support blocking operations at all.
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return new LinkedBlockingQueue<Runnable>(maxPendingTasks);




protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory,
        boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);

//ThreadPerTaskExecutor 线程执行器
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;//线程工厂

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        this.threadFactory = threadFactory;
    public void execute(Runnable command) {

package io.netty.util.concurrent;

import io.netty.util.internal.StringUtil;

import java.util.Locale;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

 * A {@link ThreadFactory} implementation with a simple naming rule.
public class DefaultThreadFactory implements ThreadFactory {

    private static final AtomicInteger poolId = new AtomicInteger();//线程池id生成器

    private final AtomicInteger nextId = new AtomicInteger();//线程id生成器
    private final String prefix;//线程名前缀
    private final boolean daemon;//是否为守候模式
    private final int priority;//优先级
    protected final ThreadGroup threadGroup;//线程组

    public DefaultThreadFactory(Class<?> poolType) {
        this(poolType, false, Thread.NORM_PRIORITY);

    public DefaultThreadFactory(String poolName) {
        this(poolName, false, Thread.NORM_PRIORITY);

    public DefaultThreadFactory(Class<?> poolType, boolean daemon) {
        this(poolType, daemon, Thread.NORM_PRIORITY);

    public DefaultThreadFactory(String poolName, boolean daemon) {
        this(poolName, daemon, Thread.NORM_PRIORITY);

    public DefaultThreadFactory(Class<?> poolType, int priority) {
        this(poolType, false, priority);

    public DefaultThreadFactory(String poolName, int priority) {
        this(poolName, false, priority);

    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
        this(toPoolName(poolType), daemon, priority);

    public static String toPoolName(Class<?> poolType) {
        if (poolType == null) {
            throw new NullPointerException("poolType");

        String poolName = StringUtil.simpleClassName(poolType);
        switch (poolName.length()) {
            case 0:
                return "unknown";
            case 1:
                return poolName.toLowerCase(Locale.US);
                if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
                    return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
                } else {
                    return poolName;

    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        if (poolName == null) {
            throw new NullPointerException("poolName");
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");

        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
        this(poolName, daemon, priority, System.getSecurityManager() == null ?
                Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
    public Thread newThread(Runnable r) {
        Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {

            if (t.getPriority() != priority) {
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        return t;
    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    private static final class DefaultRunnableDecorator implements Runnable {

        private final Runnable r;

        DefaultRunnableDecorator(Runnable r) {
            this.r = r;

        public void run() {
            try {
            } finally {

private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

//全局事务执行器 GlobalEventExecutor
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

 * Single-thread singleton {@link EventExecutor}.  It starts the thread automatically and stops it when there is no
 * task pending in the task queue for 1 second.  Please note it is not scalable to schedule large number of tasks to
 * this executor; use a dedicated executor.
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
    private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);
    public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
    final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
    final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
            this, Executors.<Void>callable(new Runnable() {
        public void run() {
            // NOOP

    // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
    // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
    // be sticky about its thread group
    // visible for testing
    final ThreadFactory threadFactory =
            new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null);
    private final TaskRunner taskRunner = new TaskRunner();
    private final AtomicBoolean started = new AtomicBoolean();
    volatile Thread thread;
    private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());

    private GlobalEventExecutor() {

     * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
     * @return {@code null} if the executor thread has been interrupted or waken up.
    Runnable takeTask() {
        BlockingQueue<Runnable> taskQueue = this.taskQueue;
        for (;;) {
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            if (scheduledTask == null) {
                Runnable task = null;
                try {
                    task = taskQueue.take();
                } catch (InterruptedException e) {
                    // Ignore
                return task;
            } else {
                long delayNanos = scheduledTask.delayNanos();
                Runnable task;
                if (delayNanos > 0) {
                    try {
                        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        // Waken up.
                        return null;
                } else {
                    task = taskQueue.poll();

                if (task == null) {
                    task = taskQueue.poll();

                if (task != null) {
                    return task;
    private void fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        while (scheduledTask != null) {
            scheduledTask = pollScheduledTask(nanoTime);

     * Return the number of tasks that are pending for processing.
     * [b]Be aware that this operation may be expensive as it depends on the internal implementation of the
     * SingleThreadEventExecutor. So use it was care![/b]
    public int pendingTasks() {
        return taskQueue.size();

     * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
     * before.
    private void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        return terminationFuture();
    public Future<?> terminationFuture() {
        return terminationFuture;
    public void shutdown() {
        throw new UnsupportedOperationException();
    public boolean isShuttingDown() {
        return false;

    public boolean isShutdown() {
        return false;

    public boolean isTerminated() {
        return false;
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return false;

     * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself.
     * Because a new worker thread will be started again when a new task is submitted, this operation is only useful
     * when you want to ensure that the worker thread is terminated [b]after[/b] your application is shut
     * down and there's no chance of submitting a new task afterwards.
     * @return {@code true} if and only if the worker thread has been terminated
    public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
        if (unit == null) {
            throw new NullPointerException("unit");

        final Thread thread = this.thread;
        if (thread == null) {
            throw new IllegalStateException("thread was not started");
	//  * Waits at most {@code millis} milliseconds for this thread to
        //  * die. A timeout of {@code 0} means to wait forever.
        return !thread.isAlive();
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        if (!inEventLoop()) {
    private void startThread() {
        if (started.compareAndSet(false, true)) {
            Thread t = threadFactory.newThread(taskRunner);
            // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
            // an assert error.
            // See https://github.com/netty/netty/issues/4357
            thread = t;
    final class TaskRunner implements Runnable {
        public void run() {
            for (;;) {
                Runnable task = takeTask();
                if (task != null) {
                    try {
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from the global event executor: ", t);
                    if (task != quietPeriodTask) {
                Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
                // Terminate if there is no task in the queue (except the noop task).
                if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                    // Mark the current thread as stopped.
                    // The following CAS must always success and must be uncontended,
                    // because only one thread should be running at the same time.
                    boolean stopped = started.compareAndSet(true, false);
                    assert stopped;

                    // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
                    if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                        // A) No new task was added and thus there's nothing to handle
                        //    -> safe to terminate because there's nothing left to do
                        // B) A new thread started and handled all the new tasks.
                        //    -> safe to terminate the new thread will take care the rest

                    // There are pending tasks added again.
                    if (!started.compareAndSet(false, true)) {
                        // startThread() started a new thread and set 'started' to true.
                        // -> terminate this thread so that the new thread reads from taskQueue exclusively.

                    // New tasks were added, but this worker was faster to set 'started' to true.
                    // i.e. a new worker thread was not started by startThread().
                    // -> keep this thread alive to handle the newly added entries.


protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());

package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

 * Expose helper methods which create different {@link RejectedExecutionHandler}s.
public final class RejectedExecutionHandlers {
    private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
        public void rejected(Runnable task, SingleThreadEventExecutor executor) {
            throw new RejectedExecutionException();

    private RejectedExecutionHandlers() { }

     * Returns a {@link RejectedExecutionHandler} that will always just throw a {@link RejectedExecutionException}.
    public static RejectedExecutionHandler reject() {
        return REJECT;

     * Tries to backoff when the task can not be added due restrictions for an configured amount of time. This
     * is only done if the task was added from outside of the event loop which means
     * {@link EventExecutor#inEventLoop()} returns {@code false}.
    public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) {
        ObjectUtil.checkPositive(retries, "retries");
        final long backOffNanos = unit.toNanos(backoffAmount);
        return new RejectedExecutionHandler() {
            public void rejected(Runnable task, SingleThreadEventExecutor executor) {
                if (!executor.inEventLoop()) {
                    for (int i = 0; i < retries; i++) {
                        // Try to wake up the executor so it will empty its task queue.
                        if (executor.offerTask(task)) {
                // Either we tried to add the task from within the EventLoop or we was not able to add it even with
                // backoff.
                throw new RejectedExecutionException();

   protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop || state == ST_SHUTTING_DOWN) {
            // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
            // is already something in the queue.

package io.netty.util.concurrent;

 * Similar to {@link java.util.concurrent.RejectedExecutionHandler} but specific to {@link SingleThreadEventExecutor}.
public interface RejectedExecutionHandler {

     * Called when someone tried to add a task to {@link SingleThreadEventExecutor} but this failed due capacity
     * restrictions.
    void rejected(Runnable task, SingleThreadEventExecutor executor);

private volatile ThreadProperties threadProperties;

package io.netty.util.concurrent;

 * Expose details for a {@link Thread}.
public interface ThreadProperties {
     * @see Thread#getState()
    Thread.State state();
     * @see Thread#getPriority()
    int priority();

     * @see Thread#isInterrupted()
    boolean isInterrupted();
     * @see Thread#isDaemon()
    boolean isDaemon();
     * @see Thread#getName()
    String name();
     * @see Thread#getId()
    long id();
     * @see Thread#getStackTrace()
    StackTraceElement[] stackTrace();
     * @see Thread#isAlive()
    boolean isAlive();

private static final class DefaultThreadProperties implements ThreadProperties {
        private final Thread t;
        DefaultThreadProperties(Thread t) {
            this.t = t;
        public State state() {
            return t.getState();
        public int priority() {
            return t.getPriority();
        public boolean isInterrupted() {
            return t.isInterrupted();
        public boolean isDaemon() {
            return t.isDaemon();
        public String name() {
            return t.getName();
        public long id() {
            return t.getId();
        public StackTraceElement[] stackTrace() {
            return t.getStackTrace();
        public boolean isAlive() {
            return t.isAlive();


    Netty简介 Netty线程模型和EventLoop Codec编码与解码 ByteBuf容器



    5. **Bootstrap**: Bootstrap 类用于初始化服务器端(ServerBootstrap)或客户端(Bootstrap)的配置,包括选择器、事件循环组、通道类等。 6. **Codec**: Netty 提供了一系列的编解码器,用于将各种数据类型转换为...

    Netty In Action中文版

    10. **线程模型**:Netty使用事件驱动和单线程或多线程模型相结合的方式,提高了并发处理能力。例如,一个EventLoop通常负责多个连接,减少了线程切换的开销。 通过学习《Netty In Action》中文版,读者不仅可以...

    netty4.0 demo

    - **EventLoop**: 负责执行事件处理的线程,Netty使用了单线程模型来处理每个连接。 - **Buffer**: Netty 提供了ByteBuf,用于高效地处理网络数据。 3. **Netty的编码与解码** Netty 提供了多种编解码器,如 ...


    - **Reactor模式**:Netty采用了Reactor模式来处理I/O事件,该模式的核心在于事件循环(Event Loop)和多路复用器(Multiplexer),使得单个线程能够处理多个Channel的I/O操作。 - **Zero-Copy**:Netty支持零拷贝技术,...

    Java网络编程 NIO Netty

    选择器用于监控多个通道,当通道上有可读、可写事件发生时,选择器会通知用户线程,这样可以实现单线程管理多个连接,降低了系统的线程开销。 Netty是一个高性能、异步事件驱动的网络应用框架,专为Java设计,用于...


    单线程版本的`EventExecutor`,所有任务都在同一个线程中执行。 #### 类SingleThreadEventLoop `SingleThreadEventExecutor`在Netty中的具体实现,专门用于处理单个线程中的事件。 #### 类NioEventLoop 继承自`...



    netty4.x 与 spring 集成

    - Netty的非阻塞I/O模型使得它可以高效地处理大量并发连接,这与Spring的单线程模型不同。 - 需要理解如何在Spring中正确地处理Netty的异步回调,以避免线程安全问题。 5. **整合Netty的事件循环和Spring的任务...


    Netty 使用多线程模型,EventLoop 负责执行事件处理器(ChannelHandler)的方法。 3. **ChannelFuture** - ChannelFuture 代表了一个I/O操作的未来结果,它是异步编程的关键。你可以注册监听器来接收操作完成的通知...


    - **好处**: 分散负载,避免单线程处理过多 Channel 导致的性能瓶颈。 ##### **2.3 事件处理** - **机制**: Netty 通过事件驱动模型来处理 I/O 事件,包括但不限于连接建立、读写事件等。 - **注册**: 当 Channel ...

    Netty in Action 2014 v10 MEAP

    6. 启动引导:Netty通过其Bootstrap类提供了灵活的网络应用程序启动和初始化机制,支持服务端和客户端的启动配置。 7. 示例:Netty的书籍通常包括大量示例,如单元测试、使用WebSockets、SPDY协议、以及使用UDP进行...


    1. **NIO(Non-blocking I/O)**:Netty 基于 Java NIO(非阻塞I/O)构建,允许单线程处理多个并发连接,从而提高了性能和效率。 2. **Channel**:在 Netty 中,Channel 是网络连接的抽象,它可以是 TCP、UDP 或...


    3. **高度可定制的线程模型**:Netty 允许开发者根据应用需求定制线程模型,比如单线程、单个线程池或多线程池等。 4. **无连接数据报支持**:自 3.1 版本起,Netty 支持无连接的数据报套接字,进一步拓宽了应用场景...





Global site tag (gtag.js) - Google Analytics