锁定老帖子 主题:基于Java NIO的即时聊天服务器雏形
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2011-08-12
最后修改:2011-08-12
前不久自己动手写了一个Android的聊天工具,跟服务器的交互还是基于HTTP方式的,在一般通讯上还算凑活,但是在即时聊天的时候就有点恶心了,客户端开启Service每隔3秒去询问服务器是否有自己的新消息(当然3秒有点太快了),在心疼性能和流量的前提下,只能自己动手写个服务器,传统的Socket是阻塞的,这样的话服务器对每个Socket都需要建立一个线程来操作,资源开销很大,而且线程多了直接会影响服务端的性能(曾经测试开了3000多个线程就不让创建了,所以并发数目也是有限制的),听说从JDK1.5就多了个New IO,灰常不错的样子,找了找相关的资料,网上竟然全都是最最最简单的一个demo,然后去CSDN发帖,基本上都是建议直接使用MINA框架的,这样一来根本达不到学习NIO的目的,而且现在的技术也太快餐了,只知道使用前辈留下的东西,知其然不知其所以然。 一开始把文章发博客园了,结果发现Java基本成悲剧了,还是回JavaEye吧…… 废话不多说,关于NIO的SelectionKey、Selector、Channel网上的介绍例子都很多,代码传送门在这里: http://www.cnblogs.com/freedom-elf/archive/2011/08/11/2135015.html 发了两次内容都截断了,只能先给个链接,我悄悄研究下发帖
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2012-07-12
帮你转:
前不久自己动手写了一个Android的聊天工具,跟服务器的交互还是基于HTTP方式的,在一般通讯上还算凑活,但是在即时聊天的时候就有点恶心了,客户端开启Service每隔3秒去询问服务器是否有自己的新消息(当然3秒有点太快了),在心疼性能和流量的前提下,只能自己动手写个服务器,传统的Socket是阻塞的,这样的话服务器对每个Socket都需要建立一个线程来操作,资源开销很大,而且线程多了直接会影响服务端的性能(曾经测试开了3000多个线程就不让创建了,所以并发数目也是有限制的),听说从JDK1.5就多了个New IO,灰常不错的样子,找了找相关的资料,网上竟然全都是最最最简单的一个demo,然后去CSDN发帖,基本上都是建议直接使用MINA框架的,这样一来根本达不到学习NIO的目的,而且现在的技术也太快餐了,只知道使用前辈留下的东西,知其然不知其所以然。 折腾了一个周,终于搞出来了一个雏形,相比于xmpp的xml,本人更喜欢json的简洁,为了防止客户端异常断开等,准备采用心跳检测的机制来判断用户是否在线,另外还有一种方法是学习例如Tomcat等Servlet中间件的方式,设置Session周期,定时清除过期Session。本Demo暂时实现了Session过期检测,心跳检测有空再搞,如果本例子在使用过程中有性能漏洞或者什么bug请及时通知我,谢谢 废话不多说,关于NIO的SelectionKey、Selector、Channel网上的介绍例子都很多,直接上代码: JsonParser Json的解析类,随便封装了下,使用的最近比较火的fastjson 1 public class JsonParser {2 3 private static JSONObject mJson;4 5 public synchronized static String get(String json,String key) {6 mJson = JSON.parseObject(json);7 return mJson.getString(key);8 }9 } Main 入口,不解释 1 public class Main {2 3 public static void main(String... args) {4 new SeekServer().start();5 }6 } Log 1 public class Log {2 3 public static void i(Object obj) {4 System.out.println(obj);5 }6 public static void e(Object e) {7 System.err.println(e);8 }9 } |
|
返回顶楼 | |
发表时间:2012-07-12
SeekServer:
服务器端的入口,请求的封装和接收都在此类,端口暂时写死在了代码里,mSelector.select(TIME_OUT) > 0 目的是为了当服务器空闲的时候(没有任何读写甚至请求断开事件),循环时有个间隔时间,不然基本上相当于while(true){//nothing}了,你懂的 public class SeekServer extends Thread{ private final int ACCPET_PORT = 55555; private final int TIME_OUT = 1000; private Selector mSelector = null; private ServerSocketChannel mSocketChannel = null; private ServerSocket mServerSocket = null; private InetSocketAddress mAddress = null; public SeekServer() { long sign = System.currentTimeMillis(); try { mSocketChannel = ServerSocketChannel.open(); if(mSocketChannel == null) { System.out.println("can't open server socket channel"); } mServerSocket = mSocketChannel.socket(); mAddress = new InetSocketAddress(ACCPET_PORT); mServerSocket.bind(mAddress); Log.i("server bind port is " + ACCPET_PORT); mSelector = Selector.open(); mSocketChannel.configureBlocking(false); SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); //检测Session状态 Looper.getInstance().loop(); //开始处理Session SessionProcessor.start(); Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!"); } catch (ClosedChannelException e) { Log.e(e.getMessage()); } catch (IOException e) { Log.e(e.getMessage()); } } public void run() { Log.i("server is listening..."); while(!Thread.interrupted()) { try { if(mSelector.select(TIME_OUT) > 0) { Set<SelectionKey> keys = mSelector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); SelectionKey key = null; while(iterator.hasNext()) { key = iterator.next(); Handler at = (Handler) key.attachment(); if(at != null) { at.exec(); } iterator.remove(); } } } catch (IOException e) { Log.e(e.getMessage()); } } } class Acceptor extends Handler{ public void exec(){ try { SocketChannel sc = mSocketChannel.accept(); new Session(sc, mSelector); } catch (ClosedChannelException e) { Log.e(e); } catch (IOException e) { Log.e(e); } } } } |
|
返回顶楼 | |
发表时间:2012-07-12
Handler:
只有一个抽象方法exec,Session将会继承它 public abstract class Handler { public abstract void exec(); } |
|
返回顶楼 | |
发表时间:2012-07-12
Session:
封装了用户的请求和SelectionKey和SocketChannel,每次接收到新的请求时都重置它的最后活动时间,通过状态mState=READING or SENDING 去执行消息的接收与发送,当客户端异常断开时则从SessionManager清除该会话。 public class Session extends Handler{ private SocketChannel mChannel; private SelectionKey mKey; private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240); private Charset charset = Charset.forName("UTF-8"); private CharsetDecoder mDecoder = charset.newDecoder(); private CharsetEncoder mEncoder = charset.newEncoder(); private long lastPant;//最后活动时间 private final int TIME_OUT = 1000 * 60 * 5; //Session超时时间 private String key; private String sendData = ""; private String receiveData = null; public static final int READING = 0,SENDING = 1; int mState = READING; public Session(SocketChannel socket, Selector selector) throws IOException { this.mChannel = socket; mChannel = socket; mChannel.configureBlocking(false); mKey = mChannel.register(selector, 0); mKey.attach(this); mKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); lastPant = Calendar.getInstance().getTimeInMillis(); } public String getReceiveData() { return receiveData; } public void clear() { receiveData = null; } public void setSendData(String sendData) { mState = SENDING; mKey.interestOps(SelectionKey.OP_WRITE); this.sendData = sendData + "\n"; } public boolean isKeekAlive() { return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis(); } public void setAlive() { lastPant = Calendar.getInstance().getTimeInMillis(); } /** * 注销当前Session */ public void distroy() { try { mChannel.close(); mKey.cancel(); } catch (IOException e) {} } @Override public synchronized void exec() { try { if(mState == READING) { read(); }else if(mState == SENDING) { write(); } } catch (IOException e) { SessionManager.remove(key); try { mChannel.close(); } catch (IOException e1) { Log.e(e1); } mKey.cancel(); } } public void read() throws IOException{ mRreceiveBuffer.clear(); int sign = mChannel.read(mRreceiveBuffer); if(sign == -1) { //客户端连接关闭 mChannel.close(); mKey.cancel(); } if(sign > 0) { mRreceiveBuffer.flip(); receiveData = mDecoder.decode(mRreceiveBuffer).toString(); setAlive(); setSign(); SessionManager.addSession(key, this); } } private void setSign() { //设置当前Session的Key key = JsonParser.get(receiveData,"imei"); //检测消息类型是否为心跳包 // String type = jo.getString("type"); // if(type.equals("HEART_BEAT")) { // setAlive(); // } } /** * 写消息 */ public void write() { try { mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData))); sendData = null; mState = READING; mKey.interestOps(SelectionKey.OP_READ); } catch (CharacterCodingException e) { e.printStackTrace(); } catch (IOException e) { try { mChannel.close(); } catch (IOException e1) { Log.e(e1); } } } } |
|
返回顶楼 | |
发表时间:2012-07-12
SessionManager:
将所有Session存放到ConcurrentHashMap,这里使用手机用户的imei做key,ConcurrentHashMap因为是线程安全的,所以能很大程度上避免自己去实现同步的过程, 封装了一些操作Session的方法例如get,remove等 public class SessionManager { private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>(); public static void addSession(String key,Session session) { sessions.put(key, session); } public static Session getSession(String key) { return sessions.get(key); } public static Set<String> getSessionKeys() { return sessions.keySet(); } public static int getSessionCount() { return sessions.size(); } public static void remove(String[] keys) { for(String key:keys) { if(sessions.containsKey(key)) { sessions.get(key).distroy(); sessions.remove(key); } } } public static void remove(String key) { if(sessions.containsKey(key)) { sessions.get(key).distroy(); sessions.remove(key); } } } |
|
返回顶楼 | |
发表时间:2012-07-12
SessionProcessor
里面使用了JDK自带的线程池,用来分发处理所有Session中当前需要处理的请求(线程池的初始化参数不是太熟,望有了解的童鞋能告诉我),内部类Process则是将Session再次封装成SocketRequest和SocketResponse(看到这里是不是有点熟悉的感觉,对没错,JavaWeb里到处都是request和response) public class SessionProcessor implements Runnable{ private static Runnable processor = new SessionProcessor(); private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy()); public static void start() { new Thread(processor).start(); } @Override public void run() { while(true) { Session tmp = null; for(String key:SessionManager.getSessionKeys()) { tmp = SessionManager.getSession(key); //处理Session未处理的请求 if(tmp.getReceiveData() != null) { pool.execute(new Process(tmp)); } } try { Thread.sleep(10); } catch (InterruptedException e) { Log.e(e); } } } class Process implements Runnable { private SocketRequest request; private SocketResponse response; public Process(Session session) { //将Session封装成Request和Response request = new SocketRequest(session); response = new SocketResponse(session); } @Override public void run() { new RequestTransform().transfer(request, response); } } } |
|
返回顶楼 | |
发表时间:2012-07-12
RequestTransform里的transfer方法利用反射对请求参数中的请求类别和请求动作来调用不同类的不同方法(UserHandler和MessageHandler)
public class RequestTransform { public void transfer(SocketRequest request,SocketResponse response) { String action = request.getValue("action"); String handlerName = request.getValue("handler"); //根据Session的请求类型,让不同的类方法去处理 try { Class<?> c= Class.forName("com.seek.server.handler." + handlerName); Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class}; Method method=c.getMethod(action,arg); method.invoke(c.newInstance(), new Object[]{request,response}); } catch (Exception e) { e.printStackTrace(); } } } |
|
返回顶楼 | |
发表时间:2012-07-12
public class SocketRequest { private Session mSession; private String mReceive; public SocketRequest(Session session) { mSession = session; mReceive = session.getReceiveData(); mSession.clear(); } public String getValue(String key) { return JsonParser.get(mReceive, key); } public String getQueryString() { return mReceive; } } public class SocketResponse { private Session mSession; public SocketResponse(Session session) { mSession = session; } public void write(String msg) { mSession.setSendData(msg); } } 最后则是两个处理请求的Handler public class UserHandler { public void login(SocketRequest request,SocketResponse response) { System.out.println(request.getQueryString()); //TODO: 处理用户登录 response.write("你肯定收到消息了"); } } public class MessageHandler { public void send(SocketRequest request,SocketResponse response) { System.out.println(request.getQueryString()); //消息发送 String key = request.getValue("imei"); Session session = SessionManager.getSession(key); new SocketResponse(session).write(request.getValue("sms")); } } 还有个监测是否超时的类Looper,定期去删除Session public class Looper extends Thread{ private static Looper looper = new Looper(); private static boolean isStart = false; private final int INTERVAL = 1000 * 60 * 5; private Looper(){} public static Looper getInstance() { return looper; } public void loop() { if(!isStart) { isStart = true; this.start(); } } public void run() { Task task = new Task(); while(true) { //Session过期检测 task.checkState(); //心跳包检测 //task.sendAck(); try { Thread.sleep(INTERVAL); } catch (InterruptedException e) { Log.e(e); } } } } |
|
返回顶楼 | |
发表时间:2012-07-12
public class Task { public void checkState() { Set<String> keys = SessionManager.getSessionKeys(); if(keys.size() == 0) { return; } List<String> removes = new ArrayList<String>(); Iterator<String> iterator = keys.iterator(); String key = null; while(iterator.hasNext()) { key = iterator.next(); if(!SessionManager.getSession(key).isKeekAlive()) { removes.add(key); } } if(removes.size() > 0) { Log.i("sessions is time out,remove " + removes.size() + "session"); } SessionManager.remove(removes.toArray(new String[removes.size()])); } public void sendAck() { Set<String> keys = SessionManager.getSessionKeys(); if(keys.size() == 0) { return; } Iterator<String> iterator = keys.iterator(); while(iterator.hasNext()) { iterator.next(); //TODO 发送心跳包 } } } |
|
返回顶楼 | |