浏览 6825 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2013-01-07
最后修改:2013-01-07
以下是本人的websocket 协议解析,框架是基于mina + spring 做的。 需要的mina jar包有 mina-core-2.0.4.jar mina-integration-beans-2.0.4.jar mina-integration-jmx-2.0.4.jar mina-integration-ognl-2.0.4.jar mina-integration-spring-1.1.7.jar.zip 大家还没搞出来的可以参照一下,若不对的还希望各位大神指出或者线下交流 QQ:593040793 import java.io.UnsupportedEncodingException; import java.security.MessageDigest; import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.codehaus.jackson.map.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import qq.web.model.Data; import qq.web.service.MessageData; import qq.web.service.UserService; /** * * @author 程欣伟 * */ public class WebSocketIoHandler extends IoHandlerAdapter { @Autowired private MessageData messageData; @Autowired private UserService userService; public static final String INDEX_KEY = WebSocketIoHandler.class.getName() + ".INDEX"; //key=sessionId value = session sid 和 session对应 private Map<Long, IoSession> ioSessionMap = new HashMap<Long, IoSession>(); //key = userId value = sessionId 用户和 sid 对应 private Map<Integer,Long> userSessionMap = new HashMap<Integer, Long>(); /** * 将IoBuffer转换成string * @author 程欣伟 * @param message * @return */ public String ioBufferToString(Object message) { if (!(message instanceof IoBuffer)){ return ""; } IoBuffer ioBuffer = (IoBuffer) message; return new String(ioBuffer.array()); } /** * 当有请求消息时触发 * @author 程欣伟 * @return */ @Override public void messageReceived(IoSession session, Object message) throws Exception { // System.out.println(ioBufferToString(message)); //吧传入的消息转换成流 IoBuffer buffer = (IoBuffer)message; //转换成字节数组 byte[] b = new byte[buffer.limit()]; buffer.get(b); //获取sessionId Long sid = session.getId(); //如果没有此sessionId则代表第一次连接 if (!ioSessionMap.containsKey(sid)) { //把此session放入map ioSessionMap.put(sid, session); byte[] bufferAry = buffer.array(); String m = new String(bufferAry); //获取握手协议字符串 String sss = getSecWebSocketAccept(m); buffer.clear(); buffer.put(sss.getBytes("utf-8")); buffer.flip(); session.write(buffer); buffer.free(); } else { //存在session //解析传输的数据内容 String str = decode(b); // System.out.println(roleStr); //--------------- 业务开始 ---------------- // ObjectMapper objectMapper = new ObjectMapper(); Data data = null; try{ data = objectMapper.readValue(str, Data.class); }catch(Exception e){ return ; } try{ if(data.getDataType().equals("message")){ messageData.processMessage(data.getData()); }else if(data.getDataType().equals("login")){ userService.processUser(data.getData(),sid); } }catch(Exception e){ //如果处理消息报错 则告知浏览器 e.printStackTrace(); data.setData("false"); sendMessageToHtml(objectMapper.writeValueAsString(data), sid); } } } /** * 发送消息给浏览器 * @author 程欣伟 * @param msg * @param sid * @return */ public boolean sendMessageToHtml(String msg,Long sid) { boolean sendFlag = true; try{ //获取字节数组 byte[] bb = encode( msg); //创建IO流 IoBuffer ioBuffer = IoBuffer.allocate(bb.length); //把字节数组写入流中 ioBuffer.put(bb); //api 解释为翻转 但是目前不知道什么意思 ioBuffer.flip(); //同步块 synchronized (ioSessionMap) { //获取所有的session IoSession ioSession = ioSessionMap.get(sid); if (ioSession!=null&&ioSession.isConnected()) { //复制一个新的buffer IoBuffer writeResult = ioBuffer.duplicate(); ioSession.write(writeResult); }else{ sendFlag = false; } } ioBuffer.free(); }catch(Exception e){ e.printStackTrace(); sendFlag=false; } return sendFlag; } @Override public void sessionOpened(IoSession session) throws Exception { session.setAttribute(INDEX_KEY, 0); } // @Override // public void sessionIdle( IoSession session, IdleStatus status ) throws Exception { // System.out.println( "IDLE " + session.getIdleCount( status )); // } /** * 当ws连接断开时触发 */ @Override public void sessionClosed(IoSession session) throws Exception { //如果不连接的话 则删除 ioSessionMap.remove(session);; } /** * 根据用户ID 获取 session连接 * @param userId * @return */ public IoSession getSessionByUserId(int userId){ Long sid = userSessionMap.get(userId); if(sid==null){ return null; } IoSession session = ioSessionMap.get(sid); if(session==null){ userSessionMap.remove(userId); return null; } return session; } /** * * @author 程欣伟 * 获取握手协议 字符串 * 首先要获取到请求头中的Sec-WebSocket-Key的值,再把这一段GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11 * 加到获取到的Sec-WebSocket-Key的值的后面,然后拿这个字符串做SHA-1 hash计算,然后再把得到的结果通过base64加密 * @param key * @return */ private String getSecWebSocketAccept(String key) { String secKey = getSecWebSocketKey(key); String guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; secKey += guid; try { MessageDigest md = MessageDigest.getInstance("SHA-1"); md.update(secKey.getBytes("iso-8859-1"), 0, secKey.length()); byte[] sha1Hash = md.digest(); secKey = base64Encode(sha1Hash); } catch (Exception e) { e.printStackTrace(); } String rtn = "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: " + secKey + "\r\n\r\n"; return rtn; } /** * * @author 程欣伟 * 获取到请求头中的Sec-WebSocket-Key的 * @param req * @return */ private String getSecWebSocketKey(String req) { Pattern p = Pattern.compile("^(Sec-WebSocket-Key:).+", Pattern.CASE_INSENSITIVE | Pattern.MULTILINE); Matcher m = p.matcher(req); if (m.find()) { String foundstring = m.group(); return foundstring.split(":")[1].trim(); } else { return null; } } /** * base64 * @param input * @return */ private String base64Encode(byte[] input) { return new String(org.apache.mina.util.Base64.encodeBase64(input)); } /** * @author 程欣伟 * 把传入的消息解码 * @param receivedDataBuffer * @return * @throws UnsupportedEncodingException */ private String decode(byte[] receivedDataBuffer) throws UnsupportedEncodingException { String result = null; //数据开始的位数 前面2个byte 固定必须存在 int dataStartIndex=2; //查看第一帧的值 代表是否结束 int isend = receivedDataBuffer[0]>>7&0x1; System.out.println("是否结束:【"+(isend==1?"yes":"no")+"】"); //获取是否需要掩码 boolean mask = ((receivedDataBuffer[1]>>7&0x1)==1)?true:false; System.out.println("掩码:【"+(mask?"yes":"no")+"】"); //Payload length: 传输数据的长度,以字节的形式表示:7位、7+16位、或者7+64位。 //如果这个值以字节表示是0-125这个范围,那这个值就表示传输数据的长度; //如果这个值是126,则随后的两个字节表示的是一个16进制无符号数,用来表示传输数据的长度; //如果这个值是127,则随后的是8个字节表示的一个64位无符合数,这个数用来表示传输数据的长度 int dataLength = receivedDataBuffer[1] & 0x7F; System.out.println("描述消息长度:【"+dataLength+"】"); //查看 消息描述 是否大于 126 如果大于 if(dataLength<126){ //126以内取本身 }else if(dataLength==126){ dataStartIndex = dataStartIndex +2; }else if(dataLength==127){ dataStartIndex = dataStartIndex +8; } //掩码数组 byte[] frameMaskingAry = new byte[4]; if(mask){ for(int i=0;i<frameMaskingAry.length;i++){ frameMaskingAry[i] = receivedDataBuffer[dataStartIndex+i]; } dataStartIndex += 4; } // 计算非空位置 int lastStation = receivedDataBuffer.length - 1; // 利用掩码对org-data进行异或 int frame_masking_key = 0; //保存数据的 数组 byte[] dataByte = new byte[lastStation-dataStartIndex+1]; if(mask){ for (int i = dataStartIndex; i <= lastStation; i++) { //吧数据进行异或运算 receivedDataBuffer[i] = (byte) (receivedDataBuffer[i] ^ frameMaskingAry[frame_masking_key%4]); //吧进行异或运算之后的 数据放入数组 dataByte[i-dataStartIndex]=receivedDataBuffer[i]; frame_masking_key++; } } result = new String(dataByte, "UTF-8"); System.out.println(result); return result; } /** * @author 程欣伟 * 对传入数据进行无掩码转换 * @param msg * @return * @throws UnsupportedEncodingException */ private byte[] encode(String msg) throws UnsupportedEncodingException { // 掩码开始位置 int masking_key_startIndex = 2; byte[] msgByte = msg.getBytes("UTF-8"); // 计算掩码开始位置 if (msgByte.length <= 125) { masking_key_startIndex = 2; } else if (msgByte.length > 65536) { masking_key_startIndex = 10; } else if (msgByte.length > 125) { masking_key_startIndex = 4; } // 创建返回数据 byte[] result = new byte[msgByte.length + masking_key_startIndex]; // 开始计算ws-frame // frame-fin + frame-rsv1 + frame-rsv2 + frame-rsv3 + frame-opcode result[0] = (byte) 0x81; // 129 // frame-masked+frame-payload-length // 从第9个字节开始是 1111101=125,掩码是第3-第6个数据 // 从第9个字节开始是 1111110>=126,掩码是第5-第8个数据 if (msgByte.length <= 125) { result[1] = (byte) (msgByte.length); } else if (msgByte.length > 65536) { result[1] = 0x7F; // 127 } else if (msgByte.length > 125) { result[1] = 0x7E; // 126 result[2] = (byte) (msgByte.length >>; result[3] = (byte) (msgByte.length % 256); } // 将数据编码放到最后 for (int i = 0; i < msgByte.length; i++) { result[i + masking_key_startIndex] = msgByte[i]; } decode(result); String str = new String(result ,"utf-8"); System.out.println(str); return result; } public Map<Long, IoSession> getIoSessionMap() { return ioSessionMap; } public void setIoSessionMap(Map<Long, IoSession> ioSessionMap) { this.ioSessionMap = ioSessionMap; } public Map<Integer, Long> getUserSessionMap() { return userSessionMap; } public void setUserSessionMap(Map<Integer, Long> userSessionMap) { this.userSessionMap = userSessionMap; } public static void main(String[] args) throws UnsupportedEncodingException { byte b = 8; System.out.println("" + (byte) ((b >> 7) & 0x1) + (byte) ((b >> 6) & 0x1) + (byte) ((b >> 5) & 0x1) + (byte) ((b >> 4) & 0x1) + (byte) ((b >> 3) & 0x1) + (byte) ((b >> 2) & 0x1) + (byte) ((b >> 1) & 0x1) + (byte) ((b >> 0) & 0x1) ); byte[] a = {(byte)104,(byte)49}; System.out.println(new String(a,"utf-8")); } } 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |