- 浏览: 984494 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 抽象BootStrap定义:http://donald-draper.iteye.com/blog/2392492
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
引言:
上面一篇文章我们看了ServerBootstrap,先来回顾一下:
服务端Bootstrap虽然继承与抽象Bootstrap,但他有自己的child通道选项及属性集,事件循环组和通道处理器,这些是用于配置,当Server通道接收客户端的请求,创建与客户端交互的通道。当构造Server引导配置时,如果传递一个事件循环,则Server通道监听器和客户端交互的通道公用一个事件循环组,否则parentGroup事件循环组用于监听器ServerChannel接受连接,childGroup事件循环组用于处理与客户端交互的通道相关事件和IO操作。
Server引导配置绑定socket地址,首先初始化通道,对于Server引导配置,这个通道为NioServerSocketChannel,初始化通道,即初始化Server通道;初始化Server通道,首先将Server引导配置的父类抽象Bootstrap的选项和属性配置给Server通道,然后添加ServerBootstrapAcceptor到Server通道内部的Channel管道内,然后将Server通道注册到事件循环组parentGroup中,然后通过Server通道#bind方法完成实际socket地址;Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。
今天我们来看客户端Bootstrap:
由于客户端引导配置Bootstrap,不像Server引导配置需要创建交互通道,所以可以看到客户端引导配置没有在配置
子事件循环。在客户端实例中有这么几句:
我们需要关注的是这一句:
引导配置连接远端socket地址:
从上面来了,连接远端socket地址,实际委托给doResolveAndConnect方法:
下面我们先看一下初始化通道方法,再来看实际连接完成方法doResolveAndConnect0。
//初始化通道
我们来理一下,客户端引导配置的连接操作,首先初始化通道,主要是配置通道的,选型和属性,将通道处理器添加到通道内部的Channel管道中,注册通道到事件循环组,然后委托通道完成实际的连接操作。
再来看其他方法:
总结:
客户端引导配置的连接操作,首先初始化通道,主要是配置通道的,选型和属性,将通道处理器添加到通道内部的Channel管道中,注册通道到事件循环组,然后委托通道完成实际的连接操作。
附:
在客户端引导配置有一个地址解析组配置
我们来简单看一下:
//DefaultAddressResolverGroup,默认地址解析组
//DefaultNameResolver,默认命名解析器
//命名解析器InetNameResolver
先把InetNameResolver的父类SimpleNameResolver看一下,在看socket地址解析器InetSocketAddressResolver
//NameResolver
回到InetNameResolver的asAddressResolver方法的这一句
//socket地址解析器,InetSocketAddressResolver
来看地址解析方法,
//InetSocketAddress
再看另外两个地址解决方法,
这个命名Resolver为DefaultNameResolver,可以回到DefaultNameResolver的定义在上面就不说了,
来看InetSocketAddressResolver的父类AbstractAddressResolver:
//AddressResolver接口定义,这个就不说,看看就行
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
引言:
上面一篇文章我们看了ServerBootstrap,先来回顾一下:
服务端Bootstrap虽然继承与抽象Bootstrap,但他有自己的child通道选项及属性集,事件循环组和通道处理器,这些是用于配置,当Server通道接收客户端的请求,创建与客户端交互的通道。当构造Server引导配置时,如果传递一个事件循环,则Server通道监听器和客户端交互的通道公用一个事件循环组,否则parentGroup事件循环组用于监听器ServerChannel接受连接,childGroup事件循环组用于处理与客户端交互的通道相关事件和IO操作。
Server引导配置绑定socket地址,首先初始化通道,对于Server引导配置,这个通道为NioServerSocketChannel,初始化通道,即初始化Server通道;初始化Server通道,首先将Server引导配置的父类抽象Bootstrap的选项和属性配置给Server通道,然后添加ServerBootstrapAcceptor到Server通道内部的Channel管道内,然后将Server通道注册到事件循环组parentGroup中,然后通过Server通道#bind方法完成实际socket地址;Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。
今天我们来看客户端Bootstrap:
package io.netty.bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.resolver.AddressResolver; import io.netty.resolver.DefaultAddressResolverGroup; import io.netty.resolver.NameResolver; import io.netty.resolver.AddressResolverGroup; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Map; import java.util.Map.Entry; /** * A {@link Bootstrap} that makes it easy to bootstrap a {@link Channel} to use * for clients. 引导程序Bootstrap使客户端很容易启动一个通道。 * * The {@link #bind()} methods are useful in combination with connectionless transports such as datagram (UDP). * For regular TCP connections, please use the provided {@link #connect()} methods. 绑定方法#bind对无连接的报文通信UDP非常有用。对于Socket连接TCP,可以使用#connect连接方法。 */ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class); private static final AddressResolverGroup<?> DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE; //引导配置 private final BootstrapConfig config = new BootstrapConfig(this); @SuppressWarnings("unchecked") private volatile AddressResolverGroup<SocketAddress> resolver = (AddressResolverGroup<SocketAddress>) DEFAULT_RESOLVER; private volatile SocketAddress remoteAddress;//远端socket地址 public Bootstrap() { } private Bootstrap(Bootstrap bootstrap) { super(bootstrap); resolver = bootstrap.resolver; remoteAddress = bootstrap.remoteAddress; } }
由于客户端引导配置Bootstrap,不像Server引导配置需要创建交互通道,所以可以看到客户端引导配置没有在配置
子事件循环。在客户端实例中有这么几句:
EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Bootstrap,用于配置客户端,一般为Socket通道 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加安全套接字处理器和通道处理器到 ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc(), ip, port)); } pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast(new EchoClientHandler()); } }); ... //连接socket地址 ChannelFuture f = bootstrap.connect(inetSocketAddress).sync(); ... }
我们需要关注的是这一句:
bootstrap.connect(inetSocketAddress).sync();
引导配置连接远端socket地址:
/** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(String inetHost, int inetPort) { return connect(InetSocketAddress.createUnresolved(inetHost, inetPort)); } /** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(InetAddress inetHost, int inetPort) { return connect(new InetSocketAddress(inetHost, inetPort)); } /** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(SocketAddress remoteAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validate(); return doResolveAndConnect(remoteAddress, config.localAddress()); } /** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validate(); return doResolveAndConnect(remoteAddress, localAddress); }
从上面来了,连接远端socket地址,实际委托给doResolveAndConnect方法:
/** * @see #connect() 连接操作 */ private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { //初始化通道,并注册通道到事件循环组 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.isDone()) { if (!regFuture.isSuccess()) { return regFuture; } //初始化成功,则委托给doResolveAndConnect0完成实际连接操作 return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else { //如果初始化和注册工作没有完成,添加任务结果监听器,待完成时,更新注册状态,完成实际连接操作 // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure. Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } }); return promise; } }
下面我们先看一下初始化通道方法,再来看实际连接完成方法doResolveAndConnect0。
//初始化通道
@Override @SuppressWarnings("unchecked") void init(Channel channel) throws Exception { //添加通道处理器到通道内存的Channel管道中 ChannelPipeline p = channel.pipeline(); p.addLast(config.handler()); //配置通道选项和属性 final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } } }
//实际连接操作 private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { try { final EventLoop eventLoop = channel.eventLoop(); final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop); if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) { // Resolver has no idea about what to do with the specified remote address or it's resolved already. //如果地址解析器不知道如何应对远端地址,或已经解决则连接远端地址,完成实际连接 doConnect(remoteAddress, localAddress, promise); return promise; } final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress); if (resolveFuture.isDone()) { final Throwable resolveFailureCause = resolveFuture.cause(); if (resolveFailureCause != null) { // Failed to resolve immediately channel.close(); promise.setFailure(resolveFailureCause); } else { //远端地址解析完毕,并支持,完成实际连接 // Succeeded to resolve immediately; cached? (or did a blocking lookup) doConnect(resolveFuture.getNow(), localAddress, promise); } return promise; } //如果地址解析任务没有完成,添加监听器,待任务完成时,完成连接操作 // Wait until the name resolution is finished. resolveFuture.addListener(new FutureListener<SocketAddress>() { @Override public void operationComplete(Future<SocketAddress> future) throws Exception { if (future.cause() != null) { channel.close(); promise.setFailure(future.cause()); } else { doConnect(future.getNow(), localAddress, promise); } } }); } catch (Throwable cause) { promise.tryFailure(cause); } return promise; }
//连接远端地址 private static void doConnect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. final Channel channel = connectPromise.channel(); //创建一个任务线程完成实际连接远端地址操作,实际委托给通道的连接方法,任务线程交由通道所在的事件循环去执行 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (localAddress == null) { channel.connect(remoteAddress, connectPromise); } else { //实际委托给通道的连接方法 channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }); }
我们来理一下,客户端引导配置的连接操作,首先初始化通道,主要是配置通道的,选型和属性,将通道处理器添加到通道内部的Channel管道中,注册通道到事件循环组,然后委托通道完成实际的连接操作。
再来看其他方法:
//配置远端socket地址 /** * The {@link SocketAddress} to connect to once the {@link #connect()} method * is called. */ public Bootstrap remoteAddress(SocketAddress remoteAddress) { this.remoteAddress = remoteAddress; return this; } /** * @see #remoteAddress(SocketAddress) */ public Bootstrap remoteAddress(String inetHost, int inetPort) { remoteAddress = InetSocketAddress.createUnresolved(inetHost, inetPort); return this; } /** * @see #remoteAddress(SocketAddress) */ public Bootstrap remoteAddress(InetAddress inetHost, int inetPort) { remoteAddress = new InetSocketAddress(inetHost, inetPort); return this; } /** * Connect a {@link Channel} to the remote peer. 连接远端peer通道 */ public ChannelFuture connect() { validate(); SocketAddress remoteAddress = this.remoteAddress; if (remoteAddress == null) { throw new IllegalStateException("remoteAddress not set"); } return doResolveAndConnect(remoteAddress, config.localAddress()); } //验证引导配置是否有效 @Override public Bootstrap validate() { super.validate(); if (config.handler() == null) { throw new IllegalStateException("handler not set"); } return this; } //克隆配置 @Override @SuppressWarnings("CloneDoesntCallSuperClone") public Bootstrap clone() { return new Bootstrap(this); } /** * Returns a deep clone of this bootstrap which has the identical configuration except that it uses * the given {@link EventLoopGroup}. This method is useful when making multiple {@link Channel}s with similar * settings. */ public Bootstrap clone(EventLoopGroup group) { Bootstrap bs = new Bootstrap(this); bs.group = group; return bs; } //获取引导配置 @Override public final BootstrapConfig config() { return config; } //获取远端socket地址 final SocketAddress remoteAddress() { return remoteAddress; } //获取地址解析组 final AddressResolverGroup<?> resolver() { return resolver; }
总结:
客户端引导配置的连接操作,首先初始化通道,主要是配置通道的,选型和属性,将通道处理器添加到通道内部的Channel管道中,注册通道到事件循环组,然后委托通道完成实际的连接操作。
附:
在客户端引导配置有一个地址解析组配置
private static final AddressResolverGroup<?> DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE;
我们来简单看一下:
//DefaultAddressResolverGroup,默认地址解析组
package io.netty.resolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.internal.UnstableApi; import java.net.InetSocketAddress; /** * A {@link AddressResolverGroup} of {@link DefaultNameResolver}s. */ @UnstableApi public final class DefaultAddressResolverGroup extends AddressResolverGroup<InetSocketAddress> { public static final DefaultAddressResolverGroup INSTANCE = new DefaultAddressResolverGroup(); private DefaultAddressResolverGroup() { } //创建地址解析器 @Override protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) throws Exception { return new DefaultNameResolver(executor).asAddressResolver(); } }
//DefaultNameResolver,默认命名解析器
package io.netty.resolver; import io.netty.util.internal.SocketUtils; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Promise; import io.netty.util.internal.UnstableApi; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; /** * A {@link InetNameResolver} that resolves using JDK's built-in domain name lookup mechanism. * Note that this resolver performs a blocking name lookup from the caller thread. */ @UnstableApi public class DefaultNameResolver extends InetNameResolver { public DefaultNameResolver(EventExecutor executor) { super(executor); } @Override protected void doResolve(String inetHost, Promise<InetAddress> promise) throws Exception { try { promise.setSuccess(SocketUtils.addressByName(inetHost)); } catch (UnknownHostException e) { promise.setFailure(e); } } @Override protected void doResolveAll(String inetHost, Promise<List<InetAddress>> promise) throws Exception { try { promise.setSuccess(Arrays.asList(SocketUtils.allAddressesByName(inetHost))); } catch (UnknownHostException e) { promise.setFailure(e); } } }
//命名解析器InetNameResolver
package io.netty.resolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.internal.UnstableApi; import java.net.InetAddress; import java.net.InetSocketAddress; /** * A skeletal {@link NameResolver} implementation that resolves {@link InetAddress}. */ @UnstableApi public abstract class InetNameResolver extends SimpleNameResolver<InetAddress> { private volatile AddressResolver<InetSocketAddress> addressResolver;//地址解析器 /** * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(String)} */ protected InetNameResolver(EventExecutor executor) { super(executor); } /** * Return a {@link AddressResolver} that will use this name resolver underneath. * It's cached internally, so the same instance is always returned. 这个方法时我们要找的 */ public AddressResolver<InetSocketAddress> asAddressResolver() { AddressResolver<InetSocketAddress> result = addressResolver; if (result == null) { synchronized (this) { result = addressResolver; if (result == null) { //如果内部地址解析器为空,则创建一个socket地址解析器 addressResolver = result = new InetSocketAddressResolver(executor(), this); } } } return result; } }
先把InetNameResolver的父类SimpleNameResolver看一下,在看socket地址解析器InetSocketAddressResolver
//SimpleNameResolver package io.netty.resolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.UnstableApi; import java.util.List; import static io.netty.util.internal.ObjectUtil.*; /** * A skeletal {@link NameResolver} implementation. */ @UnstableApi public abstract class SimpleNameResolver<T> implements NameResolver<T> { private final EventExecutor executor; /** * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(String)} */ protected SimpleNameResolver(EventExecutor executor) { this.executor = checkNotNull(executor, "executor"); } /** * Returns the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(String)}. */ protected EventExecutor executor() { return executor; } @Override public final Future<T> resolve(String inetHost) { final Promise<T> promise = executor().newPromise(); return resolve(inetHost, promise); } @Override public Future<T> resolve(String inetHost, Promise<T> promise) { checkNotNull(promise, "promise"); try { doResolve(inetHost, promise); return promise; } catch (Exception e) { return promise.setFailure(e); } } @Override public final Future<List<T>> resolveAll(String inetHost) { final Promise<List<T>> promise = executor().newPromise(); return resolveAll(inetHost, promise); } @Override public Future<List<T>> resolveAll(String inetHost, Promise<List<T>> promise) { checkNotNull(promise, "promise"); try { doResolveAll(inetHost, promise); return promise; } catch (Exception e) { return promise.setFailure(e); } } /** * Invoked by {@link #resolve(String)} to perform the actual name resolution. */ protected abstract void doResolve(String inetHost, Promise<T> promise) throws Exception; /** * Invoked by {@link #resolveAll(String)} to perform the actual name resolution. */ protected abstract void doResolveAll(String inetHost, Promise<List<T>> promise) throws Exception; @Override public void close() { } }
//NameResolver
package io.netty.resolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.UnstableApi; import java.io.Closeable; import java.util.List; /** * Resolves an arbitrary string that represents the name of an endpoint into an address. */ @UnstableApi public interface NameResolver<T> extends Closeable { /** * Resolves the specified name into an address. * * @param inetHost the name to resolve * * @return the address as the result of the resolution */ Future<T> resolve(String inetHost); /** * Resolves the specified name into an address. * * @param inetHost the name to resolve * @param promise the {@link Promise} which will be fulfilled when the name resolution is finished * * @return the address as the result of the resolution */ Future<T> resolve(String inetHost, Promise<T> promise); /** * Resolves the specified host name and port into a list of address. * * @param inetHost the name to resolve * * @return the list of the address as the result of the resolution */ Future<List<T>> resolveAll(String inetHost); /** * Resolves the specified host name and port into a list of address. * * @param inetHost the name to resolve * @param promise the {@link Promise} which will be fulfilled when the name resolution is finished * * @return the list of the address as the result of the resolution */ Future<List<T>> resolveAll(String inetHost, Promise<List<T>> promise); /** * Closes all the resources allocated and used by this resolver. */ @Override void close(); }
回到InetNameResolver的asAddressResolver方法的这一句
//如果内部地址解析器为空,则创建一个socket地址解析器 addressResolver = result = new InetSocketAddressResolver(executor(), this);
//socket地址解析器,InetSocketAddressResolver
package io.netty.resolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.internal.UnstableApi; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; /** * A {@link AbstractAddressResolver} that resolves {@link InetSocketAddress}. */ @UnstableApi public class InetSocketAddressResolver extends AbstractAddressResolver<InetSocketAddress> { final NameResolver<InetAddress> nameResolver;//命名解决器,默认为DefaultNameResolver /** * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(java.net.SocketAddress)} * @param nameResolver the {@link NameResolver} used for name resolution */ public InetSocketAddressResolver(EventExecutor executor, NameResolver<InetAddress> nameResolver) { super(executor, InetSocketAddress.class); this.nameResolver = nameResolver; } @Override public void close() { nameResolver.close(); } }
来看地址解析方法,
//地址是否可以解析 @Override protected boolean doIsResolved(InetSocketAddress address) { return !address.isUnresolved(); }
//InetSocketAddress
/** * Checks whether the address has been resolved or not. * * @return <code>true</code> if the hostname couldn't be resolved into * an <code>InetAddress</code>. */ public final boolean isUnresolved() { return holder.isUnresolved(); }
再看另外两个地址解决方法,
@Override protected void doResolve(final InetSocketAddress unresolvedAddress, final Promise<InetSocketAddress> promise) throws Exception { // Note that InetSocketAddress.getHostName() will never incur a reverse lookup here, // because an unresolved address always has a host name. //委托给命令解决器 nameResolver.resolve(unresolvedAddress.getHostName()) .addListener(new FutureListener<InetAddress>() { @Override public void operationComplete(Future<InetAddress> future) throws Exception { if (future.isSuccess()) { promise.setSuccess(new InetSocketAddress(future.getNow(), unresolvedAddress.getPort())); } else { promise.setFailure(future.cause()); } } }); } @Override protected void doResolveAll(final InetSocketAddress unresolvedAddress, final Promise<List<InetSocketAddress>> promise) throws Exception { // Note that InetSocketAddress.getHostName() will never incur a reverse lookup here, // because an unresolved address always has a host name. //委托给命令解决器 nameResolver.resolveAll(unresolvedAddress.getHostName()) .addListener(new FutureListener<List<InetAddress>>() { @Override public void operationComplete(Future<List<InetAddress>> future) throws Exception { if (future.isSuccess()) { List<InetAddress> inetAddresses = future.getNow(); List<InetSocketAddress> socketAddresses = new ArrayList<InetSocketAddress>(inetAddresses.size()); for (InetAddress inetAddress : inetAddresses) { socketAddresses.add(new InetSocketAddress(inetAddress, unresolvedAddress.getPort())); } promise.setSuccess(socketAddresses); } else { promise.setFailure(future.cause()); } } }); }
这个命名Resolver为DefaultNameResolver,可以回到DefaultNameResolver的定义在上面就不说了,
来看InetSocketAddressResolver的父类AbstractAddressResolver:
package io.netty.resolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.TypeParameterMatcher; import io.netty.util.internal.UnstableApi; import java.net.SocketAddress; import java.nio.channels.UnsupportedAddressTypeException; import java.util.Collections; import java.util.List; import static io.netty.util.internal.ObjectUtil.checkNotNull; /** * A skeletal {@link AddressResolver} implementation. */ @UnstableApi public abstract class AbstractAddressResolver<T extends SocketAddress> implements AddressResolver<T> { private final EventExecutor executor;//事件执行器 private final TypeParameterMatcher matcher;//socket地址匹配器 /** * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(SocketAddress)} */ protected AbstractAddressResolver(EventExecutor executor) { this.executor = checkNotNull(executor, "executor"); matcher = TypeParameterMatcher.find(this, AbstractAddressResolver.class, "T"); } /** * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(SocketAddress)} * @param addressType the type of the {@link SocketAddress} supported by this resolver */ protected AbstractAddressResolver(EventExecutor executor, Class<? extends T> addressType) { this.executor = checkNotNull(executor, "executor"); matcher = TypeParameterMatcher.get(addressType); } /** * Returns the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(SocketAddress)}. */ protected EventExecutor executor() { return executor; } //命名Resolver是否支持socket地址 @Override public boolean isSupported(SocketAddress address) { return matcher.match(address); } //socket地址是否可以Resolve @Override public final boolean isResolved(SocketAddress address) { if (!isSupported(address)) { throw new UnsupportedAddressTypeException(); } @SuppressWarnings("unchecked") final T castAddress = (T) address; //具体Resolve工作委托给doIsResolved方法 return doIsResolved(castAddress); } /** * Invoked by {@link #isResolved(SocketAddress)} to check if the specified {@code address} has been resolved * already. */ protected abstract boolean doIsResolved(T address); @Override public final Future<T> resolve(SocketAddress address) { if (!isSupported(checkNotNull(address, "address"))) { // Address type not supported by the resolver return executor().newFailedFuture(new UnsupportedAddressTypeException()); } if (isResolved(address)) { // Resolved already; no need to perform a lookup @SuppressWarnings("unchecked") final T cast = (T) address; return executor.newSucceededFuture(cast); } try { @SuppressWarnings("unchecked") final T cast = (T) address; final Promise<T> promise = executor().newPromise(); //具体Resolve工作委托给doIsResolved方法 doResolve(cast, promise); return promise; } catch (Exception e) { return executor().newFailedFuture(e); } } @Override public final Future<T> resolve(SocketAddress address, Promise<T> promise) { checkNotNull(address, "address"); checkNotNull(promise, "promise"); if (!isSupported(address)) { // Address type not supported by the resolver return promise.setFailure(new UnsupportedAddressTypeException()); } if (isResolved(address)) { // Resolved already; no need to perform a lookup @SuppressWarnings("unchecked") final T cast = (T) address; return promise.setSuccess(cast); } try { @SuppressWarnings("unchecked") final T cast = (T) address; //具体Resolve工作委托给doIsResolved方法 doResolve(cast, promise); return promise; } catch (Exception e) { return promise.setFailure(e); } } @Override public final Future<List<T>> resolveAll(SocketAddress address) { if (!isSupported(checkNotNull(address, "address"))) { // Address type not supported by the resolver return executor().newFailedFuture(new UnsupportedAddressTypeException()); } if (isResolved(address)) { // Resolved already; no need to perform a lookup @SuppressWarnings("unchecked") final T cast = (T) address; return executor.newSucceededFuture(Collections.singletonList(cast)); } try { @SuppressWarnings("unchecked") final T cast = (T) address; final Promise<List<T>> promise = executor().newPromise(); //具体Resolve工作委托给doResolveAll方法 doResolveAll(cast, promise); return promise; } catch (Exception e) { return executor().newFailedFuture(e); } } @Override public final Future<List<T>> resolveAll(SocketAddress address, Promise<List<T>> promise) { checkNotNull(address, "address"); checkNotNull(promise, "promise"); if (!isSupported(address)) { // Address type not supported by the resolver return promise.setFailure(new UnsupportedAddressTypeException()); } if (isResolved(address)) { // Resolved already; no need to perform a lookup @SuppressWarnings("unchecked") final T cast = (T) address; return promise.setSuccess(Collections.singletonList(cast)); } try { @SuppressWarnings("unchecked") final T cast = (T) address; //具体Resolve工作委托给doResolveAll方法 doResolveAll(cast, promise); return promise; } catch (Exception e) { return promise.setFailure(e); } } //下面几个方法待子类实现,见InetSocketAddressResolver的定义 /** * Invoked by {@link #resolve(SocketAddress)} to perform the actual name * resolution. */ protected abstract void doResolve(T unresolvedAddress, Promise<T> promise) throws Exception; /** * Invoked by {@link #resolveAll(SocketAddress)} to perform the actual name * resolution. */ protected abstract void doResolveAll(T unresolvedAddress, Promise<List<T>> promise) throws Exception; @Override public void close() { } }
//AddressResolver接口定义,这个就不说,看看就行
package io.netty.resolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.UnstableApi; import java.io.Closeable; import java.net.SocketAddress; import java.nio.channels.UnsupportedAddressTypeException; import java.util.List; /** * Resolves a possibility unresolved {@link SocketAddress}. */ @UnstableApi public interface AddressResolver<T extends SocketAddress> extends Closeable { /** * Returns {@code true} if and only if the specified address is supported by this resolved. */ boolean isSupported(SocketAddress address); /** * Returns {@code true} if and only if the specified address has been resolved. * * @throws UnsupportedAddressTypeException if the specified address is not supported by this resolver */ boolean isResolved(SocketAddress address); /** * Resolves the specified address. If the specified address is resolved already, this method does nothing * but returning the original address. * * @param address the address to resolve * * @return the {@link SocketAddress} as the result of the resolution */ Future<T> resolve(SocketAddress address); /** * Resolves the specified address. If the specified address is resolved already, this method does nothing * but returning the original address. * * @param address the address to resolve * @param promise the {@link Promise} which will be fulfilled when the name resolution is finished * * @return the {@link SocketAddress} as the result of the resolution */ Future<T> resolve(SocketAddress address, Promise<T> promise); /** * Resolves the specified address. If the specified address is resolved already, this method does nothing * but returning the original address. * * @param address the address to resolve * * @return the list of the {@link SocketAddress}es as the result of the resolution */ Future<List<T>> resolveAll(SocketAddress address); /** * Resolves the specified address. If the specified address is resolved already, this method does nothing * but returning the original address. * * @param address the address to resolve * @param promise the {@link Promise} which will be fulfilled when the name resolution is finished * * @return the list of the {@link SocketAddress}es as the result of the resolution */ Future<List<T>> resolveAll(SocketAddress address, Promise<List<T>> promise); /** * Closes all the resources allocated and used by this resolver. */ @Override void close(); }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1324netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2063netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2453netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1320netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1319netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1600netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1848netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1400netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2840netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2186netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2042netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1082netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1880netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1222netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1205netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 960netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1313netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2193netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1085netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1859netty 管道线定义-ChannelPipeline:htt ...
相关推荐
根据提供的文件信息,这里将对Netty源码的相关知识点进行详细解析。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器与客户端。Netty因其优秀的性能和灵活的设计而受到...
### Netty源码解析知识点概览 #### 一、Netty简介与应用场景 - **Netty**是一款由JBOSS提供的高性能的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。 - **应用场景**:Netty广泛...
本文将深入探讨如何使用Java的Netty框架实现一个基于DTU(Data Transfer Unit)的TCP服务器,该服务器具备多端口通信和多协议解析的能力。 首先,DTU是一种专门用于远程数据传输的设备,它能够通过GPRS、3G/4G等...
3. **配置Pipeline**: 在服务器和客户端的Bootstrap中,我们需要设置Pipeline,将自定义的编码器和解码器添加进去,同时也要包含`GzipHandler`。这样,数据在编码和解码之前/之后都会被gzip处理。 4. **发送和接收...
《Netty in Action》中文版是一本深入了解和学习Netty框架的重要参考资料,它为读者提供了全面、深入的Netty技术解析。Netty是Java平台上一个高性能、异步事件驱动的网络应用框架,常用于开发高效的服务器和客户端...
"Netty高性能原理和框架架构解析" Netty 是一个广受欢迎的异步事件驱动的 Java 开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty 的主要特点是设计优雅,适用于各种传输类型的统一 API ...
Netty的`Bootstrap`类是用来初始化和配置服务器的启动参数,如绑定的IP地址和端口号、事件循环组、处理器链等。例如,我们可以通过以下代码创建一个简单的Netty服务器: ```java EventLoopGroup bossGroup = new ...
通过“Netty-API-文档中文版”,你可以详细了解到如何创建ServerBootstrap、Bootstrap、ChannelFuture、ChannelInboundHandlerAdapter等核心组件,以及如何配置和使用各种编解码器。文档还会介绍如何进行连接管理、...
《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...
小白一个,刚学习完Netty,打算做一个Demo练练手。 感谢以下项目提供的思路,向他们学习! 1, 2, 思路 1,启动: 1.1,需要一个Bootstrap作为客户端的启动类。 1.2,然后添加指定的处理器来解析包,添加管道...
《Netty权威指南第二版》源代码是一份珍贵的学习资源,由知名作者李林锋编写,专注于Java网络编程框架Netty的深入解析。这个压缩包包含的不仅是代码,更是理解和掌握Netty技术的关键。Netty是基于Java NIO(非阻塞I/...
1. 创建 Bootstrap:Bootstrap 是 Netty 中用于配置和启动服务器的核心类,我们需要配置 ServerBootstrap 并设置其 Channel 类型为 NioServerSocketChannel。 2. 绑定端口:通过调用 bind() 方法,将服务器绑定到...
在"基于Java Netty的UDP客户端声呐数据对接"项目中,我们主要关注如何利用Netty处理UDP通信,以及如何解析和封装SCANFISH-II型声呐系统的数据。 UDP(User Datagram Protocol)是一种无连接的传输层协议,它不保证...
在 `channelRead()` 方法中,你可以解析接收到的请求,然后通过 `Bootstrap` 创建一个新的连接到目标服务器,并在新的连接上注册一个 ChannelHandlerContext,以便将客户端的请求转发到目标服务器。当从目标服务器...
在Netty的初始化阶段,我们需要配置一个Bootstrap对象,设置串口参数(如波特率、校验位等),并将我们的`SerialDataHandler`和`SerialCommandSender`添加到管道(Pipeline)中。 接下来,`Bootstrap.bind()`方法...
3. **定义Netty服务器**:创建一个Netty服务器,继承自`io.netty.bootstrap.ServerBootstrap`,配置服务器的参数,如端口、通道处理器等。使用`io.netty.channel.ChannelHandlerContext`来处理接收到的网络事件。 4...
《Netty权威指南》第二版是由李林锋编著的一本深入解析Netty框架的专业书籍。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于开发高并发、低延迟的网络应用,如分布式系统、游戏服务器、云计算平台...
1. **创建Bootstrap**: 首先,我们需要创建一个`Bootstrap`实例,它是Netty中的启动配置类。通过`Bootstrap.group()`指定线程池,`Bootstrap.channel()`指定使用`UdpServerChannel`作为通道类型。 ```java ...