`
Donald_Draper
  • 浏览: 987966 次
社区版块
存档分类
最新评论

netty 事件执行器组和事件执行器定义及抽象实现

阅读更多
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
netty 默认Channel管道线-添加通道处理器:http://donald-draper.iteye.com/blog/2388726
netty 默认Channel管道线-通道处理器移除与替换:http://donald-draper.iteye.com/blog/2388793
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
netty 通道处理器上下文定义:http://donald-draper.iteye.com/blog/2389214
netty 通道处理器上下文:http://donald-draper.iteye.com/blog/2389299
引言:
     在前面的文章中,当IO事件发生,Channel管道线处理相关事件的方法,如果管道线事件执行器处理当前事件循环组中,则直接执行,否则从事件执行器组映射关系childExecutors(Map<EventExecutorGroup, EventExecutor>)中获取事件执行器对应的事件执行器,并将IO事件的相关操作委托给事件执行器,这个事件执行器就是构造通道处理器上下文时的事件执行器executor(EventExecutor)。
从今天开始我们来看一下事件执行器组,事件执行器的还以及作用:
在前面的实例中,创建netty服务端以下一段代码:
/*
 * EventLoopGroup(多线程事件loop),处理IO操作,这里我们用了两个事件loop
 * 第一个用于处理器监听连接请求,第二个用于数据的传输;
 * 具体线程是多少依赖于事件loop的具体实现
 * */
 EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
 	//ServerBootstrap,用于配置服务端,一般为ServerSocket通道
     ServerBootstrap serverBoot = new ServerBootstrap(); 
     serverBoot.group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class) 
      .childHandler(new ChannelInitializer<SocketChannel>() { 
          @Override
          public void initChannel(SocketChannel ch) throws Exception {
         	//添加通道处理器到通道关联的管道,准确的中文翻译为管道线, 此管道线与Mina中过滤链十分相似,
         	//ChannelInitializer用于配置通道的管道线,ChannelPipeline
         	 ChannelPipeline pipeline = ch.pipeline();
              if (sslCtx != null) {
             	 pipeline.addLast(sslCtx.newHandler(ch.alloc()));
              }
              pipeline.addLast(new LoggingHandler(LogLevel.INFO));
              pipeline.addLast(new EchoServerHandler());
          }
      })
      .option(ChannelOption.SO_BACKLOG, 128)//socket监听器连接队列大小、
      .childOption(ChannelOption.SO_KEEPALIVE, true); //保活,此配置针对ServerSocket通道接收连接产生的Socket通道
     InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
     // 绑定地址,开始监听
     ChannelFuture f = serverBoot.bind(inetSocketAddress).sync();
     log.info("=========Server is start=========");
     //等待,直到ServerSocket关闭
     f.channel().closeFuture().sync();
 } finally {
     workerGroup.shutdownGracefully();
     bossGroup.shutdownGracefully();
 }

其中用到了NioEventLoopGroup;
在客户端:
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
	//Bootstrap,用于配置客户端,一般为Socket通道
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
     .channel(NioSocketChannel.class)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
        	 //添加安全套接字处理器和通道处理器到
             ChannelPipeline pipeline = ch.pipeline();
             if (sslCtx != null) {
            	 pipeline.addLast(sslCtx.newHandler(ch.alloc(), ip, port));
             }
             pipeline.addLast(new LoggingHandler(LogLevel.INFO));
             pipeline.addLast(new EchoClientHandler());
         }
     });
    InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
    //连接socket地址
    ChannelFuture f = bootstrap.connect(inetSocketAddress).sync();
    log.info("=========Client is start=========");
    //等待,直到连接关闭
    f.channel().closeFuture().sync();
} finally {
	workerGroup.shutdownGracefully();
}

同样也用到了NioEventLoopGroup,
我们来看一下事件循环组EventLoopGroup和事件执行器组EventExecutorGroup及事件执行器EventExecutor的关系;
/**
 * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s.
 Nio事件循环组NioEventLoopGroup为多线程事件循环组的事件,主要用于基于通道的Nio选择器相关操作。
 */
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

/**
 * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 多线程事件循环组MultithreadEventLoopGroup为事件循环组的实现,可以在同一时间多线程处理任务。
 */
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {


/**
 * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 多线程事件执行器组MultithreadEventExecutorGroup可以在同一时间多线程处理任务。
 */
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {


/**
 * Abstract base class for {@link EventExecutorGroup} implementations.
 事件循环中的抽象实现
 */
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {

/**
 * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use
 * via its {@link #next()} method. Besides this, it is also responsible for handling their
 * life-cycle and allows shutting them down in a global fashion.
 事件执行器组通道next方法提供事件执行器。除此之外,负责他们的生命循环,并允许以全局的方式关闭
 *
 */
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {

调度执行器ScheduledExecutorService为JUC包中的执行器服务,用迭代器Iterable<EventExecutor>管理组内的
事件执行器。



/**
 * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
 * with some handy methods to see if a {@link Thread} is executed in a event loop.
 * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
 * way to access methods.
事件执行器EventExecutor是一个特殊的事件执行器组,如果线程在事件循环中执行,事件执行器可以处理
相关的操作。除此之外,拓展了事件执行器组的相关方法,可以用一般的方式访问事件执行器组的相关方法。
 *
 */
public interface EventExecutor extends EventExecutorGroup {


再来看事件循环组的另一个分支EventLoopGroup

/**
 * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get
 * processed for later selection during the event loop.
 事件循环组为一个特殊的事件执行器组,可以注册通道,以便在事件循环中,被后面的选择操作处理器。
 *
 */
public interface EventLoopGroup extends EventExecutorGroup {


从上面可以看出事件循环组EventLoopGroup为一个特殊的事件执行器组EventExecutorGroup,可以注册通道,以便在事件循环中,被后面的选择操作处理器。事件执行器组继承了JUC的调度执行器服务ScheduledExecutorService,用迭代器Iterable<EventExecutor>管理组内的事件执行器。事件执行器是一个特殊的事件执行器组。Nio多线程事件循环NioEventLoopGroup可以理解为多线程版MultithreadEventExecutorGroup的事件执行器组。


package io.netty.util.concurrent;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use
 * via its {@link #next()} method. Besides this, it is also responsible for handling their
 * life-cycle and allows shutting them down in a global fashion.
 事件执行器组通道next方法提供事件执行器。除此之外,负责他们的生命循环,并允许以全局的方式关闭
 *
 */
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {

    /**
     * Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup}
     * are being {@linkplain #shutdownGracefully() shut down gracefully} or was {@linkplain #isShutdown() shut down}.
     当且仅当事件执行器组关联的事件执行器被#shutdownGracefully或#isShutdown关闭时,此方法返回true
     */
    boolean isShuttingDown();

    /**
     * Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values.
     *#shutdownGracefully超时方法的快捷方式,超时时间为默认值
     * @return the {@link #terminationFuture()}
     返回的为#terminationFuture方法的异步任务
     */
    Future<?> shutdownGracefully();

    /**
     * Signals this executor that the caller wants the executor to be shut down.  Once this method is called,
     * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down.
     * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i>
     * (usually a couple seconds) before it shuts itself down.  If a task is submitted during the quiet period,
     * it is guaranteed to be accepted and the quiet period will start over.
     通知执行器,调用希望关闭执行器。一旦方法被调用,#isShuttingDown方法返回true,执行器准备关闭。不像#shutdown方法,
     graceful方式的关闭,确保在关闭前,没有任务在默认间隔内提交到执行器。如果任务在默认间隔内被提交,执行器必须能够保证在默认间隔内任务可以被接收,猜测应该是任务可以在默认间隔内执行完,所以接收。
     *
     * @param quietPeriod the quiet period as described in the documentation
     默认间隔
     * @param timeout     the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
     *                    regardless if a task was submitted during the quiet period
     不管任务在默认间隔内被提交,执行调用#shutdown方法关闭执行器,最大超时等待时间
     * @param unit        the unit of, {@code quietPeriod} and {@code timeout}
     *默认间隔和超时时间单元
     * @return the {@link #terminationFuture()}
     */
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

    /**
     * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this
     * {@link EventExecutorGroup} have been terminated.
     当事件执行器管理的所有事件执行器terminated时,异步任务结果Future将会被通知
     */
    Future<?> terminationFuture();

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     已弃用,被#shutdownGracefully方法代替
     */
    @Override
    @Deprecated
    void shutdown();

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     已弃用,被#shutdownGracefully方法代替
     */
    @Override
    @Deprecated
    List<Runnable> shutdownNow();

    /**
     * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
     返回事件执行器组管理的下一个事件执行器
     */
    EventExecutor next();
   //返回事件执行器组管理的事件执行器迭代器
    @Override
    Iterator<EventExecutor> iterator();
   //下面方法与JUC调度执行器服务相同,就不说了,JUC分类文章中已说
    @Override
    Future<?> submit(Runnable task);

    @Override
    <T> Future<T> submit(Runnable task, T result);

    @Override
    <T> Future<T> submit(Callable<T> task);

    @Override
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    @Override
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    @Override
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    @Override
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

从上面可以看出,事件执行器组EventExecutorGroup主要提供了关闭事件执行器组管理的执行器的相关方法,获取
事件执行器组管理的事件执行器和执行任务线程方法。
再来看事件执行器:
package io.netty.util.concurrent;

/**
 * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
 * with some handy methods to see if a {@link Thread} is executed in a event loop.
 * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
 * way to access methods.
 *
 */
public interface EventExecutor extends EventExecutorGroup {

    /**
     * Returns a reference to itself.
     返回事件执行器组的下一个事件执行器
     */
    @Override
    EventExecutor next();

    /**
     * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
     返回所属的事件执行器组
     */
    EventExecutorGroup parent();

    /**
     * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
     判断当前线程是否在事件循环中
     */
    boolean inEventLoop();

    /**
     * Return {@code true} if the given {@link Thread} is executed in the event loop,
     * {@code false} otherwise.
     如果指定的线程在当前事件循环中执行,则返回true
     */
    boolean inEventLoop(Thread thread);

    /**
     * Return a new {@link Promise}.
     创建一个可写的异步任务结果
     */
    <V> Promise<V> newPromise();

    /**
     * Create a new {@link ProgressivePromise}.
     创建一个可写的异步任务解读结果
     */
    <V> ProgressivePromise<V> newProgressivePromise();

    /**
     * Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
     * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     创建一个已经标记成功的异步任务结果。所以任务的Future#isSuccess方法,返回true。所有监控任务的监听器将会被
     直接通知。所有阻塞方法调用,将会无阻塞直接返回。
     */
    <V> Future<V> newSucceededFuture(V result);

    /**
     * Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
     * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     创建一个已经标记失败的异步任务结果。所以任务的Future#isSuccess方法,返回false。所有监控任务的监听器将会被
     直接通知。所有阻塞方法调用,将会无阻塞直接返回。
     */
    <V> Future<V> newFailedFuture(Throwable cause);
}

从上面可以看出事件执行器EventExecutor为一个特殊的事件执行器组EventExecutorGroup,提供了获取事件执行器组的下一个事件执行器方法,判断线程是否在当前事件循环中以及创建可写的异步任务结果和进度结果,及已经成功失败的
异步结果。

再来看一下抽象事件执行器组AbstractEventExecutorGroup:
package io.netty.util.concurrent;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static io.netty.util.concurrent.AbstractEventExecutor.*;


/**
 * Abstract base class for {@link EventExecutorGroup} implementations.
 */
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
    //所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器的下一个事件执行器相应方法执行。
    @Override
    public Future<?> submit(Runnable task) {
        return next().submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return next().submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return next().submit(task);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return next().schedule(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return next().schedule(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return next().scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
   //graceful方式关闭事件执行器组
    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     已启用
     */
    @Override
    @Deprecated
    public abstract void shutdown();

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     已启用
     */
    @Override
    @Deprecated
    public List<Runnable> shutdownNow() {
        shutdown();
        return Collections.emptyList();
    }

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        return next().invokeAll(tasks);
    }

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return next().invokeAll(tasks, timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return next().invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return next().invokeAny(tasks, timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }
}

我们来看一下抽象事件执行器的关闭方法
 //graceful方式关闭事件执行器组
@Override
public Future<?> shutdownGracefully() {
    return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}


来看一下这些默认值在哪里定义:
/**
 * Abstract base class for {@link EventExecutor} implementations.
 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);

    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

    private final EventExecutorGroup parent;
    private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);
    ...
}



实际定义在抽象事件执行器中,默认关闭间隔为2s,超时时间为25s。

抽象事件执行器组AbstractEventExecutorGroup,所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器的下一个事件执行器相应方法执行。graceful方式关闭事件执行器组,默认关闭间隔为2s,超时时间为25s。

再来看一下抽象事件执行器AbstractEventExecutor,
package io.netty.util.concurrent;

import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Abstract base class for {@link EventExecutor} implementations.
 继承了抽象执行器服务AbstractExecutorService
 */
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);

    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;//关闭执行器默认间隔
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;//关闭执行器超时等待时间

    private final EventExecutorGroup parent;//所属事件执行器组
    //当前事件执行器单例集
    private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);

    protected AbstractEventExecutor() {
        this(null);
    }

    protected AbstractEventExecutor(EventExecutorGroup parent) {
        this.parent = parent;
    }

    @Override
    public EventExecutorGroup parent() {
        return parent;
    }
    //next方法返回的为自己
    @Override
    public EventExecutor next() {
        return this;
    }
    //判断当前方法是否在事件循环中
    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

    @Override
    public Iterator<EventExecutor> iterator() {
        return selfCollection.iterator();
    } 
    //关闭执行器
    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     */
    @Override
    @Deprecated
    public abstract void shutdown();

    /**
     * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
     */
    @Override
    @Deprecated
    public List<Runnable> shutdownNow() {
        shutdown();
        return Collections.emptyList();
    }
    //创建异步任务结果
    @Override
    public <V> Promise<V> newPromise() {
        return new DefaultPromise<V>(this);
    }

    @Override
    public <V> ProgressivePromise<V> newProgressivePromise() {
        return new DefaultProgressivePromise<V>(this);
    }

    @Override
    public <V> Future<V> newSucceededFuture(V result) {
        return new SucceededFuture<V>(this, result);
    }

    @Override
    public <V> Future<V> newFailedFuture(Throwable cause) {
        return new FailedFuture<V>(this, cause);
    }
    //提交任务线程,直接委托给父类抽象执行器服务
    @Override
    public Future<?> submit(Runnable task) {
        return (Future<?>) super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return (Future<T>) super.submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return (Future<T>) super.submit(task);
    }

    @Override
    protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new PromiseTask<T>(this, runnable, value);
    }

    @Override
    protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new PromiseTask<T>(this, callable);
    }
    //从下面来看,不支持延时调度的周期间歇性调度任务线程
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay,
                                       TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    /**
     * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
     安全地执行给定任务线程,捕捉抛出的异常
     */
    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }
}

从上面来看,抽象事件执行器,继承了抽象执行器服务AbstractExecutorService,提交任务线程,直接委托给父类抽象执行器服务,不支持延时调度的周期间歇性调度任务线程,多个一个安全地执行给定任务线程方法,捕捉执行过程中抛出的异常。由于抽象的事件执行器是一个特殊的事件执行器组,内部事件执行器selfCollection(Collections.<EventExecutor>singleton(this)),是自己单例集,next方法返回的是自己。

总结:


     事件循环组EventLoopGroup为一个特殊的事件执行器组EventExecutorGroup,可以注册通道,以便在事件循环中,被后面的选择操作处理器。事件执行器组继承了JUC的调度执行器服务ScheduledExecutorService,用迭代器Iterable<EventExecutor>管理组内的事件执行器。事件执行器是一个特殊的事件执行器组。Nio多线程事件循环NioEventLoopGroup可以理解为多线程版MultithreadEventExecutorGroup的事件执行器组。
    事件执行器组EventExecutorGroup主要提供了关闭事件执行器组管理的执行器的相关方法,获取事件执行器组管理的事件执行器和执行任务线程方法。
    事件执行器EventExecutor为一个特殊的事件执行器组EventExecutorGroup,提供了获取事件执行器组的下一个事件执行器方法,判断线程是否在当前事件循环中以及创建可写的异步任务结果和进度结果,及已经成功失败的异步结果。
    抽象事件执行器组AbstractEventExecutorGroup,所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器组的下一个事件执行器相应方法执行。graceful方式关闭事件执行器组,默认关闭间隔为2s,超时时间为25s,具体定义在抽象事件执行器AbstractEventExecutor中。
    抽象事件执行器,继承了抽象执行器服务AbstractExecutorService,提交任务线程,直接委托给父类抽象执行器服务,不支持延时调度的周期间歇性调度任务线程,多个一个安全地执行给定任务线程方法,捕捉执行过程中抛出的异常。由于抽象的事件执行器是一个特殊的事件执行器组,内部事件执行器selfCollection(Collections.<EventExecutor>singleton(this)),是自己单例集,next方法返回的是自己。
0
0
分享到:
评论

相关推荐

    netty实现简单的聊天

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在这个“netty实现简单的聊天”的项目中,我们主要关注的是如何利用Netty构建一个基本的聊天系统,这涉及到...

    netty实现简易tomcat

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本项目中,我们将探讨如何利用 Netty 实现一个简易版的 Tomcat,即一个基础的 HTTP 服务器容器。Tomcat 是...

    Android基于Netty框架实现通信

    Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于多种协议的服务器和客户端应用。本篇文章将详细探讨如何在Android环境中利用Netty来构建通信系统。 首先,我们需要理解Android环境对Netty的特殊性。...

    netty框架图及netty面试知识点解析

    事件循环(EventLoop)是Netty处理事件的关键组件,它负责调度和执行事件处理器。 "粘包与半包"是网络通信中常见的问题。当发送的数据超过缓冲区大小时,可能会导致一个完整的数据包被拆分,或者多个数据包被合并到...

    netty实现微信聊天.zip

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨Netty的基本原理和如何利用它来实现微信聊天功能。 首先,理解Netty的核心概念至关...

    Netty和protocolbuf的通讯例子

    Bootstrap是用于创建服务器或客户端的启动器,Channel是网络连接的抽象,Handler是处理网络事件的接口,EventLoopGroup则是执行事件处理的线程池。在结合protobuf时,我们通常会自定义一个ByteToMessageDecoder来...

    netty官网学习手册中文版

    - **SingleThreadEventExecutor**:单线程事件执行器,每个EventLoop对应一个。 - **Worker Threads**:工作线程,负责处理I/O事件。 - **Boss Threads**:负责接收新的连接请求。 5. **Netty的性能优化** - **...

    Netty权威指南-Netty源码

    EventLoop 是 Netty 的事件处理机制,用于调度和执行任务。Pipeline 是一系列处理器的链,每个处理器(Handler)都有自己的职责,如解码、编码或者业务逻辑处理。 源码分析时,首先需要关注的是 Netty 的启动流程,...

    Netty example

    学习 Netty 的过程中,你还可以探索如何实现这些协议的解码器和编码器。 总的来说,这个“Netty example”项目是学习和实践 Netty 框架的好起点。通过深入研究和修改代码,你可以更好地掌握如何利用 Netty 构建高效...

    netty资源包.zip

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个“netty资源包.zip”文件包含了开发人员在使用 Netty 实现通信时所需的各种 jar 包。这些库提供了丰富的...

    netty4.0源码,netty例子,netty api文档

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个压缩包包含的是Netty 4.0.0.CR3版本的相关资源,包括源码、示例以及API文档,对于学习和理解Netty的工作...

    Android-netty和socket通信的demo

    Netty的核心组件包括Bootstrap(启动器)、ServerBootstrap(服务器启动器)、Channel(通道)、EventLoop(事件循环)和ChannelHandler(处理程序)。通过这些组件,开发者可以方便地创建出可伸缩、低延迟的网络...

    netty实战教程、netty代码demo

    5. **EventLoop** 和 **EventLoopGroup**:线程模型,EventLoop 负责执行 Channel 上的 I/O 操作和事件处理,EventLoopGroup 是 EventLoop 的管理器,负责分配和调度 EventLoop。 三、Netty 的事件驱动模型 Netty ...

    netty-socketio api接口文档.7z

    Netty是一个高性能、异步事件驱动的网络应用程序框架,它简化了创建网络服务,如TCP和UDP服务器的过程。SocketIO则是一个实时应用框架,用于构建实时、双向通信的应用,它在客户端和服务器之间提供了基于WebSocket的...

    Netty权威指南源码-maven版

    5. **Channel 和 EventLoop**:Netty 中的 Channel 是网络连接的抽象,EventLoop 是处理 Channel 上事件的单线程执行器,这种设计使得 Netty 能够高效地处理 I/O 事件。 6. **零拷贝**:通过内存池和直接内存分配,...

    Netty项目代码

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个“Netty项目代码”很可能是包含了使用Netty框架编写的网络通信程序示例或完整项目。在深入理解Netty之前...

    Netty in action 第二版 源码

    5. **EventLoop**: 事件循环是Netty异步模型的基础,它负责调度和执行事件处理器。每个EventLoop通常对应一个线程,处理多个Channel的事件。 6. **Bootstrap**: Bootstrap是启动服务器或客户端的配置类,可以设置...

    netty开发工具包

    6. **线程模型**:Netty 使用多路复用器(Selector)和事件循环组(EventLoopGroup)进行线程管理,使得系统能处理大量并发连接。 7. **协议支持**:Netty 内置了对多种常见网络协议的支持,如HTTP、WebSocket、FTP...

    netty实战源码13章

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入探讨 Netty 源码之前,我们先了解一下 Netty 的基本概念和架构。 Netty 提供了一种高度抽象的模型来...

Global site tag (gtag.js) - Google Analytics