首先我们先来看SocketServer这个的实现类,这个类虽然实现的很简单,但是包含了nio请求的最基本的过程。
这个类实现的比较经典
我们先来看下局部变量:
// 从类的命名来看是一个RequestHandler的工程类 private final RequestHandlerFactory handlerFactory; // 可以接受的最大请求数 private final int maxRequestSize ; // 任务处理者的数量 private final Processor[] processors ; // 封装的Acceptor private final Acceptor acceptor ; // SocketServer的统计类 private final SocketServerStats stats ; // Server的配置类 private final ServerConfig serverConfig ;接着我们来看下构造函数和startup函数
public SocketServer(RequestHandlerFactory handlerFactory, // ServerConfig serverConfig) { super(); this.serverConfig = serverConfig; this.handlerFactory = handlerFactory; this.maxRequestSize = serverConfig.getMaxSocketRequestSize(); // 初始化处理器 this.processors = new Processor[serverConfig.getNumThreads()]; this.stats = new SocketServerStats(1000L * 1000L * 1000L * serverConfig.getMonitoringPeriodSecs()); // 初始化响应器 this.acceptor = new Acceptor(serverConfig.getPort(), // processors, // serverConfig.getSocketSendBuffer(), // serverConfig.getSocketReceiveBuffer()); } /** * Start the socket server and waiting for finished * * @throws InterruptedException */ public void startup() throws InterruptedException { final int maxCacheConnectionPerThread = serverConfig.getMaxConnections() / processors.length ; logger.info("start " + processors. length + " Processor threads"); // processor和acceptor全部都起来 for (int i = 0; i < processors. length; i++) { processors[i] = new Processor(handlerFactory , stats , maxRequestSize , maxCacheConnectionPerThread); Utils. newThread("jafka-processor-" + i, processors[i], false ).start(); } Utils. newThread("jafka-acceptor", acceptor, false).start(); acceptor.awaitStartup(); }
我们接下来来看Acceptor的实现类。
在看之前,我们先来了解下AbstractServerThread这个类,这个是服务器端所有线程的父类,里面包含一个Selector。
Acceptor类是一个线程类,我们还是先来看下它的局部变量。
// 绑定的端口 private int port ; // 一批任务处理器 private Processor[] processors; // send和reciveBuffer的大小 private int sendBufferSize ; private int receiveBufferSize ;既然是线程类,我们接着来看run方法。
1. run方法的开头进行了socket的绑定。
// 初始化ServerSocket,以非阻塞的方式启动,并经ACCEPT事件注册到selector, // 这样就可以处理accept事件了 final ServerSocketChannel serverChannel; try { serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking( false); serverChannel.socket().bind( new InetSocketAddress(port)); serverChannel.register(getSelector(), SelectionKey.OP_ACCEPT ); } catch (IOException e) { logger.error("listener on port " + port + " failed."); throw new RuntimeException(e); }2. 第二部分是socket绑定完毕后,通知server启动成功
3. 从selector里面select出来一个请求,然后选择一个processor进行处理,我们简单来看代码
// int currentProcessor = 0; while(isRunning()) { int ready = -1; try { ready = getSelector().select(500L); } catch (IOException e) { throw new IllegalStateException(e); } if(ready<=0)continue; Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator(); while(iter.hasNext() && isRunning()) try { SelectionKey key = iter.next(); iter.remove(); // 如果是accetp事件,则选择一个processor进行处理 if(key.isAcceptable()) { accept(key,processors[currentProcessor]); } else { throw new IllegalStateException("Unrecognized key state for acceptor thread."); } // 选择下一个processor currentProcessor = (currentProcessor + 1) % processors .length ; } catch (Throwable t) { logger.error("Error in acceptor",t); } }
4. 当server退出后关闭server和selector
最后我们来看下accept私有函数的实现:
其实非常简单,将请求交个processor进行处理
private void accept(SelectionKey key, Processor processor) throws IOException{ // 获取到ServerSocketChannel类 ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize ); // 配置刚才accept来的socket,进行设置后,交由processor进行处理 SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking( false); socketChannel.socket().setTcpNoDelay( true); socketChannel.socket().setSendBufferSize(sendBufferSize ); // processor.accept(socketChannel); }接下来我们接着来看Processor类的实现。
Processor我们先来看下accept函数
public void accept(SocketChannel socketChannel) { newConnections.add(socketChannel); getSelector().wakeup(); }这个地方为什么要调用wakeup,我们目前还没有想到,还在继续思考
下来我们来看最主要的run函数:
public void run() { // 启动成功 startupComplete(); while (isRunning()) { try { // setup any new connections that have been queued up // 将新进来请求的read事件注册到channel上 configureNewConnections(); final Selector selector = getSelector(); int ready = selector.select(500); if (ready <= 0) continue; Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext() && isRunning()) { SelectionKey key = null; try { key = iter.next(); iter.remove(); // 如果通道可读,就触发读操作 if (key.isReadable()) { read(key); // 如果可写,就触发写操作 } else if (key.isWritable()) { write(key); } else if (!key.isValid()) { close(key); } else { throw new IllegalStateException("Unrecognized key state for processor thread."); } // 如果发送读操作的异常 } catch (EOFException eofe) { Socket socket = channelFor(key).socket(); logger.debug(format( "connection closed by %s:%d." , socket.getInetAddress(), socket.getPort())); close(key); // 如果发生InvalidRequestException异常 } catch (InvalidRequestException ire) { Socket socket = channelFor(key).socket(); logger.info(format( "Closing socket connection to %s:%d due to invalid request: %s", socket.getInetAddress(), socket.getPort(), ire.getMessage())); close(key); } catch (Throwable t) { Socket socket = channelFor(key).socket(); final String msg = "Closing socket for %s:%d becaulse of error"; if (logger .isDebugEnabled()) { logger.error(format(msg, socket.getInetAddress(), socket.getPort()), t); } else { logger.error(format(msg, socket.getInetAddress(), socket.getPort())); } close(key); } } } catch (IOException e) { logger.error(e.getMessage(), e); } } // logger.info("Closing selector while shutting down"); closeSelector(); shutdownComplete(); }1. 通知processor启动成功,这个里面我们也看到,程序中都散落这一些CountDownLatch
2. 将新来的请求的read事件注册到channel上
3. 从selector里面选择read和write事件,然后分别调用read和write函数
我们重点来看下read函数的实现:
private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = channelFor(key); Receive request = null; if (key.attachment() == null) { request = new BoundedByteBufferReceive(maxRequestSize ); key.attach(request); } else { request = (Receive) key.attachment(); } int read = request.readFrom(socketChannel); stats.recordBytesRead(read); if (read < 0) { close(key); // 如果读完毕了,进行处理,将response对象赋值给attach,然后发送响应 } else if (request.complete()) { Send maybeResponse = handle(key, request); key.attach( null); // if there is a response, send it, otherwise do nothing if (maybeResponse != null) { key.attach(maybeResponse); key.interestOps(SelectionKey.OP_WRITE ); } } else { // 如果没读完,则继续注册读事件,因为之前已经将read删除了 // more reading to be done key.interestOps(SelectionKey. OP_READ); getSelector().wakeup(); if (logger .isTraceEnabled()) { logger.trace("reading request not been done. " + request); } } }我们接着来看handle函数
/** * Handle a completed request producing an optional response */ private Send handle(SelectionKey key, Receive request) { // requestbuffer的第一个short就是RequestType enum的value final short requestTypeId = request.buffer().getShort(); final RequestKeys requestType = RequestKeys.valueOf(requestTypeId); if (requestLogger .isTraceEnabled()) { if (requestType == null) { throw new InvalidRequestException("No mapping found for handler id " + requestTypeId); } String logFormat = "Handling %s request from %s"; requestLogger.trace(format(logFormat, requestType, channelFor(key).socket().getRemoteSocketAddress())); } // 从ReqeustHandler工厂里面取得RequestHandler RequestHandler handlerMapping = requesthandlerFactory.mapping(requestType, request); if (handlerMapping == null) { throw new InvalidRequestException("No handler found for request"); } // 调用handler将结果返回 long start = System.nanoTime(); Send maybeSend = handlerMapping.handler(requestType, request); stats.recordRequest(requestType, System.nanoTime() - start); return maybeSend; }我们也看下write函数,堪称比较经典的实现
private void write(SelectionKey key) throws IOException { // 因为我们注册read事件的时候,有一个attachment里面是send Send response = (Send) key.attachment(); SocketChannel socketChannel = channelFor(key); // 将response写入socket流 int written = response.writeTo(socketChannel); stats.recordBytesWritten(written); // 如果写完注册读事件,如果没写完,则继续注册写事件 if (response.complete()) { key.attach( null); key.interestOps(SelectionKey. OP_READ); } else { key.interestOps(SelectionKey. OP_WRITE); getSelector().wakeup(); } }
相关推荐
#A快速分布式消息传递系统(MQ) Jafka mq是从克隆的分布式发布-订阅消息系统。 因此它具有以下功能: 具有O(1)磁盘结构的持久消息传递即使在存储大量TB消息的情况下也能提供恒定的时间性能。 高吞吐量:即使使用...
jafka, 一种快速简单的分布式发布订阅消息系统( mq ) #A 快速分布式邮件系统( MQ ) Jafka是一个分布式发布订阅消息系统,从 Apache 克隆。因此,它具有以下特性:具有 O(1) 磁盘结构的持久消息传递,即使有大量的...
kafka 的 wiki 是徆丌错的学习文档: https://cwiki.apache.org/confluence/display/KAFKA/Index 接下来就是一系列文章,文章都是循序渐迕的方式带你了览 kafka: 关亍 kafka 的基本知识,分布式的基础:《分布式消息...
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在 Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下 进行消息持久化;高...
由于AMQP是一个网络协议,参与方可以位于不同的网络位置,增加了系统的灵活性和可靠性。 RabbitMQ的特性包括: 1. **易用性**:RabbitMQ提供了直观的管理界面,方便用户监控和管理队列、交换器和绑定。 2. **扩展性...
常见的开源技术有Fluentd、Flume、Apollo、Chukwa、Sqoop、DataX、MySQLStreamer、Canal、Scribe、ZeroMQ、ActiveMQ、Logstash、RabbitMQ、Jafka、Storm、Samza、Heron、Spark、Flink等。这些技术都有其优缺点,需要...
zeromq的作者之一用C语言重写的通信框架, OpenMQ Open-MQ 是一个开源的消息中间件,类似IBM的 WebSphere MQ(MQSeries),采用 C++ 和 Qt 库编写的,支持Windows、Unix 以及 Mac OS 平台,支持 JMS。 ZeroMQ ZeroMQ...
海尔电器公司在实时计算平台的建设中选择了多种开源技术,包括Fluentd、Flume、Apollo、Chukwa、Sqoop、DataX、MySQL Streamer、Canal、Scribe、ZeroMQ、ActiveMQ、Logstash、RabbitMQ、Jafka、Storm、Samza、Heron...
- **Jafka**:基于Kafka早期版本发展而来,但非官方项目,活跃度较低。 #### Kafka的数据文件分段与索引 - **数据文件分段**:Kafka通过将数据文件分段来提高查询效率。每个Partition被分成多个Segment File,每个...