- 浏览: 3163 次
- 性别:
- 来自: 北京
文章分类
最新评论
基于Java NIO的即时聊天服务器雏形
前不久自己动手写了一个Android的聊天工具,跟服务器的交互还是基于HTTP方式的,在一般通讯上还算凑活,但是在即时聊天的时候就有点恶心了,客户端开启Service每隔3秒去询问服务器是否有自己的新消息(当然3秒有点太快了),在心疼性能和流量的前提下,只能自己动手写个服务器,传统的Socket是阻塞的,这样的话服务器对每个Socket都需要建立一个线程来操作,资源开销很大,而且线程多了直接会影响服务端的性能(曾经测试开了3000多个线程就不让创建了,所以并发数目也是有限制的),听说从JDK1.5就多了个New IO,灰常不错的样子,找了找相关的资料,网上竟然全都是最最最简单的一个demo,然后去CSDN发帖,基本上都是建议直接使用MINA框架的,这样一来根本达不到学习NIO的目的,而且现在的技术也太快餐了,只知道使用前辈留下的东西,知其然不知其所以然。
折腾了一个周,终于搞出来了一个雏形,相比于xmpp的xml,本人更喜欢json的简洁,为了防止客户端异常断开等,准备采用心跳检测的机制来判断用户是否在线,另外还有一种方法是学习例如Tomcat等Servlet中间件的方式,设置Session周期,定时清除过期Session。本Demo暂时实现了Session过期检测,心跳检测有空再搞,如果本例子在使用过程中有性能漏洞或者什么bug请及时通知我,谢谢
一开始把文章发博客园了,结果发现Java基本成悲剧了,还是回JavaEye吧……
废话不多说,关于NIO的SelectionKey、Selector、Channel网上的介绍例子都很多,直接上代码:
JsonParser Json的解析类,随便封装了下,使用的最近比较火的fastjson Main 入口,不解释 Log SeekServer: 服务器端的入口,请求的封装和接收都在此类,端口暂时写死在了代码里,mSelector.select(TIME_OUT) > 0 目的是为了当服务器空闲的时候(没有任何读写甚至请求断开事件),循环时有个间隔时间,不然基本上相当于while(true){//nothing}了,你懂的 Handler: 只有一个抽象方法exec,Session将会继承它 Session: 封装了用户的请求和SelectionKey和SocketChannel,每次接收到新的请求时都重置它的最后活动时间,通过状态mState=READING or SENDING 去执行消息的接收与发送,当客户端异常断开时则从SessionManager清除该会话。 SessionManager: 将所有Session存放到ConcurrentHashMap,这里使用手机用户的imei做key,ConcurrentHashMap因为是线程安全的,所以能很大程度上避免自己去实现同步的过程, 封装了一些操作Session的方法例如get,remove等 SessionProcessor 里面使用了JDK自带的线程池,用来分发处理所有Session中当前需要处理的请求(线程池的初始化参数不是太熟,望有了解的童鞋能告诉我),内部类Process则是将Session再次封装成SocketRequest和SocketResponse(看到这里是不是有点熟悉的感觉,对没错,JavaWeb里到处都是request和response) RequestTransform里的transfer方法利用反射对请求参数中的请求类别和请求动作来调用不同类的不同方法(UserHandler和MessageHandler)
SocketRequest和SocketResponse 最后则是两个处理请求的Handler 还有个监测是否超时的类Looper,定期去删除Session 注意,在Task和SessionProcessor类里都有对SessionManager的sessions做遍历,文中使用的方法并不是很好,主要是效率问题,推荐使用遍历Entry的方式来获取Key和Value, 因为一直在JavaWeb上折腾,所以会的童鞋看到Request和Response会挺亲切,这个例子没有经过任何安全和性能测试,如果需要放到生产环境上得话请先自行做测试- -! 客户端请求时的数据内容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},这些约定就自己来定了1 public class JsonParser {
2
3 private static JSONObject mJson;
4
5 public synchronized static String get(String json,String key) {
6 mJson = JSON.parseObject(json);
7 return mJson.getString(key);
8 }
9 }
1 public class Main {
2
3 public static void main(String... args) {
4 new SeekServer().start();
5 }
6 }
1 public class Log {
2
3 public static void i(Object obj) {
4 System.out.println(obj);
5 }
6 public static void e(Object e) {
7 System.err.println(e);
8 }
9 }
1 public class SeekServer extends Thread{
2 private final int ACCPET_PORT = 55555;
3 private final int TIME_OUT = 1000;
4 private Selector mSelector = null;
5 private ServerSocketChannel mSocketChannel = null;
6 private ServerSocket mServerSocket = null;
7 private InetSocketAddress mAddress = null;
8
9 public SeekServer() {
10 long sign = System.currentTimeMillis();
11 try {
12 mSocketChannel = ServerSocketChannel.open();
13 if(mSocketChannel == null) {
14 System.out.println("can't open server socket channel");
15 }
16 mServerSocket = mSocketChannel.socket();
17 mAddress = new InetSocketAddress(ACCPET_PORT);
18 mServerSocket.bind(mAddress);
19 Log.i("server bind port is " + ACCPET_PORT);
20 mSelector = Selector.open();
21 mSocketChannel.configureBlocking(false);
22 SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
23 key.attach(new Acceptor());
24
25 //检测Session状态
26 Looper.getInstance().loop();
27
28 //开始处理Session
29 SessionProcessor.start();
30
31 Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!");
32 } catch (ClosedChannelException e) {
33 Log.e(e.getMessage());
34 } catch (IOException e) {
35 Log.e(e.getMessage());
36 }
37 }
38
39 public void run() {
40 Log.i("server is listening...");
41 while(!Thread.interrupted()) {
42 try {
43 if(mSelector.select(TIME_OUT) > 0) {
44 Set<SelectionKey> keys = mSelector.selectedKeys();
45 Iterator<SelectionKey> iterator = keys.iterator();
46 SelectionKey key = null;
47 while(iterator.hasNext()) {
48 key = iterator.next();
49 Handler at = (Handler) key.attachment();
50 if(at != null) {
51 at.exec();
52 }
53 iterator.remove();
54 }
55 }
56 } catch (IOException e) {
57 Log.e(e.getMessage());
58 }
59 }
60 }
61
62 class Acceptor extends Handler{
63
64 public void exec(){
65 try {
66 SocketChannel sc = mSocketChannel.accept();
67 new Session(sc, mSelector);
68 } catch (ClosedChannelException e) {
69 Log.e(e);
70 } catch (IOException e) {
71 Log.e(e);
72 }
73 }
74 }
75 }
1 public abstract class Handler {
2
3 public abstract void exec();
4 }
1 public class Session extends Handler{
2
3 private SocketChannel mChannel;
4 private SelectionKey mKey;
5 private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);
6 private Charset charset = Charset.forName("UTF-8");
7 private CharsetDecoder mDecoder = charset.newDecoder();
8 private CharsetEncoder mEncoder = charset.newEncoder();
9 private long lastPant;//最后活动时间
10 private final int TIME_OUT = 1000 * 60 * 5; //Session超时时间
11 private String key;
12
13 private String sendData = "";
14 private String receiveData = null;
15
16 public static final int READING = 0,SENDING = 1;
17 int mState = READING;
18
19 public Session(SocketChannel socket, Selector selector) throws IOException {
20 this.mChannel = socket;
21 mChannel = socket;
22 mChannel.configureBlocking(false);
23 mKey = mChannel.register(selector, 0);
24 mKey.attach(this);
25 mKey.interestOps(SelectionKey.OP_READ);
26 selector.wakeup();
27 lastPant = Calendar.getInstance().getTimeInMillis();
28 }
29
30 public String getReceiveData() {
31 return receiveData;
32 }
33
34 public void clear() {
35 receiveData = null;
36 }
37
38 public void setSendData(String sendData) {
39 mState = SENDING;
40 mKey.interestOps(SelectionKey.OP_WRITE);
41 this.sendData = sendData + "\n";
42 }
43
44 public boolean isKeekAlive() {
45 return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();
46 }
47
48 public void setAlive() {
49 lastPant = Calendar.getInstance().getTimeInMillis();
50 }
51
52 /**
53 * 注销当前Session
54 */
55 public void distroy() {
56 try {
57 mChannel.close();
58 mKey.cancel();
59 } catch (IOException e) {}
60 }
61
62 @Override
63 public synchronized void exec() {
64 try {
65 if(mState == READING) {
66 read();
67 }else if(mState == SENDING) {
68 write();
69 }
70 } catch (IOException e) {
71 SessionManager.remove(key);
72 try {
73 mChannel.close();
74 } catch (IOException e1) {
75 Log.e(e1);
76 }
77 mKey.cancel();
78 }
79 }
80
81 public void read() throws IOException{
82 mRreceiveBuffer.clear();
83 int sign = mChannel.read(mRreceiveBuffer);
84 if(sign == -1) { //客户端连接关闭
85 mChannel.close();
86 mKey.cancel();
87 }
88 if(sign > 0) {
89 mRreceiveBuffer.flip();
90 receiveData = mDecoder.decode(mRreceiveBuffer).toString();
91 setAlive();
92 setSign();
93 SessionManager.addSession(key, this);
94 }
95 }
96
97 private void setSign() {
98 //设置当前Session的Key
99 key = JsonParser.get(receiveData,"imei");
100 //检测消息类型是否为心跳包
101 // String type = jo.getString("type");
102 // if(type.equals("HEART_BEAT")) {
103 // setAlive();
104 // }
105 }
106
107
108 /**
109 * 写消息
110 */
111 public void write() {
112 try {
113 mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));
114 sendData = null;
115 mState = READING;
116 mKey.interestOps(SelectionKey.OP_READ);
117 } catch (CharacterCodingException e) {
118 e.printStackTrace();
119 } catch (IOException e) {
120 try {
121 mChannel.close();
122 } catch (IOException e1) {
123 Log.e(e1);
124 }
125 }
126 }
127 }
1 public class SessionManager {
2
3 private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
4
5 public static void addSession(String key,Session session) {
6 sessions.put(key, session);
7 }
8
9 public static Session getSession(String key) {
10 return sessions.get(key);
11 }
12
13 public static Set<String> getSessionKeys() {
14 return sessions.keySet();
15 }
16
17 public static int getSessionCount() {
18 return sessions.size();
19 }
20
21 public static void remove(String[] keys) {
22 for(String key:keys) {
23 if(sessions.containsKey(key)) {
24 sessions.get(key).distroy();
25 sessions.remove(key);
26 }
27 }
28 }
29 public static void remove(String key) {
30 if(sessions.containsKey(key)) {
31 sessions.get(key).distroy();
32 sessions.remove(key);
33 }
34 }
35 }
1 public class SessionProcessor implements Runnable{
2
3 private static Runnable processor = new SessionProcessor();
4 private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());
5 public static void start() {
6 new Thread(processor).start();
7 }
8
9 @Override
10 public void run() {
11 while(true) {
12 Session tmp = null;
13 for(String key:SessionManager.getSessionKeys()) {
14 tmp = SessionManager.getSession(key);
15 //处理Session未处理的请求
16 if(tmp.getReceiveData() != null) {
17 pool.execute(new Process(tmp));
18 }
19 }
20 try {
21 Thread.sleep(10);
22 } catch (InterruptedException e) {
23 Log.e(e);
24 }
25 }
26 }
27
28 class Process implements Runnable {
29
30 private SocketRequest request;
31 private SocketResponse response;
32
33 public Process(Session session) {
34 //将Session封装成Request和Response
35 request = new SocketRequest(session);
36 response = new SocketResponse(session);
37 }
38
39 @Override
40 public void run() {
41 new RequestTransform().transfer(request, response);
42 }
43 }
44
45 }
1 public class RequestTransform {
2
3 public void transfer(SocketRequest request,SocketResponse response) {
4 String action = request.getValue("action");
5 String handlerName = request.getValue("handler");
6 //根据Session的请求类型,让不同的类方法去处理
7 try {
8 Class<?> c= Class.forName("com.seek.server.handler." + handlerName);
9 Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};
10 Method method=c.getMethod(action,arg);
11 method.invoke(c.newInstance(), new Object[]{request,response});
12 } catch (Exception e) {
13 e.printStackTrace();
14 }
15 }
16 }
1 public class SocketRequest {
2
3 private Session mSession;
4 private String mReceive;
5
6 public SocketRequest(Session session) {
7 mSession = session;
8 mReceive = session.getReceiveData();
9 mSession.clear();
10 }
11
12 public String getValue(String key) {
13 return JsonParser.get(mReceive, key);
14 }
15
16 public String getQueryString() {
17 return mReceive;
18 }
19 }
1 public class SocketResponse {
2
3 private Session mSession;
4 public SocketResponse(Session session) {
5 mSession = session;
6 }
7
8 public void write(String msg) {
9 mSession.setSendData(msg);
10 }
11 }
1 public class UserHandler {
2
3 public void login(SocketRequest request,SocketResponse response) {
4 System.out.println(request.getQueryString());
5 //TODO: 处理用户登录
6 response.write("你肯定收到消息了");
7 }
8 }
1 public class MessageHandler {
2 public void send(SocketRequest request,SocketResponse response) {
3 System.out.println(request.getQueryString());
4 //消息发送
5 String key = request.getValue("imei");
6 Session session = SessionManager.getSession(key);
7 new SocketResponse(session).write(request.getValue("sms"));
8 }
9 }
1 public class Looper extends Thread{
2 private static Looper looper = new Looper();
3 private static boolean isStart = false;
4 private final int INTERVAL = 1000 * 60 * 5;
5 private Looper(){}
6 public static Looper getInstance() {
7 return looper;
8 }
9
10 public void loop() {
11 if(!isStart) {
12 isStart = true;
13 this.start();
14 }
15 }
16
17 public void run() {
18 Task task = new Task();
19 while(true) {
20 //Session过期检测
21 task.checkState();
22 //心跳包检测
23 //task.sendAck();
24 try {
25 Thread.sleep(INTERVAL);
26 } catch (InterruptedException e) {
27 Log.e(e);
28 }
29 }
30 }
31 }
1 public class Task {
2 public void checkState() {
3 Set<String> keys = SessionManager.getSessionKeys();
4 if(keys.size() == 0) {
5 return;
6 }
7 List<String> removes = new ArrayList<String>();
8 Iterator<String> iterator = keys.iterator();
9 String key = null;
10 while(iterator.hasNext()) {
11 key = iterator.next();
12 if(!SessionManager.getSession(key).isKeekAlive()) {
13 removes.add(key);
14 }
15 }
16 if(removes.size() > 0) {
17 Log.i("sessions is time out,remove " + removes.size() + "session");
18 }
19 SessionManager.remove(removes.toArray(new String[removes.size()]));
20 }
21
22 public void sendAck() {
23 Set<String> keys = SessionManager.getSessionKeys();
24 if(keys.size() == 0) {
25 return;
26 }
27 Iterator<String> iterator = keys.iterator();
28 while(iterator.hasNext()) {
29 iterator.next();
30 //TODO 发送心跳包
31 }
32 }
33 }
代码在家里的机器上,回去补上,请各位大大指证 - -!
相关推荐
基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现...
在“基于Java NIO的网络服务器Netty生产实例.zip”压缩包中,可能包含了关于如何使用Netty构建实际生产环境中的网络服务器的相关教程或代码示例。 1. **Java NIO基础**:NIO的核心组件包括Channel(通道)、Buffer...
java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现...
服务端程序基于javaNIO,客户端程序基于旧IO,读完<<javaNIO>>后,导入eclipse即可运行,支持多人在线聊天,上下线通知.PS:非GUI程序,毕竟javaSwing用的少,不懂的地方大家可以一起讨论,评论必回!
在这个“基于java NIO的socket通信demo”中,我们将探讨如何使用NIO进行服务器和客户端的Socket通信,并解决通信过程中的字符集乱码问题。 首先,我们来看`NioServer.java`。这个文件中包含了一个基于NIO的服务器端...
在Java NIO中,数据的读写都是基于缓冲区进行的,这样可以避免不必要的数据拷贝,提高I/O操作的效率。 此外,Java NIO还引入了`Pipe`和`FileChannel`等特性,使得进程间通信和文件操作也变得更加灵活。`Pipe`允许两...
JAVA NIO有两种解释:一种叫非阻塞IO(Non-blocking I/O),另一种也叫新的IO(New I/O),其实是同一个概念。它是一种同步非阻塞的I/O模型,也是...本例是使用java nio实现的简单聊天系统,界面简单,旨在学习java nio
总的来说,"基于nio的简易聊天室"项目综合运用了Java NIO的多种核心特性,通过通道、缓冲区和选择器实现了高效的网络通信,同时结合GUI提供了友好的用户体验。通过这个项目,开发者可以深入理解NIO在实际应用中的...
在这个“java nio 聊天室源码”项目中,开发者使用了NIO来构建一个聊天室应用,以实现用户之间的实时通信。 1. **Java NIO基础** - **通道(Channel)**:在NIO中,数据是通过通道进行传输的,如SocketChannel、...
在这个“java nio聊天室源码”项目中,我们可以看到如何使用NIO构建一个实时、高效的多人在线聊天应用。 首先,我们要理解Java NIO的基本概念。NIO包括以下关键组件: 1. **通道(Channels)**:通道是数据传输的...
本项目“一个基于java nio的简单的http服务器”正是基于这种技术实现的,其目的是为了展示如何使用Java NIO来构建一个简单的HTTP服务器。 HTTP服务器是互联网上应用最广泛的网络协议之一,它用于在客户端(如浏览器...
用java编写的nio通信的例子,nio是io编程的新版本,比io较流行。同时本例子是适用socket通信的。可以在此基础上,添加您的个人应用。本例子适用于:java通信的学习者,android平台通信的学习者。
java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java...
Java NIO系列教程(一) Java NIO 概述 Java NIO系列教程(二) Channel Java NIO系列教程(三) Buffer Java NIO系列教程(四) Scatter/Gather Java NIO系列教程(五) 通道之间的数据传输 Java NIO系列教程(六)...
Java NIO服务器的这种设计模式使得服务器能够有效地处理大量并发连接,尤其适用于高并发场景,如聊天服务器、游戏服务器等。然而,NIO的学习曲线相对较陡,理解和正确使用选择器、通道和缓冲区需要一定的实践。总的...
综上所述,基于Java NIO的反应器模式设计与实现,可以大幅提升网络服务器的性能,通过非阻塞IO、事件驱动、选择器等机制,高效地处理高并发的数据传输任务,并且优化了线程资源的使用,减少了线程上下文切换的时间...
Java NIO(Non-blocking Input/Output)是一种在Java中处理I/O操作的新方式,相比于传统的BIO(Blocking I/O),NIO提供了更高效的数据传输能力,尤其适合于高并发、低延迟的网络应用,如聊天服务器。在这个场景下,...
Java NIO支持多种类型的通道,包括文件通道(FileChannel)、套接字通道(SocketChannel)和服务器套接字通道(ServerSocketChannel)等。通道可以同时进行读写操作,并且可以实现异步读写。 2. **缓冲区(Buffers...
4. **多路复用器(Multiplexing)**:Java NIO的多路复用器基于操作系统提供的Select或Poll机制,如Linux下的epoll。它能有效地监控大量通道的状态,提高系统资源利用率。 5. **文件系统访问**:NIO也提供了对文件...
Netty是一个基于NIO的高性能、异步事件驱动的网络应用框架,它简化了网络编程,广泛应用于服务器端应用开发。 NIO的核心概念包括通道(Channel)、缓冲区(Buffer)和选择器(Selector)。以下是对这些核心概念的...