`
sunnylocus
  • 浏览: 875765 次
  • 性别: Icon_minigender_1
  • 来自: 美国图森
社区版块
存档分类
最新评论

Socket通信模式:收发线程互斥

    博客分类:
  • Java
阅读更多

      有做过通信程序或着短信接入程序的程序员都知道,与之通信的每条命令都由消息头和消息尾构成,消息头一般包括整个消息体的长度、流水号、命令类型等信息,客户端向服务端发送一个请求,服务端返回一个响应,请求的流水号和返回的流水号为一一对应关系。如图:

 

 一般我们做法是写一个同步的方法用于发送命令和接收命令,如

 

  public synchronized String recMsg(String reqMsg) {
        //TODO:发送消息 ..... 
       //TODO:接收消息 
     return 收到的消息 
 }

 

  这样做虽然能满足要求,但是效率不高,因为每发送一次命令,需要等到命令成功响应后才能继续发送下一条命令。用收发线程来实现下(直接从项目copy的代码):

package com.bill99.svr;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;

/**
 *<p>title: socket通信包装类</p>
 *<p>Description: </p>
 *<p>CopyRight: CopyRight (c) 2009</p>
 *<p>Company: 99bill.com</p>
 *<p>Create date: 2009-10-14</P>
 *author     sunnylocus
 *                v0.10 2009-10-14 初类
 *                v0.11 2009-11-12 对命令收发逻辑及收发线程互斥机制进行了优化,处理命令速度由原来8~16个/秒提高到25~32个/秒
 */ public class SocketConnection {

	private volatile Socket socket;
	private int timeout = 1000*10; //超时时间,初始值10秒
	private boolean isLaunchHeartcheck = false;//是否已启动心跳检测
	private boolean isNetworkConnect = false; //网络是否已连接
	private static String host = "";
	private static int port;
	static InputStream inStream = null;
	static OutputStream outStream = null;
	private static Logger log =Logger.getLogger(SocketConnection.class);
	private static SocketConnection socketConnection = null;
	private static java.util.Timer heartTimer=null;
	//-------------------------------------------
	//private final Map<String, Object> recMsgMap= Collections.synchronizedMap(new HashMap<String, Object>());
	private final ConcurrentHashMap<String, Object> recMsgMap = new ConcurrentHashMap<String, Object>();
	private static Thread receiveThread = null;
	private final ReentrantLock lock = new ReentrantLock();
	
	private SocketConnection(){
		Properties conf = new Properties();
		try {
			conf.load(SocketConnection.class.getResourceAsStream("test.conf"));
			this.timeout = Integer.valueOf(conf.getProperty("timeout"));
			init(conf.getProperty("ip"),Integer.valueOf(conf.getProperty("port")));
		} catch(IOException e) {
			log.fatal("socket初始化异常!",e);
			throw new RuntimeException("socket初始化异常,请检查配置参数");
		}
	}
	/**
	 * 单态模式 
	 */
	public static SocketConnection getInstance() {
		if(socketConnection==null) {
			synchronized(SocketConnection.class) {
				if(socketConnection==null) {
					socketConnection = new SocketConnection();
					return socketConnection;
				}
			}
		}
		return socketConnection;
	}
	
	private void init(String host,int port) throws IOException {
		InetSocketAddress addr = new InetSocketAddress(host,port);
		socket = new Socket();
		synchronized (this) {
			log.info("【准备与"+addr+"建立连接】");
			socket.connect(addr, timeout);
			log.info("【与"+addr+"连接已建立】");
			inStream = socket.getInputStream();
			outStream = socket.getOutputStream();
			socket.setTcpNoDelay(true);//数据不作缓冲,立即发送
			socket.setSoLinger(true, 0);//socket关闭时,立即释放资源
			socket.setKeepAlive(true);
			socket.setTrafficClass(0x04|0x10);//高可靠性和最小延迟传输
			isNetworkConnect=true;
			receiveThread = new Thread(new ReceiveWorker());
			receiveThread.start();
			SocketConnection.host=host;
			SocketConnection.port=port;
			if(!isLaunchHeartcheck)
				launchHeartcheck();
		}
	}
	/**
	 * 心跳包检测
	 */
	private void launchHeartcheck() {
		if(socket == null)
			throw new IllegalStateException("socket is not established!");
		heartTimer  = new Timer();
		isLaunchHeartcheck = true;
		heartTimer.schedule(new TimerTask() {
			public void run() {
				String msgStreamNo = StreamNoGenerator.getStreamNo("kq");
				int mstType =9999;//999-心跳包请求
				SimpleDateFormat dateformate = new SimpleDateFormat("yyyyMMddHHmmss");
				String msgDateTime = dateformate.format(new Date());
				int msgLength =38;//消息头长度
				String commandstr = "00" +msgLength + mstType + msgStreamNo;
				log.info("心跳检测包 -> IVR "+commandstr);
				int reconnCounter = 1;
				while(true) {
					String responseMsg =null;
					try {
						responseMsg = readReqMsg(commandstr);
					} catch (IOException e) {
						log.error("IO流异常",e);
						reconnCounter ++;
					} 
					if(responseMsg!=null) {
						log.info("心跳响应包 <- IVR "+responseMsg);
						reconnCounter = 1;
						break;
					} else { 
						reconnCounter ++;
					}
					if(reconnCounter >3) {//重连次数已达三次,判定网络连接中断,重新建立连接。连接未被建立时不释放锁
						reConnectToCTCC(); break;
					}
				}
			}
		},1000 * 60*1,1000*60*2);
	}
	
	/**
	 * 重连与目标IP建立重连
	 */
	private void reConnectToCTCC() {
		new Thread(new Runnable(){
			public void run(){
				log.info("重新建立与"+host+":"+port+"的连接");
				//清理工作,中断计时器,中断接收线程,恢复初始变量
				heartTimer.cancel();
				isLaunchHeartcheck=false;
				isNetworkConnect = false;
				receiveThread.interrupt();
				try {
					socket.close();
				} catch (IOException e1) {log.error("重连时,关闭socket连接发生IO流异常",e1);}
				//----------------
				synchronized(this){
					for(; ;){
						try {
							Thread.currentThread();
							Thread.sleep(1000 * 1);
							init(host,port);
							this.notifyAll();
							break ;
						} catch (IOException e) {
							log.error("重新建立连接未成功",e);
						} catch (InterruptedException e){
							log.error("重连线程中断",e);
						}
					}
				}
			}
		}).start();
	}
	/**
	 * 发送命令并接受响应
	 * @param requestMsg
	 * @return
	 * @throws SocketTimeoutException
	 * @throws IOException
	 */
	public String readReqMsg(String requestMsg) throws IOException {
		if(requestMsg ==null) {
			return null;
		}
		if(!isNetworkConnect) {
			synchronized(this){
				try {
					this.wait(1000*5); //等待5秒,如果网络还没有恢复,抛出IO流异常
					if(!isNetworkConnect) {
						throw new IOException("网络连接中断!");
					}
				} catch (InterruptedException e) {
					log.error("发送线程中断",e);
				}
			}
		}
		String msgNo = requestMsg.substring(8, 8 + 24);//读取流水号
		outStream = socket.getOutputStream();
		outStream.write(requestMsg.getBytes());
		outStream.flush();
		Condition msglock = lock.newCondition(); //消息锁
		//注册等待接收消息
		recMsgMap.put(msgNo, msglock);
		try {
			lock.lock();
			msglock.await(timeout,TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			log.error("发送线程中断",e);
		} finally {
			lock.unlock();
		}
		Object respMsg = recMsgMap.remove(msgNo); //响应信息
		if(respMsg!=null &&(respMsg != msglock)) {
			//已经接收到消息,注销等待,成功返回消息
			return (String) respMsg;
		} else {
			log.error(msgNo+" 超时,未收到响应消息");
			throw new SocketTimeoutException(msgNo+" 超时,未收到响应消息");
		}
	}
	
	public void finalize() {
		if (socket != null) {
			try {
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	//消息接收线程
	private class ReceiveWorker implements Runnable {
		String intStr= null;
		public void run() {
			while(!Thread.interrupted()){
				try {
					byte[] headBytes = new byte[4];
					if(inStream.read(headBytes)==-1){
						log.warn("读到流未尾,对方已关闭流!");
						reConnectToCTCC();//读到流未尾,对方已关闭流
						return;
					}
					byte[] tmp  =new byte[4];
					tmp = headBytes;
					String tempStr = new String(tmp).trim();
					if(tempStr==null || tempStr.equals("")) {
						log.error("received message is null");
						continue;
					}
					intStr = new String(tmp);
					int totalLength =Integer.parseInt(intStr);
					//----------------
					byte[] msgBytes = new byte[totalLength-4];
					inStream.read(msgBytes);
					String resultMsg = new String(headBytes)+ new String(msgBytes);
					//抽出消息ID
					String msgNo = resultMsg.substring(8, 8 + 24);
					Condition msglock =(Condition) recMsgMap.get(msgNo);
					if(msglock ==null) {
						log.warn(msgNo+"序号可能已被注销!响应消息丢弃");
						recMsgMap.remove(msgNo);
						continue;
					}
					recMsgMap.put(msgNo, resultMsg);
					try{
						lock.lock();
						msglock.signalAll();
					}finally {
						lock.unlock();
					}
				}catch(SocketException e){
					log.error("服务端关闭socket",e);
					reConnectToCTCC();
				} catch(IOException e) {
					log.error("接收线程读取响应数据时发生IO流异常",e);
				} catch(NumberFormatException e){
					log.error("收到没良心包,String转int异常,异常字符:"+intStr);
				} 
			}
		}
	}
}

 

使用收发线程,一个发一个收,能提高socket通信的效率,不过有隐患:

    比如在高并发环境下,每秒有100个线程调用 readReqMsg(String requestMsg)方法,也就是每秒会有100个需要发送的命令,由于网络传输、服务端处理命令等因素,接收线程每秒只能接收80个响应,将已返回命令的流水号从recMsgMap移除,但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机。

   怎么预防这个隐患,还没有找到合适的解决办法,还请有经验的朋友指点一二。

4
0
分享到:
评论
5 楼 caolongaaron 2011-11-02  
 
4 楼 caolongaaron 2011-11-02  
[b][/b]dsadsa
3 楼 lyy3323 2010-04-05  
对于发送消息而言,把处理完的命令仍到队列中,单独开一个线程读取队列中的值,然后write出去。
2 楼 sunnylocus 2010-03-08  
lyy3323 写道
大概看了下你的代码。感觉写的还是有点乱。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。

接受队列----类似操作。

不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。


recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。


另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。

你是说将准备发送的信息放在一个队列中,然后让消费者线程去取队列中的信息么?
1 楼 lyy3323 2010-03-08  
大概看了下你的代码。感觉写的还是有点乱。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。

接受队列----类似操作。

不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。


recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。


另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。

相关推荐

    C++多线程SOCKET收发

    在C++编程中,多线程SOCKET收发是一项重要的技术,它允许程序同时处理多个网络连接,提高系统的并发性能。下面将详细讲解这个主题,包括C++中的多线程概念、SOCKET基础以及如何结合两者实现数据的收发。 首先,让...

    基于TCP/IP的Socket多线程通信(服务器和客户端)

    6. **多线程通信中的同步与互斥**:为了防止线程间的资源竞争,需要使用锁(如互斥量mutex)或条件变量来实现同步。例如,当多个线程同时访问共享资源时,必须保证一次只有一个线程访问,以避免数据冲突。 7. **...

    C++基于socket的多线程聊天程序

    这可能涉及到线程同步机制,如互斥量(mutex)、信号量或事件对象,以避免资源竞争和数据不一致。 4. **网络协议**:TCP/IP协议族是互联网的基础,其中TCP(传输控制协议)负责建立和维护可靠的连接,IP(网际协议...

    linux下基于socket的多线程编程

    在Linux操作系统中,基于Socket的多线程编程是网络编程中的一个重要领域,它结合了Socket通信和多线程技术,可以实现高效的并发服务。Socket是网络通信的基本接口,TCP(Transmission Control Protocol)协议则提供...

    Windows Socket编程 多线程

    线程的创建通常使用`CreateThread()`函数,而线程间的同步机制如互斥量(Mutex)、信号量(Semaphore)或事件(Event)则用于防止资源竞争,确保数据一致性。 多线程Socket编程需要注意几个关键点: 1. **线程安全...

    TCP-接收线程和发送线程

    在实际的C/C++代码中,可以看到如何使用socket API与线程API结合,实现高效、可靠的TCP通信。这个项目对于理解和实践网络编程以及多线程编程技巧非常有帮助,可以加深对TCP协议及多线程模型的理解。通过分析和学习此...

    socket实现多线程

    在IT行业中,网络编程是不可或缺的一部分,而Socket编程则是实现客户端与服务器通信的基础。本教程将探讨如何在Java中利用Socket实现多线程,以提高应用程序的并发性能。我们将主要关注`SubThread.java`、`Client....

    vc socket编程,多线程聊天,包括客户端以及服务器端

    总的来说,这个项目涵盖了Socket编程中的基础概念和高级特性,如多线程、网络通信、错误处理等,对于理解和掌握网络编程具有很高的实践价值。通过深入学习和实践,开发者能够运用这些知识开发更复杂、更高效的网络...

    简单网络通信程序例子(多线程)

    不过,多线程也带来了同步和互斥的问题,如数据竞争。为了解决这些问题,可以使用锁(Lock)、条件变量(Condition)等同步机制。在Python中,`threading.Lock()`和`threading.Condition()`等工具可以帮助我们确保...

    一个linux下的socket线程池实现

    总之,"一个Linux下的socket线程池实现"是一个综合性的网络编程项目,涵盖了socket通信、多线程管理和资源调度等多个重要知识点。对于想要深入学习Linux网络编程和并发控制的开发者来说,这是一个极好的实践案例。

    TCP协议的客户端和服务器端多线程阻塞模式的简单例子

    线程间的同步机制,如互斥锁或者条件变量,可能需要用来确保对共享资源(如socket对象)的安全访问。 服务器端则更复杂一些,通常需要监听多个客户端连接。服务器启动后,主线程会监听特定端口,当有新的客户端连接...

    基于socket的聊天室

    在IT领域,网络通信是不可或缺的一部分,而Socket编程则是实现网络通信的一种基础方式。本项目“基于socket的聊天室”就是一个很好的学习实例,它涵盖了客户端和服务端的基本设计和实现,适用于初学者掌握网络编程的...

    多线程服务器的几种常用模型

    例如,一个线程专门用于监听新的客户端连接,一旦接收到新的连接,则创建一个新的线程来处理后续的数据收发。这种方式能够有效地分散处理负载。 - **线程池**:线程池是一种更为高效的线程管理机制,它预先创建一定...

    多线程文件传输系统的实现

    综上所述,"多线程文件传输系统"的实现涉及到多线程编程、文件I/O操作、网络Socket通信和可能的MFC应用,这些都是计算机科学尤其是软件工程中的基础但重要的知识点。通过学习和实践这样的系统,开发者可以深入理解...

    TCP实现多客户端聊天-Socket编程

    这可能涉及到线程同步技术,例如互斥锁或事件对象,以保证并发安全。 此外,为了识别不同的客户端,每个连接的Socket都会有自己的IP和端口号,这些信息可以在接收连接时获取。客户端的标识可以通过在数据包中包含...

    java多线程QQ聊天

    线程管理是关键,包括同步、互斥以及避免死锁,以确保数据安全和程序的正确运行。 2. **Socket编程**:Java的Socket类和ServerSocket类用于实现客户端-服务器通信。ServerSocket在服务器端监听特定端口,等待客户端...

    基于TCP下多线程WINSOCK编程

    4. **数据收发**:每个线程负责与一个客户端进行通信。服务器端通过`send()`和`recv()`函数来完成数据的发送和接收。为了保证数据传输的正确性和完整性,还需要考虑错误处理机制,如超时重传、数据校验等。 #### ...

    C++网络编程详解

    3. **并发与同步**:讨论多线程和多进程编程,以及ACE提供的线程池、信号量、互斥量和条件变量等同步机制。 4. **事件驱动编程**:介绍ACE的Reactor模式,用于处理并发事件,如I/O完成、定时器触发和信号处理。 5....

    TCP读写双线程程序

    4. **线程同步**:由于双线程操作同一资源(套接字),为了防止数据竞争和死锁,必须使用线程同步机制,如互斥锁(Mutex)、信号量(Semaphore)或条件变量(Condition Variable)。这些机制确保在同一时间只有一个...

    多线程 TCIIP 侦听器(VB.NET)

    5. **线程同步与互斥**:在多线程环境下,线程同步和互斥是避免数据竞争和保证数据一致性的关键。VB.NET中的Monitor、Mutex、Semaphore、Lock等机制可以用来实现这些功能,确保线程安全地访问共享资源。 6. **事件...

Global site tag (gtag.js) - Google Analytics