- 浏览: 59596 次
- 性别:
- 来自: 杭州
最新评论
-
name327:
迟到三年多的回答,大概都不需要了,可以给其他人看,我的回答只代 ...
仿照jetty的nio原理写了个例子 -
顾惜朝:
...
ibatis如何自动获取自定义的handler -
悲伤逆流成河:
我也研究过,写了个socks5代理服务器,感觉真的nio真的一 ...
仿照jetty的nio原理例子2(7月10号改进) -
diyunpeng:
直接用JarInputStream读取Jar内文件不可以么?
...
写了个脚本查找项目里面重复的class文件,避免冲突 -
diyunpeng:
JdbcOdbcDriver class loaded
re ...
关于DriverManager与驱动
改动点:
1.分成了4个class文件,看起来清晰一点
2.把请求封装成附件,放到socketChannel里面
3.selector.accept()方法删除,取而代之的是selector.selectNow(),并且放到处理注册信息之后。增加了休息策略,selector.select(400),避免不停的循环,占用cpu%的情况。
4.每个请求到来之后,直接分出一个线程去处理。
7月10日改进点:
1.增加了自动删除超时的连接功能
2.key.interestOps操作优化,放到selector线程里面去做
3.request取消了runnable接口
SimpleJettyServerPlus 这个是server
ConnectionHandler 这个是提交连接事件的
Request这个是附件,放再socketChannel里的附件,包含了请求信息
RequestHandlerl用来提交请求信息
1.分成了4个class文件,看起来清晰一点
2.把请求封装成附件,放到socketChannel里面
3.selector.accept()方法删除,取而代之的是selector.selectNow(),并且放到处理注册信息之后。增加了休息策略,selector.select(400),避免不停的循环,占用cpu%的情况。
4.每个请求到来之后,直接分出一个线程去处理。
7月10日改进点:
1.增加了自动删除超时的连接功能
2.key.interestOps操作优化,放到selector线程里面去做
3.request取消了runnable接口
SimpleJettyServerPlus 这个是server
package com.daizuan.jetty.plus; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.util.concurrent.ConcurrentLinkedQueue; /** * @author daizuan */ public class SimpleJettyServerPlus { private final ConcurrentLinkedQueue<Object> _changes_con = new ConcurrentLinkedQueue<Object>(); private ServerSocketChannel channel; private Selector selector; private int port; private Runnable connectionHandler; private Runnable requestHandler; public SimpleJettyServerPlus(int port) throws IOException{ this.port = port; this.channel = ServerSocketChannel.open(); this.selector = Selector.open(); } public void setConnectionHandler(ConnectionHandler connectionHandler) { this.connectionHandler = connectionHandler; } public void setRequestHandler(RequestHandler requestHandler) { this.requestHandler = requestHandler; } public void listen() throws IOException { // 服务器开始监听端口,提供服务 channel.socket().bind(new InetSocketAddress(port)); // 将scoket榜定在制定的端口上 channel.configureBlocking(true); startConnectionHandler(); startRequestHandler(); } private void startRequestHandler() { if (requestHandler == null) { requestHandler = new RequestHandler(_changes_con, selector); } startThread(requestHandler); } private void startConnectionHandler() { if (connectionHandler == null) { connectionHandler = new ConnectionHandler(_changes_con, channel, selector); } startThread(connectionHandler); } private void startThread(Runnable run) { new Thread(run).start(); } public static void main(String[] args) throws IOException { // System.out.println("server start........."); SimpleJettyServerPlus server = new SimpleJettyServerPlus(6789); server.listen(); // 服务器开始监听端口,提供服务 } }
ConnectionHandler 这个是提交连接事件的
package com.daizuan.jetty.plus; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.ConcurrentLinkedQueue; public class ConnectionHandler implements Runnable { private ConcurrentLinkedQueue<Object> _changes_con; private ServerSocketChannel channel; private Selector selector; public ConnectionHandler(ConcurrentLinkedQueue<Object> _changes_con, ServerSocketChannel channel, Selector selector){ this._changes_con = _changes_con; this.channel = channel; this.selector = selector; } @Override public void run() { System.out.println("ConnectionHander:connection Hander start......"); while (true) { // 分发连接事件 SocketChannel sc = null; try { // 这里阻塞监听连接事件 sc = channel.accept(); sc.configureBlocking(false); _changes_con.add(sc); // 释放selector的锁,以便接收注册信息 selector.wakeup(); System.out.println("listener:a client in![" + sc.socket().getRemoteSocketAddress() + "]"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
Request这个是附件,放再socketChannel里的附件,包含了请求信息
package com.daizuan.jetty.plus; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ConcurrentLinkedQueue; public class Request { private final ConcurrentLinkedQueue<Object> _changes_req = new ConcurrentLinkedQueue<Object>(); private SelectionKey key; Selector selector; private static int DEFAULT_BUFFERSIZE = 16; private static String DEFAULT_CHARSET = "GBK"; private static final String FORMAT = "yyyy-MM-dd HH:mm:ss"; private static final String EXIT = "exit"; private static final int MAX_ZERO_COUNT = 16; private static final long MAX_IDLE_TIME = 60000; private String id; private boolean isDispatched = false; private Runnable _handle = new Runnable() { @Override public void run() { handle(); } }; private volatile long dispatchedTime = 0; private SocketChannel sc; private RequestHandler reqHandler; private int interestOps; public void setReqHandler(RequestHandler reqHandler) { this.reqHandler = reqHandler; } public String getId() { return id; } public void setId(String id) { this.id = id; } public Request(Selector selector, SelectionKey key){ this.key = key; this.selector = selector; this.sc = (SocketChannel) key.channel(); dispatchedTime = System.currentTimeMillis(); } public void addTask(Object o) { _changes_req.add(o); } public void process() { synchronized (this) { if (isDispatched) { System.out.println("I am dispatched ,so return.."); key.interestOps(0); return; } interestOps = key.interestOps(); dispatchedTime = System.currentTimeMillis(); isDispatched = true; new Thread(_handle).start(); } } private void handle() { try { // 解析出请求 String request = parseRequest(); System.out.println("read [" + request + "] from " + id); if (request == null || needToCanncel(request)) { System.out.println(id + "I am die!"); close(); return; } // 向客户端写一些信息 write("[" + getTime() + "] " + request + "\n"); unDispatched(); } catch (Exception e) { e.printStackTrace(); } } private void unDispatched() { synchronized (this) { isDispatched = false; updateKey(); } } /** * 重新设置key,并不做实际更新,仅仅设置,把实际的更新操作放到selector线程里面去做 */ private void updateKey() { synchronized (this) { interestOps = !isDispatched ? SelectionKey.OP_READ : 0; System.out.println("interestOps:" + interestOps + ",SelectionKey.OP_READ:" + SelectionKey.OP_READ); if (key.interestOps() == interestOps) { return; } reqHandler.addChange(this); selector.wakeup(); } } /** * 更新key */ public void doUpdateKey() { synchronized (this) { if (key != null && key.isValid() && sc.isOpen()) { key.interestOps(interestOps); System.out.println("interestOps-->" + interestOps); } else { close(); } } } public void timeOut() { long now = System.currentTimeMillis(); if (now - dispatchedTime > MAX_IDLE_TIME) { close(); } } private String getTime() { DateFormat df = new SimpleDateFormat(FORMAT); return df.format(new Date()); } private boolean needToCanncel(String request) { return EXIT.equals(request); } private String parseRequest() throws IOException { ByteBuffer bbuffer = ByteBuffer.allocate(DEFAULT_BUFFERSIZE); int count = 0; int off = 0; byte[] data = new byte[DEFAULT_BUFFERSIZE * 10]; bbuffer.clear(); int zeroCount = 0; // 循环一次性吧所有数据读完,否则可能buffer满了,数据未读完 System.out.println(11111111); while ((count = sc.read(bbuffer)) != -1) { if (count == 0 && ++zeroCount > MAX_ZERO_COUNT) { System.out.println("read zero count:" + zeroCount + ",break"); break; } bbuffer.flip(); if ((off + count) > data.length) { data = grow(data, DEFAULT_BUFFERSIZE * 10); } byte[] buf = bbuffer.array(); System.arraycopy(buf, 0, data, off, count); off += count; } if (count < 0) { return null; } byte[] req = new byte[off]; System.arraycopy(data, 0, req, 0, off); return new String(req, DEFAULT_CHARSET).trim(); } private void close() { if (sc != null && sc.socket() != null) { try { if (!sc.socket().isClosed()) { sc.socket().close(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (sc != null) { try { sc.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (key != null) { key.cancel(); } reqHandler.removeReq(this); } private void write(String str) { try { sc.write(ByteBuffer.wrap(str.getBytes(DEFAULT_CHARSET))); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 数组扩容 * * @param src byte[] 源数组数据 * @param size int 扩容的增加量 * @return byte[] 扩容后的数组 */ private byte[] grow(byte[] src, int size) { byte[] tmp = new byte[src.length + size]; System.arraycopy(src, 0, tmp, 0, src.length); return tmp; } }
RequestHandlerl用来提交请求信息
package com.daizuan.jetty.plus; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; public class RequestHandler implements Runnable { private ConcurrentLinkedQueue<Object> _changes_con; private Selector selector; private static final long MAX_IDLE = 400; private long kickTime = 0; private ConcurrentMap<Request, RequestHandler> requests = new ConcurrentHashMap<Request, RequestHandler>(); public RequestHandler(ConcurrentLinkedQueue<Object> _changes_con, Selector selector){ this._changes_con = _changes_con; this.selector = selector; } @Override public void run() { System.out.println("RequestHander:Request Hander start......"); while (true) { try { int changes = _changes_con.size(); Object change = null; while (changes-- > 0 && (change = _changes_con.poll()) != null) { if (change instanceof SocketChannel) { processCon(change); } else if (change instanceof Request) { ((Request) change).doUpdateKey(); } else { System.out.println("what's this??"); } } int count = selector.selectNow(); if (count == 0) selector.select(MAX_IDLE); Set<SelectionKey> keys = selector.selectedKeys(); // 处理请求信息 for (SelectionKey key : keys) { System.out.println("find some keys " + key); processReq(key); } selector.selectedKeys().clear(); long now = System.currentTimeMillis(); if (now - kickTime > MAX_IDLE) { kickTime = now; kick(); } } catch (Exception e) { e.printStackTrace(); } } } private void processCon(Object change) { try { if (change instanceof SocketChannel) { SocketChannel sc = (SocketChannel) change; String id = "[" + sc.socket().getRemoteSocketAddress() + "] "; SelectionKey key = sc.register(selector, SelectionKey.OP_READ, null); Request req = new Request(selector, key); req.setReqHandler(this); req.setId(id); key.attach(req); requests.put(req, this); req.process(); System.out.println("a client connected!" + id); } } catch (Exception e) { e.printStackTrace(); } } /** * 定时的清除一些超时的连接 */ private void kick() { new Thread(new Runnable() { @Override public void run() { for (Map.Entry<Request, RequestHandler> entry : requests.entrySet()) { entry.getKey().timeOut(); } } } ).start(); } public void removeReq(Request req) { System.out.println("remvoe:" + req); requests.remove(req); } public void addChange(Request req) { System.out.println("add:" + req); this._changes_con.add(req); } private void processReq(SelectionKey key) { if (!key.isValid()) { key.cancel(); Request req = (Request) key.attachment(); if (req != null) req.doUpdateKey(); return; } Request req = (Request) key.attachment(); req.process(); } }
评论
2 楼
悲伤逆流成河
2013-01-02
我也研究过,写了个socks5代理服务器,感觉真的nio真的一般般,真的好麻烦,一个版本只用了3个线程,担心读取不及时。另外一个版本用了线程池读写数据,那真的还不如用bio呢。现在思考中,是不是我的代码有问题,不会使用nio,还是nio本身就有问题,这也是这么就以来叫好不叫坐的原因(java里的nio的api太少了,另外根本没办法自己写出一个selector)??
1 楼
zhhzhfya
2012-03-19
你好,我用一个IE访问
ConnectionHandler.java
的
// 这里阻塞监听连接事件
sc = channel.accept();
这里进行2次accept,我感觉应该一次吧
会出现下面的日志:
ConnectionHander:connection Hander start......
RequestHander:Request Hander start......
listener:a client in![/127.0.0.1:7882]
listener:a client in![/127.0.0.1:7883]
请帮忙解释下,谢谢
ConnectionHandler.java
的
// 这里阻塞监听连接事件
sc = channel.accept();
这里进行2次accept,我感觉应该一次吧
会出现下面的日志:
ConnectionHander:connection Hander start......
RequestHander:Request Hander start......
listener:a client in![/127.0.0.1:7882]
listener:a client in![/127.0.0.1:7883]
请帮忙解释下,谢谢
相关推荐
【标题】:“Jetty整合SpringMVC例子” 在Java Web开发中,Jetty是一个轻量级、高性能的HTTP服务器和Servlet容器,而SpringMVC是Spring框架的一部分,用于构建MVC模式的Web应用。将Jetty与SpringMVC整合可以实现...
Jetty是一个轻量级、高性能且灵活的Servlet引擎,它在IT行业中被广泛用于构建Web应用程序和服务器。其设计理念强调简单性和可扩展性,通过Handler数据模型实现这一目标。Handler是Jetty架构的核心,它是一种可扩展的...
Jetty、Tomcat和Mina都是Java领域中著名的Web服务器和应用服务器,它们在NIO架构上有着相似的设计模式。本文将从这三个框架中提炼出NIO构架网络服务器的经典模式,并逐一解析它们的核心机制。 首先,Jetty的NIO实现...
标题“jfinal-jetty+idea例子”揭示了一个基于Java开发的项目实例,它结合了JFinal和Jetty两个关键组件,并在IntelliJ IDEA(简称IDEA)环境中运行。这个项目可能是为了演示如何在IDEA中配置和运行一个使用JFinal...
接着,我们添加了Jetty Runner插件,并指定了版本号。然后,在`dependencies`块中,我们通常会引入Servlet API等必要的库。最后,在`jettyRunWar`任务中,我们设置了Jetty服务器监听的HTTP端口(8080)以及停止...
通过分析这个文件,你可以更深入地理解Jetty的工作原理和使用方式。 总之,Jetty作为一个轻量级的Web服务器,为开发者提供了一个快速开发和部署Java Web应用的平台。它易于学习和使用,尤其适合小型项目和原型开发...
【标题】"maven+jetty +ssh 项目例子"是一个综合性的开发示例,它展示了如何使用Maven构建工具、Jetty服务器以及SSH(Spring Security)框架来开发和部署一个Java Web应用。这个项目旨在帮助开发者理解这些技术的...
通过这个例子,你已经了解了如何在Android中部署和运行Jetty服务器,以及如何加载示例Web应用程序。结合`example-webapps`目录中的实际文件,你可以进一步实践和调试,以熟悉Jetty在Android环境中的应用。记住,实践...
4. **线程模型**:Jetty采用高效的线程模型,如NIO(非阻塞I/O)或EPOLL(在Linux上),以提高并发处理能力,这对于高流量的Web应用至关重要。 5. **WebSocket支持**:如果lib目录包含websocket相关的JAR,例如...
这个压缩包包含Jetty 8版本的实现及其依赖库,是学习和理解Jetty工作原理,尤其是NIO(非阻塞I/O)和Servlet容器实现的宝贵资源。 Jetty 8在设计时特别强调了性能和可扩展性,它使用了Java NIO(New I/O)API来处理...
**Jetty Axis2.war详解** Jetty是一个轻量级且高效的开源Java Web服务器和Servlet容器,它允许开发者快速地部署和管理Web应用程序。而Axis2则是Apache软件基金会的一个项目,提供了一个强大的Web服务引擎,用于实现...
3. **高并发**:Jetty采用NIO(非阻塞I/O)模型,能够处理大量并发连接,从而提高性能。 4. **模块化**:Jetty的组件设计为可插拔,用户可以根据需要选择和配置必要的模块。 5. **WebSocket支持**:Jetty很早就...
Jetty是一款轻量级、高性能的Java Web服务器和Servlet容器,与Tomcat相似,它为开发和...尽管现代开发可能更倾向于使用更新的版本,但对于了解Jetty的历史和原理,以及在某些场景下部署旧项目,这个版本仍有其价值。
这个目录包含了Jetty自身的核心组件和其他依赖的第三方库,它们对于理解Jetty的工作原理和如何配置环境至关重要。 1. **Jetty核心组件**: `lib`目录下的jar文件包括了Jetty的主要模块,如Jetty HTTP服务器、...
在本文中,我们将深入探讨如何使用Jetty 8实现WebSocket技术来构建一个实时通信的聊天工具。WebSocket协议是一种在客户端和服务器之间建立长连接的协议,它为双向通信提供了低延迟、高效的解决方案,特别适合实时...
在嵌入式Jetty环境下运行Struts2 Annotation项目是一个常见的任务,特别是在开发和测试阶段,因为这种方式能够快速启动服务,而无需依赖大型服务器容器。本文将深入探讨如何配置和执行这个过程,以及涉及的关键技术...
这是Jetty的一个发行版,版本号为9.2.26,发布于2018年8月6日。这个压缩包包含了运行Jetty所需的所有核心组件和库,包括以下几个关键部分: 1. **bin** 目录:包含了启动和管理Jetty服务器的脚本,如`start.jar`和...
**Jetty与Tomcat原理详解** Jetty和Tomcat是两个广泛应用的Java Web服务器和Servlet容器,它们在处理HTTP请求、执行Java Servlets以及管理Web应用程序方面起着关键作用。了解这两个容器的工作原理对于开发者来说至...
自己用jetty写的java小例子,在mac 环境下,具体的解说在:https://www.cnblogs.com/aspirant/p/9445542.html
Jetty 7.6.10 是一个轻量级、高性能的Java Web服务器和Servlet容器。这个版本在Jetty的历史中扮演着重要的角色,因为它包含了多个改进和修复,旨在提高性能、稳定性和安全性。让我们深入了解一下Jetty 7.6.10的主要...