`
aids198311
  • 浏览: 59596 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

仿照jetty的nio原理例子2(7月10号改进)

    博客分类:
  • nio
nio 
阅读更多
改动点:
1.分成了4个class文件,看起来清晰一点
2.把请求封装成附件,放到socketChannel里面
3.selector.accept()方法删除,取而代之的是selector.selectNow(),并且放到处理注册信息之后。增加了休息策略,selector.select(400),避免不停的循环,占用cpu%的情况。
4.每个请求到来之后,直接分出一个线程去处理。

7月10日改进点:

1.增加了自动删除超时的连接功能
2.key.interestOps操作优化,放到selector线程里面去做
3.request取消了runnable接口






SimpleJettyServerPlus 这个是server
package com.daizuan.jetty.plus;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * @author daizuan
 */
public class SimpleJettyServerPlus {

    private final ConcurrentLinkedQueue<Object> _changes_con = new ConcurrentLinkedQueue<Object>();
    private ServerSocketChannel                 channel;
    private Selector                            selector;
    private int                                 port;
    private Runnable                            connectionHandler;
    private Runnable                            requestHandler;

    public SimpleJettyServerPlus(int port) throws IOException{
        this.port = port;
        this.channel = ServerSocketChannel.open();
        this.selector = Selector.open();
    }

    public void setConnectionHandler(ConnectionHandler connectionHandler) {
        this.connectionHandler = connectionHandler;
    }

    public void setRequestHandler(RequestHandler requestHandler) {
        this.requestHandler = requestHandler;
    }

    public void listen() throws IOException { // 服务器开始监听端口,提供服务
        channel.socket().bind(new InetSocketAddress(port)); // 将scoket榜定在制定的端口上
        channel.configureBlocking(true);
        startConnectionHandler();
        startRequestHandler();
    }

    private void startRequestHandler() {
        if (requestHandler == null) {
            requestHandler = new RequestHandler(_changes_con, selector);
        }
        startThread(requestHandler);
    }

    private void startConnectionHandler() {
        if (connectionHandler == null) {
            connectionHandler = new ConnectionHandler(_changes_con, channel, selector);
        }
        startThread(connectionHandler);
    }

    private void startThread(Runnable run) {
        new Thread(run).start();
    }

    public static void main(String[] args) throws IOException {
        // System.out.println("server start.........");
        SimpleJettyServerPlus server = new SimpleJettyServerPlus(6789);
        server.listen(); // 服务器开始监听端口,提供服务
    }

}



ConnectionHandler 这个是提交连接事件的
package com.daizuan.jetty.plus;

import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConnectionHandler implements Runnable {

    private ConcurrentLinkedQueue<Object> _changes_con;
    private ServerSocketChannel           channel;
    private Selector                      selector;

    public ConnectionHandler(ConcurrentLinkedQueue<Object> _changes_con, ServerSocketChannel channel, Selector selector){
        this._changes_con = _changes_con;
        this.channel = channel;
        this.selector = selector;
    }

    @Override
    public void run() {
        System.out.println("ConnectionHander:connection Hander start......");
        while (true) {
            // 分发连接事件
            SocketChannel sc = null;
            try {
                // 这里阻塞监听连接事件
                sc = channel.accept();
                sc.configureBlocking(false);
                _changes_con.add(sc);
                // 释放selector的锁,以便接收注册信息
                selector.wakeup();
                System.out.println("listener:a client in![" + sc.socket().getRemoteSocketAddress() + "]");
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }

    }

}


Request这个是附件,放再socketChannel里的附件,包含了请求信息
package com.daizuan.jetty.plus;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Request {

    private final ConcurrentLinkedQueue<Object> _changes_req       = new ConcurrentLinkedQueue<Object>();
    private SelectionKey                        key;
    Selector                                    selector;
    private static int                          DEFAULT_BUFFERSIZE = 16;
    private static String                       DEFAULT_CHARSET    = "GBK";
    private static final String                 FORMAT             = "yyyy-MM-dd HH:mm:ss";
    private static final String                 EXIT               = "exit";
    private static final int                    MAX_ZERO_COUNT     = 16;
    private static final long                   MAX_IDLE_TIME      = 60000;
    private String                              id;
    private boolean                             isDispatched       = false;
    private Runnable                            _handle            = new Runnable() {

                                                                       @Override
                                                                       public void run() {
                                                                           handle();

                                                                       }
                                                                   };

    private volatile long                       dispatchedTime     = 0;

    private SocketChannel                       sc;

    private RequestHandler                      reqHandler;

    private int                                 interestOps;

    public void setReqHandler(RequestHandler reqHandler) {
        this.reqHandler = reqHandler;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Request(Selector selector, SelectionKey key){
        this.key = key;
        this.selector = selector;
        this.sc = (SocketChannel) key.channel();
        dispatchedTime = System.currentTimeMillis();
    }

    public void addTask(Object o) {
        _changes_req.add(o);
    }

    public void process() {
        synchronized (this) {
            if (isDispatched) {
                System.out.println("I am dispatched ,so return..");
                key.interestOps(0);
                return;
            }
            interestOps = key.interestOps();
            dispatchedTime = System.currentTimeMillis();
            isDispatched = true;
            new Thread(_handle).start();

        }
    }

    private void handle() {
        try {
            // 解析出请求
            String request = parseRequest();

            System.out.println("read [" + request + "] from " + id);
            if (request == null || needToCanncel(request)) {
                System.out.println(id + "I am die!");
                close();
                return;
            }
            // 向客户端写一些信息
            write("[" + getTime() + "] " + request + "\n");
            unDispatched();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void unDispatched() {
        synchronized (this) {
            isDispatched = false;
            updateKey();
        }

    }

    /**
     * 重新设置key,并不做实际更新,仅仅设置,把实际的更新操作放到selector线程里面去做
     */
    private void updateKey() {
        synchronized (this) {
            interestOps = !isDispatched ? SelectionKey.OP_READ : 0;
            System.out.println("interestOps:" + interestOps + ",SelectionKey.OP_READ:" + SelectionKey.OP_READ);
            if (key.interestOps() == interestOps) {

                return;
            }
            reqHandler.addChange(this);
            selector.wakeup();
        }

    }

    /**
     * 更新key
     */
    public void doUpdateKey() {
        synchronized (this) {
            if (key != null && key.isValid() && sc.isOpen()) {
                key.interestOps(interestOps);
                System.out.println("interestOps-->" + interestOps);
            } else {
                close();
            }
        }
    }

    public void timeOut() {
        long now = System.currentTimeMillis();
        if (now - dispatchedTime > MAX_IDLE_TIME) {
            close();
        }
    }

    private String getTime() {
        DateFormat df = new SimpleDateFormat(FORMAT);
        return df.format(new Date());

    }

    private boolean needToCanncel(String request) {
        return EXIT.equals(request);
    }

    private String parseRequest() throws IOException {
        ByteBuffer bbuffer = ByteBuffer.allocate(DEFAULT_BUFFERSIZE);
        int count = 0;
        int off = 0;
        byte[] data = new byte[DEFAULT_BUFFERSIZE * 10];
        bbuffer.clear();
        int zeroCount = 0;
        // 循环一次性吧所有数据读完,否则可能buffer满了,数据未读完
        System.out.println(11111111);
        while ((count = sc.read(bbuffer)) != -1) {
            if (count == 0 && ++zeroCount > MAX_ZERO_COUNT) {
                System.out.println("read zero count:" + zeroCount + ",break");
                break;
            }
            bbuffer.flip();
            if ((off + count) > data.length) {
                data = grow(data, DEFAULT_BUFFERSIZE * 10);
            }
            byte[] buf = bbuffer.array();
            System.arraycopy(buf, 0, data, off, count);
            off += count;
        }

        if (count < 0) {
            return null;
        }

        byte[] req = new byte[off];
        System.arraycopy(data, 0, req, 0, off);
        return new String(req, DEFAULT_CHARSET).trim();

    }

    private void close() {
        if (sc != null && sc.socket() != null) {
            try {
                if (!sc.socket().isClosed()) {
                    sc.socket().close();
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        if (sc != null) {
            try {
                sc.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        if (key != null) {
            key.cancel();
        }

        reqHandler.removeReq(this);

    }

    private void write(String str) {
        try {
            sc.write(ByteBuffer.wrap(str.getBytes(DEFAULT_CHARSET)));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * 数组扩容
     * 
     * @param src byte[] 源数组数据
     * @param size int 扩容的增加量
     * @return byte[] 扩容后的数组
     */
    private byte[] grow(byte[] src, int size) {
        byte[] tmp = new byte[src.length + size];
        System.arraycopy(src, 0, tmp, 0, src.length);
        return tmp;
    }
}



RequestHandlerl用来提交请求信息
package com.daizuan.jetty.plus;

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

public class RequestHandler implements Runnable {

    private ConcurrentLinkedQueue<Object>          _changes_con;
    private Selector                               selector;
    private static final long                      MAX_IDLE = 400;
    private long                                   kickTime = 0;
    private ConcurrentMap<Request, RequestHandler> requests = new ConcurrentHashMap<Request, RequestHandler>();

    public RequestHandler(ConcurrentLinkedQueue<Object> _changes_con, Selector selector){
        this._changes_con = _changes_con;
        this.selector = selector;
    }

    @Override
    public void run() {
        System.out.println("RequestHander:Request Hander start......");
        while (true) {
            try {
                int changes = _changes_con.size();
                Object change = null;
                while (changes-- > 0 && (change = _changes_con.poll()) != null) {
                    if (change instanceof SocketChannel) {
                        processCon(change);
                    } else if (change instanceof Request) {
                        ((Request) change).doUpdateKey();
                    } else {
                        System.out.println("what's this??");
                    }
                }
                int count = selector.selectNow();
                if (count == 0) selector.select(MAX_IDLE);
                Set<SelectionKey> keys = selector.selectedKeys();
                // 处理请求信息
                for (SelectionKey key : keys) {
                    System.out.println("find some keys " + key);
                    processReq(key);
                }
                selector.selectedKeys().clear();
                long now = System.currentTimeMillis();
                if (now - kickTime > MAX_IDLE) {
                    kickTime = now;
                    kick();
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    }

    private void processCon(Object change) {
        try {
            if (change instanceof SocketChannel) {
                SocketChannel sc = (SocketChannel) change;
                String id = "[" + sc.socket().getRemoteSocketAddress() + "] ";
                SelectionKey key = sc.register(selector, SelectionKey.OP_READ, null);
                Request req = new Request(selector, key);
                req.setReqHandler(this);
                req.setId(id);
                key.attach(req);
                requests.put(req, this);
                req.process();
                System.out.println("a client connected!" + id);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 定时的清除一些超时的连接
     */
    private void kick() {
        new Thread(new Runnable() {

            @Override
            public void run() {
                for (Map.Entry<Request, RequestHandler> entry : requests.entrySet()) {
                    entry.getKey().timeOut();
                }

            }
        }

        ).start();
    }

    public void removeReq(Request req) {
        System.out.println("remvoe:" + req);
        requests.remove(req);
    }

    public void addChange(Request req) {
        System.out.println("add:" + req);
        this._changes_con.add(req);
    }

    private void processReq(SelectionKey key) {
        if (!key.isValid()) {
            key.cancel();
            Request req = (Request) key.attachment();
            if (req != null) req.doUpdateKey();
            return;
        }
        Request req = (Request) key.attachment();
        req.process();
    }
}

分享到:
评论
2 楼 悲伤逆流成河 2013-01-02  
我也研究过,写了个socks5代理服务器,感觉真的nio真的一般般,真的好麻烦,一个版本只用了3个线程,担心读取不及时。另外一个版本用了线程池读写数据,那真的还不如用bio呢。现在思考中,是不是我的代码有问题,不会使用nio,还是nio本身就有问题,这也是这么就以来叫好不叫坐的原因(java里的nio的api太少了,另外根本没办法自己写出一个selector)??
1 楼 zhhzhfya 2012-03-19  
你好,我用一个IE访问
ConnectionHandler.java

// 这里阻塞监听连接事件
sc = channel.accept();
这里进行2次accept,我感觉应该一次吧
会出现下面的日志:
ConnectionHander:connection Hander start......
RequestHander:Request Hander start......
listener:a client in![/127.0.0.1:7882]
listener:a client in![/127.0.0.1:7883]

请帮忙解释下,谢谢

相关推荐

    jetty整合springmvc例子

    【标题】:“Jetty整合SpringMVC例子” 在Java Web开发中,Jetty是一个轻量级、高性能的HTTP服务器和Servlet容器,而SpringMVC是Spring框架的一部分,用于构建MVC模式的Web应用。将Jetty与SpringMVC整合可以实现...

    Jetty工作原理

    Jetty是一个轻量级、高性能且灵活的Servlet引擎,它在IT行业中被广泛用于构建Web应用程序和服务器。其设计理念强调简单性和可扩展性,通过Handler数据模型实现这一目标。Handler是Jetty架构的核心,它是一种可扩展的...

    从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式

    Jetty、Tomcat和Mina都是Java领域中著名的Web服务器和应用服务器,它们在NIO架构上有着相似的设计模式。本文将从这三个框架中提炼出NIO构架网络服务器的经典模式,并逐一解析它们的核心机制。 首先,Jetty的NIO实现...

    jfinal-jetty+idea例子

    标题“jfinal-jetty+idea例子”揭示了一个基于Java开发的项目实例,它结合了JFinal和Jetty两个关键组件,并在IntelliJ IDEA(简称IDEA)环境中运行。这个项目可能是为了演示如何在IDEA中配置和运行一个使用JFinal...

    gradle的jetty插件使用例子

    接着,我们添加了Jetty Runner插件,并指定了版本号。然后,在`dependencies`块中,我们通常会引入Servlet API等必要的库。最后,在`jettyRunWar`任务中,我们设置了Jetty服务器监听的HTTP端口(8080)以及停止...

    jetty 例子, 就一个demo 还有jar

    通过分析这个文件,你可以更深入地理解Jetty的工作原理和使用方式。 总之,Jetty作为一个轻量级的Web服务器,为开发者提供了一个快速开发和部署Java Web应用的平台。它易于学习和使用,尤其适合小型项目和原型开发...

    maven+jetty +ssh 项目例子

    【标题】"maven+jetty +ssh 项目例子"是一个综合性的开发示例,它展示了如何使用Maven构建工具、Jetty服务器以及SSH(Spring Security)框架来开发和部署一个Java Web应用。这个项目旨在帮助开发者理解这些技术的...

    Android-I-Jetty服务器部署例子代码

    通过这个例子,你已经了解了如何在Android中部署和运行Jetty服务器,以及如何加载示例Web应用程序。结合`example-webapps`目录中的实际文件,你可以进一步实践和调试,以熟悉Jetty在Android环境中的应用。记住,实践...

    jetty6.1.6-2

    4. **线程模型**:Jetty采用高效的线程模型,如NIO(非阻塞I/O)或EPOLL(在Linux上),以提高并发处理能力,这对于高流量的Web应用至关重要。 5. **WebSocket支持**:如果lib目录包含websocket相关的JAR,例如...

    jetty 8及依赖包

    这个压缩包包含Jetty 8版本的实现及其依赖库,是学习和理解Jetty工作原理,尤其是NIO(非阻塞I/O)和Servlet容器实现的宝贵资源。 Jetty 8在设计时特别强调了性能和可扩展性,它使用了Java NIO(New I/O)API来处理...

    jetty axis2.war

    **Jetty Axis2.war详解** Jetty是一个轻量级且高效的开源Java Web服务器和Servlet容器,它允许开发者快速地部署和管理Web应用程序。而Axis2则是Apache软件基金会的一个项目,提供了一个强大的Web服务引擎,用于实现...

    jetty-4.2.24

    3. **高并发**:Jetty采用NIO(非阻塞I/O)模型,能够处理大量并发连接,从而提高性能。 4. **模块化**:Jetty的组件设计为可插拔,用户可以根据需要选择和配置必要的模块。 5. **WebSocket支持**:Jetty很早就...

    jetty-6.1.26.zip

    Jetty是一款轻量级、高性能的Java Web服务器和Servlet容器,与Tomcat相似,它为开发和...尽管现代开发可能更倾向于使用更新的版本,但对于了解Jetty的历史和原理,以及在某些场景下部署旧项目,这个版本仍有其价值。

    jetty包2(lib目录)

    这个目录包含了Jetty自身的核心组件和其他依赖的第三方库,它们对于理解Jetty的工作原理和如何配置环境至关重要。 1. **Jetty核心组件**: `lib`目录下的jar文件包括了Jetty的主要模块,如Jetty HTTP服务器、...

    基于jetty8 websocket的例子

    在本文中,我们将深入探讨如何使用Jetty 8实现WebSocket技术来构建一个实时通信的聊天工具。WebSocket协议是一种在客户端和服务器之间建立长连接的协议,它为双向通信提供了低延迟、高效的解决方案,特别适合实时...

    在嵌入式jetty环境下运行struts2Annotation项目

    在嵌入式Jetty环境下运行Struts2 Annotation项目是一个常见的任务,特别是在开发和测试阶段,因为这种方式能够快速启动服务,而无需依赖大型服务器容器。本文将深入探讨如何配置和执行这个过程,以及涉及的关键技术...

    jetty安装的JAR(其他相关的jetty axis2的jar请看此博客相关文档)

    这是Jetty的一个发行版,版本号为9.2.26,发布于2018年8月6日。这个压缩包包含了运行Jetty所需的所有核心组件和库,包括以下几个关键部分: 1. **bin** 目录:包含了启动和管理Jetty服务器的脚本,如`start.jar`和...

    jetty,tomcat原理

    **Jetty与Tomcat原理详解** Jetty和Tomcat是两个广泛应用的Java Web服务器和Servlet容器,它们在处理HTTP请求、执行Java Servlets以及管理Web应用程序方面起着关键作用。了解这两个容器的工作原理对于开发者来说至...

    Jetty for Mac java demo 用java写的Mac环境下 jetty小例子

    自己用jetty写的java小例子,在mac 环境下,具体的解说在:https://www.cnblogs.com/aspirant/p/9445542.html

    jetty7.6.10服务器

    Jetty 7.6.10 是一个轻量级、高性能的Java Web服务器和Servlet容器。这个版本在Jetty的历史中扮演着重要的角色,因为它包含了多个改进和修复,旨在提高性能、稳定性和安全性。让我们深入了解一下Jetty 7.6.10的主要...

Global site tag (gtag.js) - Google Analytics