- 浏览: 987822 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Mina 抽象Polling连接器(AbstractPollingIoConnector):http://donald-draper.iteye.com/blog/2378978
引言:
上一盘文章我们看了抽象Polling连接器,先来回顾一下:
抽象拉取连接器内部有一个连接请求队列connectQueue,连接请求取消队列cancelQueue,Io处理器和连接线程引用connectorRef。拉取连接器构造主要初始化会话配置,IO事件执行器和IO处理器。连接操作,首先根据本地socket地址创建SocketChannel,连接远端socket地址,根据IO处理器和SocketChannel构建Io会话,将会话添加到会话关联的IO处理器中,根据SocketChannel和会话初始化sessionInitializer构建连接请求,添加到连接请求队列,最后启动连接器线程。
连接器线程首先计算选择超时时间,执行超时选择操作,注册连接请求SocketChannel连接事件到选择器;如果没有任何连接请求SocketChannel需要处理,置空连接器连接线程引用,清空连接请求队列,如果有连接请求已经连接完成,即触发SocketChannel兴趣连接事件,处理连接事件就绪的连接请求,这个过程首先调用finishConnect完成SocketChannel连接后续工作,根据Io处理器和SocketChannel创建会话,初始化会话,添加会话到会话关联的IO处理器;然后处理连接超时的连接请求,即设置连接结果超时异常,添加到连接请求到取消队列;处理取消连接的连接请求,即关闭连接请求关联的SocketChannel。
今天我们来看连接器的具体实现NioSocketConnector:
从socket连接器来看,选择器selector为内部唯一变量。
上面的构造函数和AbstractPollingIoConnector的构造基本相同。
再来看socket连接器的其他操作:
来看创建socket通道
从创建socket通道来看,首先打开一个socket通道,配置接收缓存区size,绑定本地socket地址,配置通道为非阻塞模式。
从上面一个方法来看注册操作,即注册socket通道的连接事件到选择器。
选择操作和唤醒操作直接委托给内部选择器。
再来看剩下的操作,很简单,看一下明白,不再讲
总结:
socket连接器NioSocketConnector内部关联一个选择器;初始化连接器,为打开一个选择器;连接远端地址,即委托SocketChannel的连接操作;创建socket通道来看,首先打开一个socket通道,配置接收缓存区size,绑定本地socket地址,配置通道为非阻塞模式;注册操作,即注册socket通道的连接事件到选择器。选择操作和唤醒操作直接委托给内部选择器。销毁socket连接器,即关闭选择器。
引言:
上一盘文章我们看了抽象Polling连接器,先来回顾一下:
抽象拉取连接器内部有一个连接请求队列connectQueue,连接请求取消队列cancelQueue,Io处理器和连接线程引用connectorRef。拉取连接器构造主要初始化会话配置,IO事件执行器和IO处理器。连接操作,首先根据本地socket地址创建SocketChannel,连接远端socket地址,根据IO处理器和SocketChannel构建Io会话,将会话添加到会话关联的IO处理器中,根据SocketChannel和会话初始化sessionInitializer构建连接请求,添加到连接请求队列,最后启动连接器线程。
连接器线程首先计算选择超时时间,执行超时选择操作,注册连接请求SocketChannel连接事件到选择器;如果没有任何连接请求SocketChannel需要处理,置空连接器连接线程引用,清空连接请求队列,如果有连接请求已经连接完成,即触发SocketChannel兴趣连接事件,处理连接事件就绪的连接请求,这个过程首先调用finishConnect完成SocketChannel连接后续工作,根据Io处理器和SocketChannel创建会话,初始化会话,添加会话到会话关联的IO处理器;然后处理连接超时的连接请求,即设置连接结果超时异常,添加到连接请求到取消队列;处理取消连接的连接请求,即关闭连接请求关联的SocketChannel。
今天我们来看连接器的具体实现NioSocketConnector:
/** * {@link IoConnector} for socket transport (TCP/IP). * * @author [url=http://mina.apache.org]Apache MINA Project[/url] */ public final class NioSocketConnector extends AbstractPollingIoConnector<NioSession, SocketChannel> implements SocketConnector { private volatile Selector selector;//选择器 }
从socket连接器来看,选择器selector为内部唯一变量。
/** * Constructor for {@link NioSocketConnector} with default configuration (multiple thread model). 默认构造多线程模型 */ public NioSocketConnector() { super(new DefaultSocketSessionConfig(), NioProcessor.class); ((DefaultSocketSessionConfig) getSessionConfig()).init(this); } /** * Constructor for {@link NioSocketConnector} with default configuration, and * given number of {@link NioProcessor} for multithreading I/O operations * @param processorCount the number of processor to create and place in a * {@link SimpleIoProcessorPool} 多线程模型,指定Io处理器线程池大小 */ public NioSocketConnector(int processorCount) { super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount); ((DefaultSocketSessionConfig) getSessionConfig()).init(this); } /** * Constructor for {@link NioSocketConnector} with default configuration but a * specific {@link IoProcessor}, useful for sharing the same processor over multiple * {@link IoService} of the same type. 多Io服务共享Io处理器模式构造 * @param processor the processor to use for managing I/O events */ public NioSocketConnector(IoProcessor<NioSession> processor) { super(new DefaultSocketSessionConfig(), processor); ((DefaultSocketSessionConfig) getSessionConfig()).init(this); } /** * Constructor for {@link NioSocketConnector} with a given {@link Executor} for handling * connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing * the same processor and executor over multiple {@link IoService} of the same type. * @param executor the executor for connection * @param processor the processor for I/O operations 与上一个方法不同的,添加了Io事件执行器参数executor */ public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) { super(new DefaultSocketSessionConfig(), executor, processor); ((DefaultSocketSessionConfig) getSessionConfig()).init(this); } /** * Constructor for {@link NioSocketConnector} with default configuration which will use a built-in * thread pool executor to manage the given number of processor instances. The processor class must have * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a * no-arg constructor. * 使用内部IO处理器线程池SimpleIoProcessorPool,管理Io处理器实例 * @param processorClass the processor class. * @param processorCount the number of processors to instantiate. * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider) * @since 2.0.0-M4 */ public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass, int processorCount) { super(new DefaultSocketSessionConfig(), processorClass, processorCount); } /** * Constructor for {@link NioSocketConnector} with default configuration with default configuration which will use a built-in * thread pool executor to manage the default number of processor instances. The processor class must have * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a * no-arg constructor. The default number of instances is equal to the number of processor cores * in the system, plus one. * 多线程IO处理器模式,IO处理器实例默认为系统核心处理器数量+1 * @param processorClass the processor class. * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider) * @since 2.0.0-M4 */ public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) { super(new DefaultSocketSessionConfig(), processorClass); }
上面的构造函数和AbstractPollingIoConnector的构造基本相同。
再来看socket连接器的其他操作:
/** * {@inheritDoc} 初始化,打开一个选择器 */ @Override protected void init() throws Exception { this.selector = Selector.open(); } /** * {@inheritDoc} 销毁socket连接器,关闭选择器 */ @Override protected void destroy() throws Exception { if (selector != null) { selector.close(); } } /** * {@inheritDoc} 连接远端地址,即委托SocketChannel的连接操作 */ @Override protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception { return handle.connect(remoteAddress); } /** * {@inheritDoc} 获取socket通道的连接请求 */ @Override protected ConnectionRequest getConnectionRequest(SocketChannel handle) { //获取通道的选择key SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid())) { return null; } //返回选择key附加物 return (ConnectionRequest) key.attachment(); } /** * {@inheritDoc} 关闭socket通道 */ @Override protected void close(SocketChannel handle) throws Exception { //取消通道关联的选择key,关闭通道 SelectionKey key = handle.keyFor(selector); if (key != null) { key.cancel(); } handle.close(); } /** * {@inheritDoc} 完成连接 */ @Override protected boolean finishConnect(SocketChannel handle) throws Exception { //已完成连接,则取消通道关联的选择key(连接操作) if (handle.finishConnect()) { SelectionKey key = handle.keyFor(selector); if (key != null) { key.cancel(); } return true; } return false; }
来看创建socket通道
/** * {@inheritDoc} */ @Override protected SocketChannel newHandle(SocketAddress localAddress) throws Exception { //打开一个socket通道,配置接收缓存区size SocketChannel ch = SocketChannel.open(); int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize(); if (receiveBufferSize > 65535) { ch.socket().setReceiveBufferSize(receiveBufferSize); } if (localAddress != null) { try { //绑定地址 ch.socket().bind(localAddress); } catch (IOException ioe) { // Add some info regarding the address we try to bind to the // message String newMessage = "Error while binding on " + localAddress + "\n" + "original message : " + ioe.getMessage(); Exception e = new IOException(newMessage); e.initCause(ioe.getCause()); // Preemptively close the channel ch.close(); throw e; } } //配置通道为非阻塞模式 ch.configureBlocking(false); return ch; }
从创建socket通道来看,首先打开一个socket通道,配置接收缓存区size,绑定本地socket地址,配置通道为非阻塞模式。
/** * {@inheritDoc} 注册socket通道的连接事件到选择器 */ @Override protected void register(SocketChannel handle, ConnectionRequest request) throws Exception { handle.register(selector, SelectionKey.OP_CONNECT, request); } /** * {@inheritDoc} 创建socket会话 */ @Override protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) { return new NioSocketSession(this, processor, handle); } /** * {@inheritDoc} 选择操作 */ @Override protected int select(int timeout) throws Exception { return selector.select(timeout); } /** * {@inheritDoc} 唤醒操作 */ @Override protected void wakeup() { selector.wakeup(); }
从上面一个方法来看注册操作,即注册socket通道的连接事件到选择器。
选择操作和唤醒操作直接委托给内部选择器。
再来看剩下的操作,很简单,看一下明白,不再讲
/** * {@inheritDoc} */ @Override public TransportMetadata getTransportMetadata() { return NioSocketSession.METADATA; } /** * {@inheritDoc} */ @Override public SocketSessionConfig getSessionConfig() { return (SocketSessionConfig) sessionConfig; } /** * {@inheritDoc} */ @Override public InetSocketAddress getDefaultRemoteAddress() { return (InetSocketAddress) super.getDefaultRemoteAddress(); } /** * {@inheritDoc} */ @Override public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) { super.setDefaultRemoteAddress(defaultRemoteAddress); } /** * {@inheritDoc} 连接操作事件就绪的Socket通道 */ @Override protected Iterator<SocketChannel> selectedHandles() { return new SocketChannelIterator(selector.selectedKeys()); } /** * {@inheritDoc} 选择器所有关联通道 */ @Override protected Iterator<SocketChannel> allHandles() { return new SocketChannelIterator(selector.keys()); } private static class SocketChannelIterator implements Iterator<SocketChannel> { private final Iterator<SelectionKey> i; private SocketChannelIterator(Collection<SelectionKey> selectedKeys) { this.i = selectedKeys.iterator(); } /** * {@inheritDoc} */ @Override public boolean hasNext() { return i.hasNext(); } /** * {@inheritDoc} */ @Override public SocketChannel next() { SelectionKey key = i.next(); return (SocketChannel) key.channel(); } /** * {@inheritDoc} */ @Override public void remove() { i.remove(); } }
总结:
socket连接器NioSocketConnector内部关联一个选择器;初始化连接器,为打开一个选择器;连接远端地址,即委托SocketChannel的连接操作;创建socket通道来看,首先打开一个socket通道,配置接收缓存区size,绑定本地socket地址,配置通道为非阻塞模式;注册操作,即注册socket通道的连接事件到选择器。选择操作和唤醒操作直接委托给内部选择器。销毁socket连接器,即关闭选择器。
发表评论
-
Mina 报文连接器(NioDatagramConnector)
2017-06-14 08:46 1435Mina 抽象Polling连接器(A ... -
Mina 报文监听器NioDatagramAcceptor二(发送会话消息等)
2017-06-13 16:01 1564Mina 报文监听器NioDatagramAcceptor一( ... -
Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器)
2017-06-13 09:51 2599Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina 报文通信简单示例
2017-06-12 09:01 2611MINA TCP简单通信实例:http://donald-dr ... -
Mina 抽象Polling连接器(AbstractPollingIoConnector)
2017-06-11 21:29 1025Mina 连接器接口定义及抽象实现(IoConnector ) ... -
Mina 连接器接口定义及抽象实现(IoConnector )
2017-06-11 13:46 1846Mina IoService接口定义及抽象实现:http:// ... -
Mina socket监听器(NioSocketAcceptor)
2017-06-09 08:44 3446Mina IoService接口定义及抽象实现:http:// ... -
Mina 抽象polling监听器
2017-06-08 22:32 793Mina Io监听器接口定义及抽象实现:http://dona ... -
Mina Io监听器接口定义及抽象实现
2017-06-07 13:02 1363Mina IoService接口定义及抽象实现:http:// ... -
Mina IoService接口定义及抽象实现
2017-06-06 23:44 1210Mina IoHandler接口定义:http://donal ... -
Mina Nio会话(Socket,DataGram)
2017-06-06 12:53 1228Mina Socket会话配置:http://donald-d ... -
Mina 抽象Io会话
2017-06-05 22:45 1031Mina Io会话接口定义:http://donald-dra ... -
Mina Io会话接口定义
2017-06-04 23:15 1186Mina Nio处理器:http://donald-drape ... -
Mina Nio处理器
2017-06-04 22:19 754Mina Io处理器抽象实现:http://donald-dr ... -
Mina Io处理器抽象实现
2017-06-03 23:52 1164Mina 过滤链抽象实现:http://donald-drap ... -
Mina IoHandler接口定义
2017-06-01 21:30 1759Mina 过滤链抽象实现:http://donald-drap ... -
MINA 多路复用协议编解码器工厂二(多路复用协议解码器)
2017-06-01 12:52 2293MINA 多路复用协议编解码器工厂一(多路复用协议编码器): ... -
MINA 多路复用协议编解码器工厂一(多路复用协议编码器)
2017-05-31 22:22 1883MINA 多路分离解码器实例:http://donald-dr ... -
Mina 累计协议解码器
2017-05-31 00:09 1243MINA 编解码器实例:http://donald-drape ... -
Mina 协议编解码过滤器三(会话write与消息接收过滤)
2017-05-28 07:22 1767Mina 协议编解码过滤器一(协议编解码工厂、协议编码器): ...
相关推荐
客户端则通过Socket连接到服务端,进行数据交互。 3. **服务端实现**:在MINA中,服务端通常由`NioServerSocketConnector`来创建,它监听指定的端口,等待客户端的连接。服务端接收到连接请求后,会创建一个`...
为了与HPsocket进行对接,需要在Mina中设置相应的过滤器和处理器。 1. **创建IoConnector**:首先,创建一个`NioSocketConnector`实例并设置连接超时时间。 ```java IoConnector connector = new ...
import org.apache.mina.transport.socket.nio.NioSocketConnector; public class MinaClient { public static void main(String[] args) throws Exception { IoConnector connector = new NioSocketConnector();...
import org.apache.mina.transport.socket.nio.NioSocketConnector; public class MinaClient { public static void main(String[] args) throws Exception { IoConnector connector = new NioSocketConnector();...
描述中的"mina简单客户端服务端示例"意味着我们将探讨如何使用Mina来创建一个基本的通信环境,包括一个能够发起连接的客户端和一个接收连接的服务端。这两个组件是任何网络通信系统的基础,通过它们,数据可以在不同...
import org.apache.mina.transport.socket.nio.NioSocketConnector; public class EchoClient { private IoConnector connector = new NioSocketConnector(); public void start(String host, int port) throws ...
import org.apache.mina.transport.socket.nio.NioSocketConnector; import org.apache.mina.core.service.DefaultExecutorSelector; import org.apache.mina.core.session.IdleStatus; import org.apache.mina....
import org.apache.mina.transport.socket.nio.NioSocketConnector; public class Demo1Client { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Demo1Client.class); ...
在Mina的非阻塞I/O(NIO)模式下,存在三种主要的工作线程,它们在NIO Socket中起到不同的作用: 1. **Acceptor Thread**:这个线程负责接收来自客户端的连接请求,并将这些连接转发给I/O Processor线程处理。...
// 创建连接器 connector.getFilterChain().addLast("codec", // 添加编码过滤器 new ProtocolCodecFilter(new TextLineCodecFactory())); connector.setHandler(new ClientHandler()); // 设置处理器 ...