前段时间用bio方式,也就是传统io实现了socket的长连接和心跳,总觉着服务端开启多线程管理socket连接的方式过于消耗资源,数据并发的情况下可能会影响到性能,因此就尝试使用nio改进原来的代码。
然而改进的过程却不像我起初设想的那般容易,可以说一波三折,原因主要是nio读写都是字节流,LZ一开始依然通过ObjectOutputStream.writeObject直接向Socket服务端发送数据,然而问题出现了,每次从ByteBuffer解析出来字节流都不一样,LZ使出浑身解数,一个字节一个字节的读取啊,问题没有了,可是由于是长连接,数据怎么解析啊,查资料,找大神,最后一个网友说有可能是粘包和分包的问题,一时晕菜,LZ网络可是渣渣啊,行吧,恶补一番,想了解的童鞋可以看看这个。http://blog.csdn.net/sunmenggmail/article/details/38952131
实现原理就像很多协议那样,自定义一套传输协议,比如消息长度(int型,4个字节)+消息体的方式,根据解析的消息长度定长解析消息内容,虽然最后证明LZ的问题不是由于粘包和分包造成的,但是LZ就这样歪打正着,给实现了!!!数据不正常的问题后来通过DataOutputStream和DataInputStream的方式也得到了解决。
废话多了,帖代码。
服务端:
package com.feng.test.longconnection1; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import org.apache.commons.lang.ArrayUtils; /** * * @author songfeng * @version 1.0 * @since 2015-10-24 * @category com.feng.test.longconnection * */ public class Server { private Map<String, Long> heatTimeMap = new HashMap<String, Long>(); public Server(int port) { Selector selector = null; ServerSocketChannel serverChannel = null; try { //获取一个ServerSocket通道 serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); //获取通道管理器 selector = Selector.open(); //将通道管理器与通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件, //只有当该事件到达时,Selector.select()会返回,否则一直阻塞。 serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { //选择注册过的io操作的事件 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey readyKey = it.next(); //删除已选key,防止重复处理 it.remove(); if (readyKey.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) readyKey.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); // 连接成功后,注册接收服务器消息的事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } else if(readyKey.isReadable()) { SocketChannel socketChannel = (SocketChannel)readyKey.channel(); Object obj = receiveData(socketChannel); String msg = "Server back:"; if(obj instanceof String) { String id = obj.toString().split(",")[0]; if(heatTimeMap.get(id) != null && System.currentTimeMillis() - heatTimeMap.get(id) > 5000) { socketChannel.socket().close(); } else { heatTimeMap.put(id, System.currentTimeMillis()); } long time = System.currentTimeMillis(); msg += time + "\n"; sendData(socketChannel, msg); } else if(obj instanceof Pojo) { msg += ((Pojo)obj).getName() + "\n"; sendData(socketChannel, msg); } } } } } catch (Exception e) { e.printStackTrace(); } finally { try { selector.close(); if(serverChannel != null) { serverChannel.close(); } } catch (Exception e) { e.printStackTrace(); } } } private static Object receiveData(SocketChannel socketChannel) { Object obj = null; ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteBuffer intBuffer = ByteBuffer.allocate(4); ByteBuffer objBuffer = ByteBuffer.allocate(1024); int size = 0; int sum = 0; int objlen = 0; byte[] bytes = null; try { while((size = socketChannel.read(intBuffer)) > 0) { intBuffer.flip(); bytes = new byte[size]; intBuffer.get(bytes); baos.write(bytes); intBuffer.clear(); if(bytes.length == 4) { objlen = bytesToInt(bytes,0); } if(objlen > 0) { byte[] objByte = new byte[0]; while(sum != objlen) { size = socketChannel.read(objBuffer); if(size > 0) { objBuffer.flip(); bytes = new byte[size]; objBuffer.get(bytes,0,size); baos.write(bytes); objBuffer.clear(); objByte = ArrayUtils.addAll(objByte, bytes); sum += bytes.length; } } obj = ByteToObject(objByte); break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { baos.close(); } catch (Exception e) { e.printStackTrace(); } } return obj; } private static void sendData(SocketChannel socketChannel,Object obj) { byte[] bytes = ObjectToByte(obj); ByteBuffer buffer = ByteBuffer.wrap(bytes); try { socketChannel.write(buffer); } catch (IOException e) { e.printStackTrace(); } } /** * byte数组中取int数值,本方法适用于(低位在前,高位在后)的顺序。 * * @param ary * byte数组 * @param offset * 从数组的第offset位开始 * @return int数值 */ public static int bytesToInt(byte[] ary, int offset) { int value; value = (int) ((ary[offset]&0xFF) | ((ary[offset+1]<<8) & 0xFF00) | ((ary[offset+2]<<16)& 0xFF0000) | ((ary[offset+3]<<24) & 0xFF000000)); return value; } public static Object ByteToObject(byte[] bytes) { Object obj = null; try { // bytearray to object ByteArrayInputStream bi = new ByteArrayInputStream(bytes); ObjectInputStream oi = new ObjectInputStream(bi); obj = oi.readObject(); bi.close(); oi.close(); } catch (Exception e) { //e.printStackTrace(); } return obj; } public static byte[] ObjectToByte(Object obj) { byte[] bytes = null; try { // object to bytearray ByteArrayOutputStream bo = new ByteArrayOutputStream(); ObjectOutputStream oo = new ObjectOutputStream(bo); oo.writeObject(obj); bytes = bo.toByteArray(); bo.close(); oo.close(); } catch (Exception e) { e.printStackTrace(); } return bytes; } public static void main(String[] args) { Server server = new Server(55555); } }
客户端:
package com.feng.test.longconnection1; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; /** * * @author songfeng * @version 1.0 * @since 2015-10-24 * @category com.feng.test.longconnection * */ public class Client { private Socket socket; private String ip; private int port; private String id; DataOutputStream dos; DataInputStream dis; public Client(String ip, int port,String id) { try { this.ip = ip; this.port = port; this.id = id; this.socket = new Socket(ip, port); //this.socket.setKeepAlive(true); dos = new DataOutputStream(socket.getOutputStream()); dis = new DataInputStream(socket.getInputStream()); new Thread(new heartThread()).start(); new Thread(new MsgThread()).start(); } catch (Exception e) { e.printStackTrace(); } } public void sendMsg(Object content) { try { int len = ObjectToByte(content).length; ByteBuffer dataLenBuf = ByteBuffer.allocate(4); dataLenBuf.order(ByteOrder.LITTLE_ENDIAN); dataLenBuf.putInt(0, len); dos.write(dataLenBuf.array(), 0 , 4); dos.flush(); dos.write(ObjectToByte(content)); dos.flush(); } catch (Exception e) { e.printStackTrace(); closeSocket(); } } public void closeSocket() { try { socket.close(); dos.close(); dis.close(); } catch (IOException e) { e.printStackTrace(); } } public static byte[] ObjectToByte(Object obj) { byte[] bytes = null; try { // object to bytearray ByteArrayOutputStream bo = new ByteArrayOutputStream(); ObjectOutputStream oo = new ObjectOutputStream(bo); oo.writeObject(obj); bytes = bo.toByteArray(); bo.close(); oo.close(); } catch (Exception e) { e.printStackTrace(); } return bytes; } public static Object ByteToObject(byte[] bytes) { Object obj = null; try { // bytearray to object ByteArrayInputStream bi = new ByteArrayInputStream(bytes); ObjectInputStream oi = new ObjectInputStream(bi); obj = oi.readObject(); bi.close(); oi.close(); } catch (Exception e) { e.printStackTrace(); } return obj; } class heartThread implements Runnable { @Override public void run() { while(true) { try { Thread.sleep(1000); long time = System.currentTimeMillis(); //System.out.println("client send:" + time); sendMsg("Client" + id + "," + time); } catch (Exception e) { e.printStackTrace(); } } } } class MsgThread implements Runnable { @Override public void run() { int temp; while(true) { try { if(socket.getInputStream().available() > 0) { byte[] bytes = new byte[1024]; int len = 0; while((char)(temp = dis.read()) != '\n') { bytes[len]=(byte)temp; len++; } System.out.println(ByteToObject(bytes)); } } catch (Exception e) { closeSocket(); } } } } public static void main(String[] args) { Client client1 = new Client("127.0.0.1", 55555, "1"); client1.sendMsg(new Pojo("songfeng", 26, new ArrayList<String>())); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } Client client2 = new Client("127.0.0.1", 55555, "2"); } }
数据类:
package com.feng.test.longconnection1; import java.io.Serializable; import java.util.List; /** * * @author songfeng * @version 1.0 * @since 2015-10-16 * @category com.feng.test.longconnection * */ public class Pojo implements Serializable { /** * 序列化 */ private static final long serialVersionUID = -8868529619983791261L; private String name; private int age; private List<String> likeThing; public Pojo(String name, int age, List<String> likeThing) { super(); this.name = name; this.age = age; this.likeThing = likeThing; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public List<String> getLikeThing() { return likeThing; } public void setLikeThing(List<String> likeThing) { this.likeThing = likeThing; } }
相关推荐
3. **心跳机制**:心跳包的设计和实现,包括心跳间隔时间、心跳包内容、超时策略、重试机制等。心跳包可以防止网络阻塞导致的假死,以及及时发现并处理真正的连接中断。 4. **数据编码与解码**:发送的数据通常需要...
Socket连接分为两种类型:长连接和短连接。这两种连接方式各有特点,适用于不同的应用场景。 **1. 短连接(Short Connection)** 短连接通常用于一次性、非持久性的通信,如HTTP协议就是典型的短连接。在短连接中...
1. **建立Socket连接**:客户端使用`Socket`类的构造函数,指定服务器的IP地址和端口号,建立到服务器的连接。 2. **心跳包设计**:定义心跳包的格式,例如可以是一个简单的JSON对象,包含时间戳和"ping"或"pong"的...
NioSocket是一个基于Java NIO(非阻塞I/O)技术实现的网络通信框架,它包含服务器端(Server)和客户端(Client)两部分。在Java编程中,NIO(New Input/Output)提供了一种不同于传统IO模型的I/O操作方式,其核心...
- **心跳机制**:为了检测连接是否断开,通常需要设置心跳包,定时发送和接收,防止连接因长时间无数据传输而被网络中间设备关闭。 - **线程安全**:在多线程环境下,需确保所有操作都是线程安全的,避免并发问题。...
4. **保持连接**: 实现长连接的关键是避免在完成一次通信后立即关闭连接。通常,我们会在客户端和服务端都设置合适的超时时间,只有在超过这个时间没有数据交换时才会关闭连接。 5. **心跳机制**: 为了检测连接是否...
为了实现长连接,客户端和服务端都需要处理异常,避免因为单次通信问题导致整个连接关闭。例如,设置超时和重试机制,或者在通信间隔期间保持心跳包以检测连接状态。 **注意事项:** 1. 异步处理:在实际应用中,...
本主题将深入探讨“socket自动连接”和“多客户端管理”这两个关键知识点。 首先,我们来理解“socket自动连接”。在客户端-服务器模型中,通常由客户端发起连接请求,与服务器建立TCP连接。所谓的“自动连接”,是...
为了实现`NIO`的`Socket`服务,我们需要自定义一个`NioServerSocketChannel`工厂类,注册到`Netty`或者`Undertow`等服务器容器中,这些服务器容器已经集成了对`NIO`的支持。 1. **配置SpringBoot**: 在`...
4. 如何处理Socket连接,发送和接收自定义协议的数据。 5. 示例代码展示如何在Java中编写和运行Netty应用。 这个demo将是一个很好的学习资源,帮助开发者理解Netty框架在实际应用中的工作原理,以及如何结合HTTP、...
在Java编程中,心跳包(Heartbeat Packet)是一种在网络通信中维持连接活性的重要机制,尤其在TCP协议下,心跳包可以解决长连接因网络延迟或静默而导致的连接超时问题。TCP是一种面向连接的、可靠的传输层协议,它...
客户端首先需要建立到服务器的Socket连接,然后在后台线程中定期发送心跳包。心跳包通常包含简单的标识信息,如“ping”。收到服务器的心跳回应后,客户端确认连接有效。若未收到回应,客户端应尝试重连。 五、心跳...
java socket client 断线重连的简单实现 有什么意见可以提哦
文件"MinaSocket"可能包含了实现上述功能的详细代码,包括服务端的Acceptor配置、过滤器设置、事件处理器编写,以及客户端的Socket连接、数据发送和接收等。通过阅读和理解这些代码,你可以更好地掌握Mina与Socket...
让我们深入探讨一下Java Socket连接的基本原理和使用方法。 1. **Socket类与ServerSocket类** - `Socket` 类:表示客户端的Socket,用于建立与服务器的连接。创建Socket对象时,会向服务器发起连接请求,连接成功...
在这个基于NIO非阻塞的Java聊天demo中,我们将会看到如何利用NIO实现一个支持单聊和群聊的应用。 首先,NIO的核心组件包括Channel、Buffer、Selector和Pipe。在传统的IO模型中,数据是从流的一端流向另一端,而在...
**实现长连接的关键步骤:** 1. **配置Acceptor:** 创建一个IoAcceptor实例,设置监听的端口,并绑定IoHandler来处理连接事件。 2. **编码和解码:** 实现ProtocolCodecFactory,定义如何将应用层的数据转化为...
在 Netty 中,我们可以通过保持 Channel 活跃状态来实现长连接。当客户端无数据传输时,服务器端可以发送心跳包以检测连接是否仍然有效。 在实际项目中,还需要考虑如异常处理、资源释放、连接管理和性能优化等多...
NIO 提供了一种非阻塞的通信方式,允许在单个线程中处理多个连接,从而减少了线程创建和销毁的开销。在NIO中,核心组件包括Selector、Channel和Buffer。Selector可以监控多个Channel,当Channel准备好读写操作时,...
而`Socket`类则表示客户端和服务器之间的实际连接,可以用来读取和写入数据。 下面我们将分步骤介绍如何使用Java Socket实现TCP双机通讯: 1. **服务器端实现**: - 首先,创建`ServerSocket`实例,指定监听的...