- 浏览: 981030 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
netty 单线程事件执行器初始化:http://donald-draper.iteye.com/blog/2391895
netty 单线程事件执行器执行任务与graceful方式关闭:http://donald-draper.iteye.com/blog/2392051
netty 单线程事件循环:http://donald-draper.iteye.com/blog/2392067
netty nio事件循环初始化:http://donald-draper.iteye.com/blog/2392161
netty nio事件循环后续:http://donald-draper.iteye.com/blog/2392264
netty nio事件循环组:http://donald-draper.iteye.com/blog/2392300
前面我们来看netty的事件循环,从今天起,我们来看一下BootStap和ServerBootStrap:
在netty的相关测试实例中服务端有如下代码:
而客户端的为:
上面有两个类型我们需要关注,是下面两句:
从上面来看服务端引导配置ServerBootstrap用于配置服务端的Nio事件循环组,通道,通道处理器,及通道相关配置,客户端引导配置Bootstap用于配置客户端的Nio事件循环组,通道,通道处理器,及通道相关配置。
我们来看一下两种引导配置的定义:
两种引导配置都继承与AbstractBootstrap,只是类型参数不同,服务端通道类型参数为ServerChannel,
客户端为Channel。
我们先来看一下AbstractBootstrap:
从上面来看抽象引导程序AbstractBootstrap,内部关联的一个事件循环组EventLoopGroup,
一个通道处理器ChannelHandler,一个通道配置集和一个本地Socket地址及一个通道属性集。
下面来看其他方法:
回到配置通道方法的这一句:
我们来看一下反射通道工厂:
再来看配置本地地址:
再来看配置通道选项
/
ChannelOption定义见附篇。
//配置通道属性,与配置通道选项思路相同
AttributeKey定义见附篇。
从上面可以看出,注册通道到事件循环组,首先由通道工厂创建通道实例,然后初始化通道,初始化工作待子类实现;然后将实际注册工作委托给事件循环组。
再来看地址绑定方法:
//SocketUtils
从上面来看,绑定socket地址实际通过doBind方法
//PendingRegistrationPromise,通道注册任务异步任务结果
再来看实际绑定工作
从上面可以看出绑定socket地址,首先注册通道到事件循环组,待注册完成时,创建一个绑定任务线程完成地址绑定,
实际将地址绑定工作委托给通道,并将绑定任务线程交由通道关联的事件循环的事件执行器执行。
再来看其他方法,
下面几个方法主要是获取通道属性,通道选型,通道处理器,通道工厂
总结:
抽象引导程序AbstractBootstrap,内部关联的一个事件循环组EventLoopGroup,一个通道处理器ChannelHandler,一个通道选项集和一个本地Socket地址及一个通道属性集。内部的方法主要配置事件循环组,通道处理,通道选项集,socket地址,及通道属性,通道注册,地址绑定。注册通道到事件循环组,首先由通道工厂创建通道实例,然后初始化通道,初始化工作待子类实现;然后将实际注册工作委托给事件循环组。绑定定socket地址,首先注册通道到事件循环组,待注册完成时,创建一个绑定任务线程完成地址绑定,实际将地址绑定工作委托给通道,并将绑定任务线程交由通道关联的事件循环的事件执行器执行。
附:
从上面可以看出通道选项实际为一个抽象常量,主要的操作都是委托给内部的常量池ConstantPool,
下面来看常量池的定义:
从上面可以看出常量池,实际上用一个并发Map来存放常量,所有的操作委托为Map。
再来看通道选项的父类,抽象常量
//Constant
再来看属性key
从上面来看属性key也是一个常量,内部用常量池存放属性常量。
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
netty 单线程事件执行器初始化:http://donald-draper.iteye.com/blog/2391895
netty 单线程事件执行器执行任务与graceful方式关闭:http://donald-draper.iteye.com/blog/2392051
netty 单线程事件循环:http://donald-draper.iteye.com/blog/2392067
netty nio事件循环初始化:http://donald-draper.iteye.com/blog/2392161
netty nio事件循环后续:http://donald-draper.iteye.com/blog/2392264
netty nio事件循环组:http://donald-draper.iteye.com/blog/2392300
前面我们来看netty的事件循环,从今天起,我们来看一下BootStap和ServerBootStrap:
在netty的相关测试实例中服务端有如下代码:
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //ServerBootstrap,用于配置服务端,一般为ServerSocket通道 ServerBootstrap serverBoot = new ServerBootstrap(); serverBoot.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { //添加通道处理器到通道关联的管道,准确的中文翻译为管道线, 此管道线与Mina中过滤链十分相似, //ChannelInitializer用于配置通道的管道线,ChannelPipeline ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast(new EchoServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128)//socket监听器连接队列大小、 .childOption(ChannelOption.SO_KEEPALIVE, true); //保活,此配置针对ServerSocket通道接收连接产生的Socket通道 InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port); // 绑定地址,开始监听 ChannelFuture f = serverBoot.bind(inetSocketAddress).sync(); log.info("=========Server is start========="); //等待,直到ServerSocket关闭 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }
而客户端的为:
EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Bootstrap,用于配置客户端,一般为Socket通道 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加安全套接字处理器和通道处理器到 ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc(), ip, port)); } pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast(new EchoClientHandler()); } }); InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port); //连接socket地址 ChannelFuture f = bootstrap.connect(inetSocketAddress).sync(); log.info("=========Client is start========="); //等待,直到连接关闭 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); }
上面有两个类型我们需要关注,是下面两句:
ServerBootstrap serverBoot = new ServerBootstrap(); Bootstrap bootstrap = new Bootstrap();
从上面来看服务端引导配置ServerBootstrap用于配置服务端的Nio事件循环组,通道,通道处理器,及通道相关配置,客户端引导配置Bootstap用于配置客户端的Nio事件循环组,通道,通道处理器,及通道相关配置。
我们来看一下两种引导配置的定义:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
两种引导配置都继承与AbstractBootstrap,只是类型参数不同,服务端通道类型参数为ServerChannel,
客户端为Channel。
我们先来看一下AbstractBootstrap:
package io.netty.bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.ReflectiveChannelFactory; import io.netty.util.internal.SocketUtils; import io.netty.util.AttributeKey; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; /** * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}. *抽象引导程序AbstractBootstrap使启动一个通道更容易的工具类。支持链式方法编程,提供配置抽象引导程序的便捷方式。 * When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless * transports such as datagram (UDP). 当不在Server引导程序上下文下使用时,绑定方法可以用于无连接的传输,比如报文传输 */ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { volatile EventLoopGroup group;//事件循环组 @SuppressWarnings("deprecation") private volatile ChannelFactory<? extends C> channelFactory;//通道工厂,已丢器 private volatile SocketAddress localAddress;//本地socket地址 //通道配置项 private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>(); //通道属性 private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>(); private volatile ChannelHandler handler;//通道处理器 AbstractBootstrap() { // Disallow extending from a different package. } AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) { group = bootstrap.group; channelFactory = bootstrap.channelFactory; handler = bootstrap.handler; localAddress = bootstrap.localAddress; synchronized (bootstrap.options) { options.putAll(bootstrap.options); } synchronized (bootstrap.attrs) { attrs.putAll(bootstrap.attrs); } } }
从上面来看抽象引导程序AbstractBootstrap,内部关联的一个事件循环组EventLoopGroup,
一个通道处理器ChannelHandler,一个通道配置集和一个本地Socket地址及一个通道属性集。
下面来看其他方法:
/** * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created * {@link Channel} 配置事件循环组,处理所有创建的事件 */ @SuppressWarnings("unchecked") public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return (B) this; }
/** * The {@link Class} which is used to create {@link Channel} instances from. * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your * {@link Channel} implementation has no no-args constructor. 通道类channelClass,用于创建通道实例,如果你的通道实现为无参构造可以使用此方法或 #channelFactory */ public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
/** * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)} * is not working for you because of some more complex needs. If your {@link Channel} implementation * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for * simplify your code. 当调用绑定方法时,通道工厂由于创建通道实例。 */ @SuppressWarnings({ "unchecked", "deprecation" }) public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) { return channelFactory((ChannelFactory<C>) channelFactory); } ** * @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead. */ @Deprecated @SuppressWarnings("unchecked") public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return (B) this; }
回到配置通道方法的这一句:
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
我们来看一下反射通道工厂:
package io.netty.channel; import io.netty.util.internal.StringUtil; /** * A {@link ChannelFactory} that instantiates a new {@link Channel} by invoking its default constructor reflectively. */ public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; } //创建通道实例 @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString() { return StringUtil.simpleClassName(clazz) + ".class"; } }
再来看配置本地地址:
/** * The {@link SocketAddress} which is used to bind the local "end" to. */ @SuppressWarnings("unchecked") public B localAddress(SocketAddress localAddress) { this.localAddress = localAddress; return (B) this; } /** * @see #localAddress(SocketAddress) */ public B localAddress(int inetPort) { return localAddress(new InetSocketAddress(inetPort)); } /** * @see #localAddress(SocketAddress) */ public B localAddress(String inetHost, int inetPort) { return localAddress(SocketUtils.socketAddress(inetHost, inetPort)); } /** * @see #localAddress(SocketAddress) */ public B localAddress(InetAddress inetHost, int inetPort) { return localAddress(new InetSocketAddress(inetHost, inetPort)); }
再来看配置通道选项
/
** * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}. */ @SuppressWarnings("unchecked") public <T> B option(ChannelOption<T> option, T value) { if (option == null) { throw new NullPointerException("option"); } if (value == null) { //置为空,则移除通道配置选项 synchronized (options) { options.remove(option); } } else { //否则添加通道配置选择到配置选项集 synchronized (options) { options.put(option, value); } } return (B) this; }
ChannelOption定义见附篇。
//配置通道属性,与配置通道选项思路相同
/** * Allow to specify an initial attribute of the newly created {@link Channel}. If the {@code value} is * {@code null}, the attribute of the specified {@code key} is removed. */ @SuppressWarnings("unchecked") public <T> B attr(AttributeKey<T> key, T value) { if (key == null) { throw new NullPointerException("key"); } if (value == null) { synchronized (attrs) { attrs.remove(key); } } else { synchronized (attrs) { attrs.put(key, value); } } return (B) this; }
AttributeKey定义见附篇。
/** * Validate all the parameters. Sub-classes may override this, but should * call the super method in that case. 验证参数,子类可以重写此方法 */ @SuppressWarnings("unchecked") public B validate() { if (group == null) { throw new IllegalStateException("group not set"); } if (channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } return (B) this; }
/** * Returns a deep clone of this bootstrap which has the identical configuration. This method is useful when making * multiple {@link Channel}s with similar settings. Please note that this method does not clone the * {@link EventLoopGroup} deeply but shallowly, making the group a shared resource. 克隆引导配置 */ @Override @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException") public abstract B clone();
/** * Create a new {@link Channel} and register it with an {@link EventLoop}. 创建一个通道,注册到事件循环 */ public ChannelFuture register() { validate(); return initAndRegister(); } //初始化通道,注册通道到事件循环 final ChannelFuture initAndRegister() { Channel channel = null; try { //创建通道实例 channel = channelFactory.newChannel(); init(channel);//初始化通道 } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } //委托给事件循环组 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
//初始化通道 abstract void init(Channel channel) throws Exception;
/** * Returns the {@link AbstractBootstrapConfig} object that can be used to obtain the current config * of the bootstrap. 获取启动项当前配置 */ public abstract AbstractBootstrapConfig<B, C> config();
从上面可以看出,注册通道到事件循环组,首先由通道工厂创建通道实例,然后初始化通道,初始化工作待子类实现;然后将实际注册工作委托给事件循环组。
再来看地址绑定方法:
/** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind() { validate(); SocketAddress localAddress = this.localAddress; if (localAddress == null) { throw new IllegalStateException("localAddress not set"); } return doBind(localAddress); } /** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); }
/** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(String inetHost, int inetPort) { return bind(SocketUtils.socketAddress(inetHost, inetPort)); }
//SocketUtils
/** * Provides socket operations with privileges enabled. This is necessary for applications that use the * {@link SecurityManager} to restrict {@link SocketPermission} to their application. By asserting that these * operations are privileged, the operations can proceed even if some code in the calling chain lacks the appropriate * {@link SocketPermission}. socket操作访问控制权限Utils */ public final class SocketUtils { ... private SocketUtils() { } //在当前访问控制权限下,根据主机名和端口号创建Socket地址 public static InetSocketAddress socketAddress(final String hostname, final int port) { return AccessController.doPrivileged(new PrivilegedAction<InetSocketAddress>() { @Override public InetSocketAddress run() { return new InetSocketAddress(hostname, port); } }); } ... }
/** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(InetAddress inetHost, int inetPort) { return bind(new InetSocketAddress(inetHost, inetPort)); } /** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); }
从上面来看,绑定socket地址实际通过doBind方法
private ChannelFuture doBind(final SocketAddress localAddress) { //首先注册通道到事件循环组 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); //异常返回 if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); //通道注册到时间循环组,成功,委托给doBind0完成实际socket的地址绑定 doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. //注册工作大部分情况下,已经完成,但有可能存在意外情况 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 //注册成功,执行器可以使用 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
//PendingRegistrationPromise,通道注册任务异步任务结果
static final class PendingRegistrationPromise extends DefaultChannelPromise { // Is set to the correct EventExecutor once the registration was successful. Otherwise it will // stay null and so the GlobalEventExecutor.INSTANCE will be used for notifications. private volatile boolean registered; PendingRegistrationPromise(Channel channel) { super(channel); } void registered() { registered = true; } @Override protected EventExecutor executor() { if (registered) { // If the registration was a success executor is set. //注册成功,即通道注册到nio事件循环中,然后获取nio事件循环的事件执行器 // See https://github.com/netty/netty/issues/2586 return super.executor(); } // The registration failed so we can only use the GlobalEventExecutor as last resort to notify. return GlobalEventExecutor.INSTANCE; } }
再来看实际绑定工作
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. // 此方法在通道注册前触发调用。 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { //委托给通道 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
从上面可以看出绑定socket地址,首先注册通道到事件循环组,待注册完成时,创建一个绑定任务线程完成地址绑定,
实际将地址绑定工作委托给通道,并将绑定任务线程交由通道关联的事件循环的事件执行器执行。
再来看其他方法,
/** * the {@link ChannelHandler} to use for serving the requests. 配置通道处理器 */ @SuppressWarnings("unchecked") public B handler(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; return (B) this; } /** * Returns the configured {@link EventLoopGroup} or {@code null} if non is configured yet. *返回事件循环组 * @deprecated Use {@link #config()} instead. */ @Deprecated public final EventLoopGroup group() { return group; } //拷贝Map static <K, V> Map<K, V> copiedMap(Map<K, V> map) { final Map<K, V> copied; synchronized (map) { if (map.isEmpty()) { return Collections.emptyMap(); } copied = new LinkedHashMap<K, V>(map); } return Collections.unmodifiableMap(copied); }
下面几个方法主要是获取通道属性,通道选型,通道处理器,通道工厂
final Map<ChannelOption<?>, Object> options0() { return options; } final Map<AttributeKey<?>, Object> attrs0() { return attrs; } final SocketAddress localAddress() { return localAddress; } @SuppressWarnings("deprecation") final ChannelFactory<? extends C> channelFactory() { return channelFactory; } final ChannelHandler handler() { return handler; } final Map<ChannelOption<?>, Object> options() { return copiedMap(options); } final Map<AttributeKey<?>, Object> attrs() { return copiedMap(attrs); }
//设置通道选项 static void setChannelOptions( Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) { for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) { setChannelOption(channel, e.getKey(), e.getValue(), logger); } } static void setChannelOptions( Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) { for (Map.Entry<ChannelOption<?>, Object> e: options) { setChannelOption(channel, e.getKey(), e.getValue(), logger); } } //配置通道选项 @SuppressWarnings("unchecked") private static void setChannelOption( Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) { try { if (!channel.config().setOption((ChannelOption<Object>) option, value)) { logger.warn("Unknown channel option '{}' for channel '{}'", option, channel); } } catch (Throwable t) { logger.warn( "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t); } }
总结:
抽象引导程序AbstractBootstrap,内部关联的一个事件循环组EventLoopGroup,一个通道处理器ChannelHandler,一个通道选项集和一个本地Socket地址及一个通道属性集。内部的方法主要配置事件循环组,通道处理,通道选项集,socket地址,及通道属性,通道注册,地址绑定。注册通道到事件循环组,首先由通道工厂创建通道实例,然后初始化通道,初始化工作待子类实现;然后将实际注册工作委托给事件循环组。绑定定socket地址,首先注册通道到事件循环组,待注册完成时,创建一个绑定任务线程完成地址绑定,实际将地址绑定工作委托给通道,并将绑定任务线程交由通道关联的事件循环的事件执行器执行。
附:
package io.netty.channel; import io.netty.buffer.ByteBufAllocator; import io.netty.util.AbstractConstant; import io.netty.util.ConstantPool; import java.net.InetAddress; import java.net.NetworkInterface; /** * A {@link ChannelOption} allows to configure a {@link ChannelConfig} in a type-safe * way. Which {@link ChannelOption} is supported depends on the actual implementation * of {@link ChannelConfig} and may depend on the nature of the transport it belongs * to. *通道配置项运行以类型安全的方式配置通道选项,具体支持何种通道选项,依赖于具体的通道配置实现 或所属的传输类型transport * @param <T> the type of the value which is valid for the {@link ChannelOption} */ public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> { //常量池,存放配置选项常量 private static final ConstantPool<ChannelOption<Object>> pool = new ConstantPool<ChannelOption<Object>>() { @Override protected ChannelOption<Object> newConstant(int id, String name) { return new ChannelOption<Object>(id, name); } }; /** * Returns the {@link ChannelOption} of the specified name. 获取name对象的通道选项值 */ @SuppressWarnings("unchecked") public static <T> ChannelOption<T> valueOf(String name) { return (ChannelOption<T>) pool.valueOf(name); } /** * Shortcut of {@link #valueOf(String) valueOf(firstNameComponent.getName() + "#" + secondNameComponent)}. 获取firstNameComponent.getName() + "#" + secondNameComponent的属性值 */ @SuppressWarnings("unchecked") public static <T> ChannelOption<T> valueOf(Class<?> firstNameComponent, String secondNameComponent) { return (ChannelOption<T>) pool.valueOf(firstNameComponent, secondNameComponent); } /** * Returns {@code true} if a {@link ChannelOption} exists for the given {@code name}. 判断是否存在name对应的通道选项 */ public static boolean exists(String name) { return pool.exists(name); } /** * Creates a new {@link ChannelOption} for the given {@code name} or fail with an * {@link IllegalArgumentException} if a {@link ChannelOption} for the given {@code name} exists. 根据名字创建通道配置选项 */ @SuppressWarnings("unchecked") public static <T> ChannelOption<T> newInstance(String name) { return (ChannelOption<T>) pool.newInstance(name); } //字节buf分配器 public static final ChannelOption<ByteBufAllocator> ALLOCATOR = valueOf("ALLOCATOR"); //接受buf分配器 public static final ChannelOption<RecvByteBufAllocator> RCVBUF_ALLOCATOR = valueOf("RCVBUF_ALLOCATOR"); //消息大小估算器 public static final ChannelOption<MessageSizeEstimator> MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR"); //连接超时时间 public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS"); /** * @deprecated Use {@link MaxMessagesRecvByteBufAllocator} 每次读取,允许读取的最大消息 */ @Deprecated public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ"); //每次写的自旋次数 public static final ChannelOption<Integer> WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT"); /** * @deprecated Use {@link #WRITE_BUFFER_WATER_MARK} */ @Deprecated public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK"); /** * @deprecated Use {@link #WRITE_BUFFER_WATER_MARK} */ @Deprecated public static final ChannelOption<Integer> WRITE_BUFFER_LOW_WATER_MARK = valueOf("WRITE_BUFFER_LOW_WATER_MARK"); public static final ChannelOption<WriteBufferWaterMark> WRITE_BUFFER_WATER_MARK = valueOf("WRITE_BUFFER_WATER_MARK"); public static final ChannelOption<Boolean> ALLOW_HALF_CLOSURE = valueOf("ALLOW_HALF_CLOSURE"); public static final ChannelOption<Boolean> AUTO_READ = valueOf("AUTO_READ"); /** * @deprecated Auto close will be removed in a future release. * * If {@code true} then the {@link Channel} is closed automatically and immediately on write failure. * The default value is {@code true}. */ @Deprecated public static final ChannelOption<Boolean> AUTO_CLOSE = valueOf("AUTO_CLOSE"); //下面这些配置项就与socket的相关配置,我们在nio分类博客中有讲,下面这些用于socket通道 public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST"); public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE"); public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF"); public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF"); public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR"); public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER"); public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG"); public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT"); //用于报文通道 public static final ChannelOption<Integer> IP_TOS = valueOf("IP_TOS"); public static final ChannelOption<InetAddress> IP_MULTICAST_ADDR = valueOf("IP_MULTICAST_ADDR"); public static final ChannelOption<NetworkInterface> IP_MULTICAST_IF = valueOf("IP_MULTICAST_IF"); public static final ChannelOption<Integer> IP_MULTICAST_TTL = valueOf("IP_MULTICAST_TTL"); public static final ChannelOption<Boolean> IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED"); public static final ChannelOption<Boolean> TCP_NODELAY = valueOf("TCP_NODELAY"); @Deprecated public static final ChannelOption<Boolean> DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION = valueOf("DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION"); public static final ChannelOption<Boolean> SINGLE_EVENTEXECUTOR_PER_GROUP = valueOf("SINGLE_EVENTEXECUTOR_PER_GROUP"); /** * Creates a new {@link ChannelOption} with the specified unique {@code name}. 创建通道选项 */ private ChannelOption(int id, String name) { super(id, name); } @Deprecated protected ChannelOption(String name) { this(pool.nextId(), name); } /** * Validate the value which is set for the {@link ChannelOption}. Sub-classes * may override this for special checks. 校验通道选项的值 */ public void validate(T value) { if (value == null) { throw new NullPointerException("value"); } } }
从上面可以看出通道选项实际为一个抽象常量,主要的操作都是委托给内部的常量池ConstantPool,
下面来看常量池的定义:
package io.netty.util; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.PlatformDependent; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** * A pool of {@link Constant}s. * * @param <T> the type of the constant */ public abstract class ConstantPool<T extends Constant<T>> { //常量集 private final ConcurrentMap<String, T> constants = PlatformDependent.newConcurrentHashMap(); //常量id生产器 private final AtomicInteger nextId = new AtomicInteger(1); /** * Shortcut of {@link #valueOf(String) valueOf(firstNameComponent.getName() + "#" + secondNameComponent)}. 获取firstNameComponent.getName() + "#" + secondNameComponent对应的常量 */ public T valueOf(Class<?> firstNameComponent, String secondNameComponent) { if (firstNameComponent == null) { throw new NullPointerException("firstNameComponent"); } if (secondNameComponent == null) { throw new NullPointerException("secondNameComponent"); } return valueOf(firstNameComponent.getName() + '#' + secondNameComponent); } /** * Returns the {@link Constant} which is assigned to the specified {@code name}. * If there's no such {@link Constant}, a new one will be created and returned. * Once created, the subsequent calls with the same {@code name} will always return the previously created one * (i.e. singleton.) *返回name对应常量 * @param name the name of the {@link Constant} */ public T valueOf(String name) { checkNotNullAndNotEmpty(name); return getOrCreate(name); } /** * Get existing constant by name or creates new one if not exists. Threadsafe *获取name对应的常量,没有则创建 * @param name the name of the {@link Constant} */ private T getOrCreate(String name) { T constant = constants.get(name); if (constant == null) { final T tempConstant = newConstant(nextId(), name); constant = constants.putIfAbsent(name, tempConstant); if (constant == null) { return tempConstant; } } return constant; } /** * Returns {@code true} if a {@link AttributeKey} exists for the given {@code name}. 检查name常量是否存在 */ public boolean exists(String name) { checkNotNullAndNotEmpty(name); return constants.containsKey(name); } /** * Creates a new {@link Constant} for the given {@code name} or fail with an * {@link IllegalArgumentException} if a {@link Constant} for the given {@code name} exists. 创建name常量 */ public T newInstance(String name) { checkNotNullAndNotEmpty(name); return createOrThrow(name); } /** * Creates constant by name or throws exception. Threadsafe *创建name对应的常量 * @param name the name of the {@link Constant} */ private T createOrThrow(String name) { T constant = constants.get(name); if (constant == null) { final T tempConstant = newConstant(nextId(), name); constant = constants.putIfAbsent(name, tempConstant); if (constant == null) { return tempConstant; } } throw new IllegalArgumentException(String.format("'%s' is already in use", name)); } private static String checkNotNullAndNotEmpty(String name) { ObjectUtil.checkNotNull(name, "name"); if (name.isEmpty()) { throw new IllegalArgumentException("empty name"); } return name; } //创建常量实例 protected abstract T newConstant(int id, String name); @Deprecated public final int nextId() { return nextId.getAndIncrement(); } }
从上面可以看出常量池,实际上用一个并发Map来存放常量,所有的操作委托为Map。
再来看通道选项的父类,抽象常量
public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
package io.netty.util; import java.util.concurrent.atomic.AtomicLong; /** * Base implementation of {@link Constant}. */ public abstract class AbstractConstant<T extends AbstractConstant<T>> implements Constant<T> { private static final AtomicLong uniqueIdGenerator = new AtomicLong(); private final int id;//常量id private final String name;//常量name private final long uniquifier;//常量唯一识别号 /** * Creates a new instance. */ protected AbstractConstant(int id, String name) { this.id = id; this.name = name; this.uniquifier = uniqueIdGenerator.getAndIncrement(); } @Override public final String name() { return name; } @Override public final int id() { return id; } @Override public final String toString() { return name(); } @Override public final int hashCode() { return super.hashCode(); } @Override public final boolean equals(Object obj) { return super.equals(obj); } @Override public final int compareTo(T o) { if (this == o) { return 0; } @SuppressWarnings("UnnecessaryLocalVariable") AbstractConstant<T> other = o; int returnCode; returnCode = hashCode() - other.hashCode(); if (returnCode != 0) { return returnCode; } if (uniquifier < other.uniquifier) { return -1; } if (uniquifier > other.uniquifier) { return 1; } throw new Error("failed to compare two different constants"); } }
//Constant
package io.netty.util; /** * A singleton which is safe to compare via the {@code ==} operator. Created and managed by {@link ConstantPool}. */ public interface Constant<T extends Constant<T>> extends Comparable<T> { /** * Returns the unique number assigned to this {@link Constant}. */ int id(); /** * Returns the name of this {@link Constant}. */ String name(); }
再来看属性key
package io.netty.util; /** * Key which can be used to access {@link Attribute} out of the {@link AttributeMap}. Be aware that it is not be * possible to have multiple keys with the same name. * * @param <T> the type of the {@link Attribute} which can be accessed via this {@link AttributeKey}. */ @SuppressWarnings("UnusedDeclaration") // 'T' is used only at compile time public final class AttributeKey<T> extends AbstractConstant<AttributeKey<T>> { //常量池 private static final ConstantPool<AttributeKey<Object>> pool = new ConstantPool<AttributeKey<Object>>() { @Override protected AttributeKey<Object> newConstant(int id, String name) { return new AttributeKey<Object>(id, name); } }; /** * Returns the singleton instance of the {@link AttributeKey} which has the specified {@code name}. */ @SuppressWarnings("unchecked") public static <T> AttributeKey<T> valueOf(String name) { return (AttributeKey<T>) pool.valueOf(name); } /** * Returns {@code true} if a {@link AttributeKey} exists for the given {@code name}. */ public static boolean exists(String name) { return pool.exists(name); } /** * Creates a new {@link AttributeKey} for the given {@code name} or fail with an * {@link IllegalArgumentException} if a {@link AttributeKey} for the given {@code name} exists. */ @SuppressWarnings("unchecked") public static <T> AttributeKey<T> newInstance(String name) { return (AttributeKey<T>) pool.newInstance(name); } @SuppressWarnings("unchecked") public static <T> AttributeKey<T> valueOf(Class<?> firstNameComponent, String secondNameComponent) { return (AttributeKey<T>) pool.valueOf(firstNameComponent, secondNameComponent); } private AttributeKey(int id, String name) { super(id, name); } }
从上面来看属性key也是一个常量,内部用常量池存放属性常量。
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1321netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2057netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2444netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1316netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1310netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1594netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1844netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1397netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2834netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2177netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2037netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1078netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1877netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1218netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1202netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 957netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1309netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2189netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1077netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1855netty 管道线定义-ChannelPipeline:htt ...
相关推荐
在“netty-socket-master”这个压缩包中,可能包含了服务器端和客户端的代码实现,包括Bootstrap的配置、ChannelHandler的定义、数据传输的逻辑等。通过对这些代码的学习和分析,开发者可以深入理解Netty在Android上...
在创建Netty服务器时,我们首先定义一个服务器端的Bootstrap(引导类),然后配置一个ServerBootstrap,指定服务器使用的BossGroup(接收新连接)和WorkerGroup(处理已有连接的IO操作)。接着,我们需要提供一个...
Bootstrap是用于创建服务器或客户端的启动器,Channel是网络连接的抽象,Handler是处理网络事件的接口,EventLoopGroup则是执行事件处理的线程池。在结合protobuf时,我们通常会自定义一个ByteToMessageDecoder来...
`io.netty.bootstrap`、`io.netty.channel`等包提供了Netty的核心传输组件。 #### transport-native-epoll `io.netty.channel.epoll`和`io.netty.channel.unix`提供了针对Linux平台的原生EPOLL支持。 #### ...
ChannelHandler是Netty的核心组件之一,它定义了处理这些事件的接口,如读取、写入、连接和关闭事件。 Netty 4.1引入了许多增强和改进,包括: 1. **ByteBuf**:Netty的缓冲区机制,它比Java的ByteBuffer更强大,...
2. **Channel**: Channel 是 Netty 中的抽象概念,它代表了一个打开的连接,可以用来读取和写入数据。Channel 可以是 TCP 连接、UDP 套接字或者本地进程间通信的管道。 3. **EventLoop**: EventLoop 是 Netty 的...
Spring框架可以用于管理和装配Netty的相关组件,例如,我们可以用Spring的Bean管理来创建和配置Netty的Bootstrap、ChannelHandler等。同时,Spring还可以处理业务逻辑,如消息的验证、处理和存储。Maven作为项目构建...
1. **Channel**:在Netty中,Channel是网络连接的抽象,用于读写数据。 2. **Bootstrap**:启动器,用于配置并启动一个Channel实例。 3. **Handler**:处理器,Netty中的事件驱动模型的关键,负责处理I/O事件和业务...
Bootstrap和ServerBootstrap分别用于创建客户端和服务端的连接,Channel是Netty中数据传输的抽象,而EventLoop则负责处理I/O事件。 在实现聊天功能时,我们需要定义两个关键组件:服务器端的处理器和客户端的处理器...
Bootstrap 和 ServerBootstrap 用于创建客户端和服务端连接,Channel 是网络连接的抽象,Handler 处理 I/O 事件,EventLoopGroup 负责事件的调度。 4. **简单服务端示例**: 创建一个简单的 Netty 服务器,首先...
首先,我们要理解Netty的五大组件:Bootstrap、Channel、ChannelHandler、EventLoop和Pipeline。Bootstrap是应用的启动器,用于配置和启动连接;Channel是网络连接的抽象,用于读写数据;ChannelHandler是处理网络...
然后,创建Netty的客户端Bootstrap,配置好EventLoopGroup(通常使用NioEventLoopGroup,它是非阻塞I/O的实现),并设置ChannelFactory来创建客户端Channel。接着,定义Pipeline,添加自定义的处理器,处理器可以...
- **Bootstrap**:是Netty应用启动引导类,用于初始化并启动Netty服务。 - **Channel**:是所有I/O操作的基础,可以理解为连接的抽象表示。 - **EventLoop**:负责处理I/O事件,包括注册、读取、写入等操作。 - **...
2. **Channel**:在Netty中,Channel是网络连接的抽象,它代表了到另一个实体的连接,如套接字或管道。你可以通过Channel进行读写操作,并监听各种网络事件。 3. **ByteBuf**:Netty提供了自己的缓冲区实现ByteBuf...
Bootstrap是启动网络服务的入口,Channel是连接的抽象,Handler是处理网络事件的核心,Pipeline则负责事件的传递和处理流程。通过这些组件,开发者可以灵活地定制网络通信的行为。 在协议支持方面,Netty提供了一...
- **Bootstrap**:初始化Netty服务器或客户端的核心组件,配置Netty的参数。 - **Channel**:代表网络连接的一端,可以是ServerSocketChannel、SocketChannel等。 - **EventLoopGroup**:负责处理I/O操作的线程组,...
Netty 的核心组件包括 Channel、Bootstrap、Pipeline 和 EventLoopGroup。Channel 是网络连接的抽象,可以是 TCP、UDP 或其他协议。Bootstrap 是启动网络服务的配置类,而 Pipeline 是处理进出场消息的链式结构。...
Bootstrap和ServerBootstrap负责配置并启动连接,而Channel是网络连接的抽象,它负责处理I/O操作。Handler是Netty中的事件处理器,用于处理I/O事件,如读取、写入、连接和断开等。EventLoop则负责调度这些事件并执行...
3. **Bootstrap**:Bootstrap 是启动服务器或客户端的配置类,它定义了 Channel 实例的初始化参数,如事件循环组、处理器管道等。 4. **EventLoopGroup**:事件循环组是 Netty 的多线程模型,它管理一组线程,这些...