在网上找了很多关于websocket协议的资料。我发现大部分的资料或是实现记录的都不完整,或者只给出了最基本的实现。于是,我花了一周的业余实现写了一个相对完整的实现。
首先是解码器部分:
public class WSDecoder extends CumulativeProtocolDecoder { private final static String REQUEST_CONTEXT_KEY = "__REQUEST_DATA_CONTEXT"; private final static String END_TAG = "\r\n"; private enum FrameType { Text,Binary,Control; } private class RequestDataContext { private IoBuffer _tmp; private CharsetDecoder _charsetDecoder; private FrameType _frameType; RequestDataContext(String charset) { this._tmp = IoBuffer.allocate(512).setAutoExpand(true); this._charsetDecoder = Charset.forName("utf-8").newDecoder(); } public FrameType getFrameType() { return this._frameType; } public String getDataAsString() { try { _tmp.flip(); return _tmp.getString(_charsetDecoder); } catch (CharacterCodingException e) { return null; } } public byte[] getDataAsArray() { _tmp.flip(); byte[] data = new byte[_tmp.remaining()]; _tmp.get(data); return data; } void append(byte[] data) { this._tmp.put(data); } void setFrameType(FrameType _frameType) { this._frameType = _frameType; } } private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; public void setByteOrder(ByteOrder byteOrder) { this.byteOrder = byteOrder; } private CharsetDecoder charsetDecoder = Charset.forName("utf-8") .newDecoder(); public void setCharsetDecoder(CharsetDecoder charsetDecoder) { this.charsetDecoder = charsetDecoder; } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput decoderOutput) throws CharacterCodingException, NoSuchAlgorithmException { if (!in.hasRemaining()) return false; WSSessionState state = getSessionState(session); switch (state) { case Handshake: doHandshake(session, in); break; case Connected: if (in.remaining() < 2) return false; in.mark().order(this.byteOrder); byte fstByte = in.get(); int opCode = fstByte & 0xf; switch (opCode) { case 0x0: case 0x1: case 0x2: boolean isFinalFrame = fstByte < 0; boolean isRsvColZero = (fstByte & 0x70) == 0; if (!isRsvColZero) { closeConnection(session, in); break; } byte secByte = in.get(); boolean isMasking = secByte < 0; int dataLength = 0; byte payload = (byte) (secByte & 0x7f); if (payload == 126) dataLength = in.getUnsignedShort(); else if (payload == 127) dataLength = (int) in.getLong(); else dataLength = payload; if (in.remaining() < (isMasking ? dataLength + 4 : dataLength)) { in.reset(); return false; } byte[] mask = new byte[4]; byte[] data = new byte[dataLength]; if (isMasking) in.get(mask); in.get(data); // 用掩码处理数据。 for( int i=0, maskLength=mask.length, looplimit=data.length; i<looplimit; i++ ) data[i] = (byte)(data[i] ^ mask[i % maskLength]); // 创建一个对象保存“数据帧的数据类型”。协议规定——对于分片的数据只有第一帧会携带数据类型信息,所以要新建对象保存数据类型,以应对分片。 RequestDataContext context = (RequestDataContext) session .getAttribute(REQUEST_CONTEXT_KEY); if (context == null) { context = new RequestDataContext(charsetDecoder.charset() .name()); context.setFrameType((opCode == 0x1) ? FrameType.Text : FrameType.Binary); session.setAttribute(REQUEST_CONTEXT_KEY, context); } context.append(data); if (isFinalFrame) { context = (RequestDataContext) session.removeAttribute(REQUEST_CONTEXT_KEY); if (context.getFrameType() == FrameType.Text) decoderOutput.write(context.getDataAsString()); else decoderOutput.write(context.getDataAsArray()); return true; } else return false; case 0x3: case 0x4: case 0x5: case 0x6: case 0x7: break; case 0x8: closeConnection(session, in); break; case 0x9: case 0xA: default: closeConnection(session, in); break; } break; default: closeConnection(session, in); break; } return true; } private void doHandshake(IoSession session, IoBuffer in) throws CharacterCodingException, NoSuchAlgorithmException { String handshakeMessage = in.getString(charsetDecoder); String[] msgColumns = splitHandshakeMessage(handshakeMessage); String requestURI = msgColumns[0]; String httpVersion = requestURI.substring( requestURI.lastIndexOf("/") + 1, requestURI.length()); String upgradeCol = getMessageColumnValue(msgColumns, "Upgrade:"); String connectionCol = getMessageColumnValue(msgColumns, "Connection:"); String secWsProtocolCol = getMessageColumnValue(msgColumns, "Sec-WebSocket-Protocol:"); String secWskeyCol = getMessageColumnValue(msgColumns, "Sec-WebSocket-Key:"); String wsVersionCol = getMessageColumnValue(msgColumns, "Sec-WebSocket-Version:"); // 校验重要字段。任何字段不满足条件,都会导致握手失败! boolean hasWebsocket = contains(upgradeCol, "websocket"); boolean hasUpgrade = contains(connectionCol, "upgrade"); boolean isGetMethod = "GET".equalsIgnoreCase(subString(requestURI, 1, " ")); boolean isSecWsKeyNull = secWskeyCol == null || secWskeyCol.isEmpty(); boolean isValidVersion = "13".equals(wsVersionCol); boolean isValidHttpVer = Float.parseFloat(httpVersion) >= 1.1F; if (!hasWebsocket || !hasUpgrade || !isGetMethod) throw new WSException("Invalid websocket request!"); if (isSecWsKeyNull || !isValidVersion || !isValidHttpVer) throw new WSException("Invalid websocket request!"); String secWsAccept = getSecWebsocketAccept(secWskeyCol); String response = getResponseData(upgradeCol, connectionCol, secWsAccept, secWsProtocolCol); initRequestContext(session, msgColumns); session.write(response); } private String[] splitHandshakeMessage(String handshakeMessage) { StringTokenizer st = new StringTokenizer(handshakeMessage, END_TAG); String[] result = new String[st.countTokens()]; for (int i = 0; st.hasMoreTokens(); i++) result[i] = st.nextToken(); return result; } private boolean contains(String src, String target) { if (src == null || src.isEmpty()) return false; else return src.toLowerCase().contains(target); } private String getSecWebsocketAccept(String secWebsocketkey) throws NoSuchAlgorithmException { StringBuilder srcBuilder = new StringBuilder(); srcBuilder.append(secWebsocketkey); srcBuilder.append("258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); MessageDigest md = MessageDigest.getInstance("SHA-1"); md.update(srcBuilder.toString().getBytes(charsetDecoder.charset())); byte[] ciphertext = md.digest(); String result = new String(Base64.encodeBase64(ciphertext), charsetDecoder.charset()); return result; } private String getResponseData(String upgrade, String connection, String secWsAccept, String secWsProtocol) { StringBuilder result = new StringBuilder(); result.append("HTTP/1.1 101 Switching Protocols\r\n"); result.append("Upgrade:").append(upgrade).append(END_TAG); result.append("Connection:").append(connection).append(END_TAG); result.append("Sec-WebSocket-Accept:").append(secWsAccept) .append(END_TAG); if (secWsProtocol != null && !"".equals(secWsProtocol)) result.append("Sec-WebSocket-Protocol:").append(secWsProtocol) .append(END_TAG); result.append(END_TAG); return result.toString(); } private void closeConnection(IoSession session, IoBuffer buffer) { buffer.free(); session.close(true); } @SuppressWarnings("unused") private void closeConnection(IoSession session, String errorMsg) { session.write(errorMsg).addListener( new IoFutureListener<WriteFuture>() { @Override public void operationComplete(WriteFuture future) { future.getSession().close(true); } }); } private void initRequestContext(IoSession session, String[] data) { session.setAttribute("__SESSION_CONTEXT", data); } }
下面是编码器部分:
public class WSEncoder extends ProtocolEncoderAdapter { private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; public void setByteOrder(ByteOrder byteOrder) { this.byteOrder = byteOrder; } private CharsetEncoder charsetEncoder = Charset.forName("utf-8") .newEncoder(); public void setCharsetEncoder(CharsetEncoder charsetEncoder) { this.charsetEncoder = charsetEncoder; } private int defaultPageSize = 65536; public void setDefaultPageSize(int defaultPageSize) { this.defaultPageSize = defaultPageSize; } // 返回的数据默认不必进行掩码运算。 private boolean isMasking = false; public void setIsMasking(boolean masking) { this.isMasking = masking; } @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput encoderOutput) throws CharacterCodingException { IoBuffer buff = IoBuffer.allocate(1024).setAutoExpand(true); WSSessionState status = getSessionState(session); switch (status) { case Handshake: try { buff.putString((String) message, charsetEncoder) .flip(); encoderOutput.write(buff); } catch (CharacterCodingException e) { session.close(true); } session.setAttribute(WSSessionState.ATTRIBUTE_KEY, WSSessionState.Connected); break; case Connected: if (!session.isConnected() || message == null) return; byte dataType = 1; // 将数据统一转换成byte数组进行处理。 byte[] data = null; if (message instanceof String) data = ((String) message).getBytes(charsetEncoder.charset()); else { data = (byte[]) message; dataType = 2; } // 生成掩码。 byte[] mask = new byte[4]; Random random = new Random(); // 用掩码处理数据。 for( int i=0,limit=data.length; i<limit; i++) data[i] = (byte)(data[i] ^ mask[i%4]); /** * 以分片的方式向客户端推送数据。 */ int pageSize = this.defaultPageSize; // 分页大小 int remainLength = data.length; // 剩余数据长度 int realLength = 0; // 数据帧中“负载数据”的实际长度 int dataIndex = 0; for( boolean isFirstFrame=true, isFinalFrame = false; !isFinalFrame; buff.clear(), isFirstFrame=false) { int headerLeng = 2; int payload = 0; if (remainLength > 0 && remainLength <= 125) { payload = remainLength; } else if (remainLength > 125 && remainLength <=65536) { payload = 126; } else { payload = 127; } switch(payload) { case 126 : headerLeng += 2; break; case 127 : headerLeng += 8; break; default : headerLeng += 0; break; } headerLeng += isMasking ? 4 : 0; // 计算当前帧中剩余的可用于保存“负载数据”的字节长度。 realLength = ( pageSize - headerLeng ) >= remainLength ? remainLength : ( pageSize - headerLeng ); // 判断当前帧是否为最后一帧。 isFinalFrame = (remainLength - (pageSize - headerLeng)) < 0; // 生成第一个字节 byte fstByte = (byte)(isFinalFrame ? 0x80 : 0x0); // 若当前帧为第一帧,则还需保存数据类型信息。 fstByte += isFirstFrame ? dataType : 0; buff.put(fstByte); // 生成第二个字节。判断是否需要掩码,若需要掩码,则置标志位的值为1. byte sndByte = (byte)(isMasking ? 0x80 : 0); // 保存payload信息。 sndByte += payload; buff.put(sndByte); switch(payload) { case 126 : buff.putUnsignedShort(realLength); break; case 127 : buff.putLong(realLength); break; default : break; } if (isMasking) { random.nextBytes(mask); buff.put(mask); } for(int loopCount=dataIndex+realLength, i=0; dataIndex<loopCount; dataIndex++,i++) buff.put((byte)(data[i] ^ mask[i%4])); buff.flip(); encoderOutput.write(buff); remainLength -= (pageSize - headerLeng); } break; default: session.close(true); break; } } }
下面是工具类实现:
final class WSToolKit { private WSToolKit() { } static enum WSSessionState { Handshake, Connected; public final static String ATTRIBUTE_KEY = "__SESSION_STATE"; } static <T> T nvl(T t1, T t2) { return t1 == null ? t2 : t1; } static WSSessionState getSessionState (IoSession session) { WSSessionState state = (WSSessionState) session .getAttribute(WSSessionState.ATTRIBUTE_KEY); if (state == null) { state = WSSessionState.Handshake; session.setAttribute(WSSessionState.ATTRIBUTE_KEY, state); } return state; } static String getMessageColumnValue(String[] headers, String headerTag) { for (String header : headers) { if( header.startsWith(headerTag) ) return header.substring(headerTag.length(), header.length()) .trim(); } return null; } static String subString(String src, int order, String flag) { for (int i = 1, j = 0, k = 0;; i++) { j = src.indexOf(flag, k); if (i < order) { if (j == -1) return ""; else k = j + 1; } else { if (j == -1) return src.substring(k, src.length()); else return src.substring(k, j); } } } }
如果大家想转载,请注明出处!谢谢。
相关推荐
在这里,我们关注的是基于Mina框架的WebSocket服务器。Mina(Java Network Application Platform)是一个网络通信框架,它简化了开发高性能、高可用性的网络应用程序的过程。Mina提供了异步的事件驱动模型,可以处理...
总结起来,基于MINA的TLS/SSL NIO Socket实现涉及到的主要步骤包括: 1. 创建SSLContext并配置证书、密钥和协议。 2. 创建SSLEngine,并根据应用角色设置工作模式。 3. 在MINA的IoFilterChain中添加SSLFilter。 4. ...
2. **MINA Server端实现**:在MINA中,Server端主要负责监听指定的端口,接收到客户端连接请求后,创建一个Session对象来代表与该客户端的连接。Server端可以通过自定义的Filter(过滤器)来处理进来的数据,如解码...
基于Java的米娜框架,报告对使用基于Java、websocket协议的网页聊天室的过程和技术做了详细的叙述首先,对现有网页进行了分析与评价。首先, 启动后台服务器,然后连接站点,客户端在pc端输入网站或者在手机端扫...
【基于Mina的Android消息推送系统】是一种针对移动互联网中消息传输实时性和效率优化的技术解决方案。随着智能手机技术的迅速崛起,尤其是iOS和Android操作系统的普及,移动应用的需求日益增长,传统服务器被动接收...
在这个实例中,我们将深入探讨如何利用Apache Mina实现TCP的长连接和短连接。 首先,TCP(传输控制协议)是互联网上广泛使用的面向连接的协议,它保证了数据的可靠传输。TCP连接分为两种类型:长连接和短连接。 1....
- 设计编解码器时,通常需要继承mina提供的AbstractDecoder或AbstractEncoder基类,并实现decode()或encode()方法。 - 编解码器应该考虑数据的边界问题,确保正确识别和处理完整的数据包。 4. **示例解析** - ...
标题中的"SSI+Mina2(Struts2+Spring4+Mybatis3+Mina2)集成发布就可运行"指的是一个基于Java的Web应用程序开发框架的整合,它结合了多种技术来构建高效、灵活和可扩展的网络应用。这个集成方案主要包括以下组件: 1....
综上所述,实现微信小程序的在线聊天功能基于MINA框架,涉及到了前端的界面设计、数据绑定、事件处理,以及后端的网络通信、数据存储和用户认证等多个方面。通过熟练运用这些技术,我们可以构建出稳定、高效的在线...
2. 事件驱动:MINA基于事件驱动的设计,当网络事件发生时,如数据到达或连接建立,框架会触发相应的回调方法。 3. 丰富的过滤器链:MINA的过滤器机制可以让你在数据传输过程中添加自定义的处理逻辑,例如数据编码...
而"ws-server"则可能是一个基于MINA的WebSocket服务器,WebSocket是一种在浏览器和服务器之间提供全双工通信的协议,MINA 提供了WebSocket的支持,使得开发者能够方便地构建实时交互的应用。 综上所述,MINA作为一...
2. **Handler处理**:MINA的处理器是网络事件的实际处理者,负责接收和发送数据。在HTTP协议实例中,Handler会解析接收到的HTTP请求,并生成相应的HTTP响应。 3. **HTTP协议解析**:在MINA中,我们需要编写代码来...
《基于Mina实现的位置分享系统详解》 在当今数字化时代,位置分享系统已经成为了人们日常生活中不可或缺的一部分,它广泛应用于社交、导航、紧急救援等多个领域。本文将详细讲解如何使用Apache Mina框架来构建一个...
2. **UDP服务**: 利用MINA开发基于UDP协议的应用,如广播和多播。 3. **WebSocket服务**: MINA可以构建WebSocket服务器,提供实时双向通信。 4. **FTP/SMTP服务器**: 使用MINA构建网络协议服务器,如FTP或SMTP。 5. ...
2. **事件驱动**:MINA基于事件驱动的设计,当网络事件(如连接建立、数据读取、连接关闭等)发生时,会触发相应的处理器或过滤器链。这使得代码更易于维护和扩展,因为业务逻辑与底层I/O操作分离。 3. **过滤器...
在项目中引入MINA,你可以创建高效的服务端口,如TCP服务器或WebSocket服务器。 2. **Spring Framework**: Spring是Java领域的主流应用框架,它提供了依赖注入(DI)和面向切面编程(AOP)等核心特性。Spring帮助...
基于mina或netty框架下的推送系统,或许有一些企业有着自己一套即时通讯系统的需求,那么CIM为您提供了一个解决方案,目前CIM支持websocket,android,ios,桌面应用,系统应用等多端接入支持,可应用于移动应用,...
【标题】"原创nio socket mina+javascript+flash实现commet长连接网页聊天室"揭示了一个基于Java NIO(Non-blocking I/O)的Socket通信框架Mina与JavaScript、Flash技术结合,实现COMET(Comet是使服务器向浏览器推...
- NIO(Non-blocking I/O):Mina基于Java NIO API构建,实现了非阻塞I/O,提高了处理大规模并发连接的能力。 - 事件驱动:Mina通过事件监听器模式来处理网络事件,如连接建立、数据读写等,降低了复杂性。 - 连接器...