- 浏览: 875765 次
- 性别:
- 来自: 美国图森
最新评论
-
jnjeC:
jake_12345 写道大哥,这写错了吧Class.isAs ...
Class.isAssignableFrom(Class clz)方法 与 instanceof 关键字的区别 -
lgh1992314:
https://my.oschina.net/xianggao ...
Servlet生命周期 -
qq412796770:
大哥,百度第一条就是你的,好歹也修改一下吧
Class.isAssignableFrom(Class clz)方法 与 instanceof 关键字的区别 -
技术无涯苦作舟:
大哥,百度第一条就是你的,好歹也修改一下吧
Class.isAssignableFrom(Class clz)方法 与 instanceof 关键字的区别 -
lgh1992314:
大哥,百度第一条就是你的,好歹也修改一下吧
Class.isAssignableFrom(Class clz)方法 与 instanceof 关键字的区别
有做过通信程序或着短信接入程序的程序员都知道,与之通信的每条命令都由消息头和消息尾构成,消息头一般包括整个消息体的长度、流水号、命令类型等信息,客户端向服务端发送一个请求,服务端返回一个响应,请求的流水号和返回的流水号为一一对应关系。如图:
一般我们做法是写一个同步的方法用于发送命令和接收命令,如
public synchronized String recMsg(String reqMsg) { //TODO:发送消息 ..... //TODO:接收消息 return 收到的消息 }
这样做虽然能满足要求,但是效率不高,因为每发送一次命令,需要等到命令成功响应后才能继续发送下一条命令。用收发线程来实现下(直接从项目copy的代码):
package com.bill99.svr; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; /** *<p>title: socket通信包装类</p> *<p>Description: </p> *<p>CopyRight: CopyRight (c) 2009</p> *<p>Company: 99bill.com</p> *<p>Create date: 2009-10-14</P> *author sunnylocus * v0.10 2009-10-14 初类 * v0.11 2009-11-12 对命令收发逻辑及收发线程互斥机制进行了优化,处理命令速度由原来8~16个/秒提高到25~32个/秒 */ public class SocketConnection { private volatile Socket socket; private int timeout = 1000*10; //超时时间,初始值10秒 private boolean isLaunchHeartcheck = false;//是否已启动心跳检测 private boolean isNetworkConnect = false; //网络是否已连接 private static String host = ""; private static int port; static InputStream inStream = null; static OutputStream outStream = null; private static Logger log =Logger.getLogger(SocketConnection.class); private static SocketConnection socketConnection = null; private static java.util.Timer heartTimer=null; //------------------------------------------- //private final Map<String, Object> recMsgMap= Collections.synchronizedMap(new HashMap<String, Object>()); private final ConcurrentHashMap<String, Object> recMsgMap = new ConcurrentHashMap<String, Object>(); private static Thread receiveThread = null; private final ReentrantLock lock = new ReentrantLock(); private SocketConnection(){ Properties conf = new Properties(); try { conf.load(SocketConnection.class.getResourceAsStream("test.conf")); this.timeout = Integer.valueOf(conf.getProperty("timeout")); init(conf.getProperty("ip"),Integer.valueOf(conf.getProperty("port"))); } catch(IOException e) { log.fatal("socket初始化异常!",e); throw new RuntimeException("socket初始化异常,请检查配置参数"); } } /** * 单态模式 */ public static SocketConnection getInstance() { if(socketConnection==null) { synchronized(SocketConnection.class) { if(socketConnection==null) { socketConnection = new SocketConnection(); return socketConnection; } } } return socketConnection; } private void init(String host,int port) throws IOException { InetSocketAddress addr = new InetSocketAddress(host,port); socket = new Socket(); synchronized (this) { log.info("【准备与"+addr+"建立连接】"); socket.connect(addr, timeout); log.info("【与"+addr+"连接已建立】"); inStream = socket.getInputStream(); outStream = socket.getOutputStream(); socket.setTcpNoDelay(true);//数据不作缓冲,立即发送 socket.setSoLinger(true, 0);//socket关闭时,立即释放资源 socket.setKeepAlive(true); socket.setTrafficClass(0x04|0x10);//高可靠性和最小延迟传输 isNetworkConnect=true; receiveThread = new Thread(new ReceiveWorker()); receiveThread.start(); SocketConnection.host=host; SocketConnection.port=port; if(!isLaunchHeartcheck) launchHeartcheck(); } } /** * 心跳包检测 */ private void launchHeartcheck() { if(socket == null) throw new IllegalStateException("socket is not established!"); heartTimer = new Timer(); isLaunchHeartcheck = true; heartTimer.schedule(new TimerTask() { public void run() { String msgStreamNo = StreamNoGenerator.getStreamNo("kq"); int mstType =9999;//999-心跳包请求 SimpleDateFormat dateformate = new SimpleDateFormat("yyyyMMddHHmmss"); String msgDateTime = dateformate.format(new Date()); int msgLength =38;//消息头长度 String commandstr = "00" +msgLength + mstType + msgStreamNo; log.info("心跳检测包 -> IVR "+commandstr); int reconnCounter = 1; while(true) { String responseMsg =null; try { responseMsg = readReqMsg(commandstr); } catch (IOException e) { log.error("IO流异常",e); reconnCounter ++; } if(responseMsg!=null) { log.info("心跳响应包 <- IVR "+responseMsg); reconnCounter = 1; break; } else { reconnCounter ++; } if(reconnCounter >3) {//重连次数已达三次,判定网络连接中断,重新建立连接。连接未被建立时不释放锁 reConnectToCTCC(); break; } } } },1000 * 60*1,1000*60*2); } /** * 重连与目标IP建立重连 */ private void reConnectToCTCC() { new Thread(new Runnable(){ public void run(){ log.info("重新建立与"+host+":"+port+"的连接"); //清理工作,中断计时器,中断接收线程,恢复初始变量 heartTimer.cancel(); isLaunchHeartcheck=false; isNetworkConnect = false; receiveThread.interrupt(); try { socket.close(); } catch (IOException e1) {log.error("重连时,关闭socket连接发生IO流异常",e1);} //---------------- synchronized(this){ for(; ;){ try { Thread.currentThread(); Thread.sleep(1000 * 1); init(host,port); this.notifyAll(); break ; } catch (IOException e) { log.error("重新建立连接未成功",e); } catch (InterruptedException e){ log.error("重连线程中断",e); } } } } }).start(); } /** * 发送命令并接受响应 * @param requestMsg * @return * @throws SocketTimeoutException * @throws IOException */ public String readReqMsg(String requestMsg) throws IOException { if(requestMsg ==null) { return null; } if(!isNetworkConnect) { synchronized(this){ try { this.wait(1000*5); //等待5秒,如果网络还没有恢复,抛出IO流异常 if(!isNetworkConnect) { throw new IOException("网络连接中断!"); } } catch (InterruptedException e) { log.error("发送线程中断",e); } } } String msgNo = requestMsg.substring(8, 8 + 24);//读取流水号 outStream = socket.getOutputStream(); outStream.write(requestMsg.getBytes()); outStream.flush(); Condition msglock = lock.newCondition(); //消息锁 //注册等待接收消息 recMsgMap.put(msgNo, msglock); try { lock.lock(); msglock.await(timeout,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("发送线程中断",e); } finally { lock.unlock(); } Object respMsg = recMsgMap.remove(msgNo); //响应信息 if(respMsg!=null &&(respMsg != msglock)) { //已经接收到消息,注销等待,成功返回消息 return (String) respMsg; } else { log.error(msgNo+" 超时,未收到响应消息"); throw new SocketTimeoutException(msgNo+" 超时,未收到响应消息"); } } public void finalize() { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } //消息接收线程 private class ReceiveWorker implements Runnable { String intStr= null; public void run() { while(!Thread.interrupted()){ try { byte[] headBytes = new byte[4]; if(inStream.read(headBytes)==-1){ log.warn("读到流未尾,对方已关闭流!"); reConnectToCTCC();//读到流未尾,对方已关闭流 return; } byte[] tmp =new byte[4]; tmp = headBytes; String tempStr = new String(tmp).trim(); if(tempStr==null || tempStr.equals("")) { log.error("received message is null"); continue; } intStr = new String(tmp); int totalLength =Integer.parseInt(intStr); //---------------- byte[] msgBytes = new byte[totalLength-4]; inStream.read(msgBytes); String resultMsg = new String(headBytes)+ new String(msgBytes); //抽出消息ID String msgNo = resultMsg.substring(8, 8 + 24); Condition msglock =(Condition) recMsgMap.get(msgNo); if(msglock ==null) { log.warn(msgNo+"序号可能已被注销!响应消息丢弃"); recMsgMap.remove(msgNo); continue; } recMsgMap.put(msgNo, resultMsg); try{ lock.lock(); msglock.signalAll(); }finally { lock.unlock(); } }catch(SocketException e){ log.error("服务端关闭socket",e); reConnectToCTCC(); } catch(IOException e) { log.error("接收线程读取响应数据时发生IO流异常",e); } catch(NumberFormatException e){ log.error("收到没良心包,String转int异常,异常字符:"+intStr); } } } } }
使用收发线程,一个发一个收,能提高socket通信的效率,不过有隐患:
比如在高并发环境下,每秒有100个线程调用 readReqMsg(String requestMsg)方法,也就是每秒会有100个需要发送的命令,由于网络传输、服务端处理命令等因素,接收线程每秒只能接收80个响应,将已返回命令的流水号从recMsgMap移除,但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机。
怎么预防这个隐患,还没有找到合适的解决办法,还请有经验的朋友指点一二。
评论
5 楼
caolongaaron
2011-11-02
4 楼
caolongaaron
2011-11-02
[b][/b]dsadsa
3 楼
lyy3323
2010-04-05
对于发送消息而言,把处理完的命令仍到队列中,单独开一个线程读取队列中的值,然后write出去。
2 楼
sunnylocus
2010-03-08
lyy3323 写道
大概看了下你的代码。感觉写的还是有点乱。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。
接受队列----类似操作。
不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。
recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。
另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。
接受队列----类似操作。
不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。
recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。
另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。
你是说将准备发送的信息放在一个队列中,然后让消费者线程去取队列中的信息么?
1 楼
lyy3323
2010-03-08
大概看了下你的代码。感觉写的还是有点乱。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。
接受队列----类似操作。
不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。
recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。
另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。
接受队列----类似操作。
不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。
recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。
另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。
发表评论
-
人在江湖:如何用代码保护自己
2011-10-12 16:30 11434现在上一点规模的 ... -
Spring freemarker页面乱码解决
2011-01-13 11:56 7566在开发过程中遇到乱码十分的头痛,如果你在开发过程中也遇 ... -
数据漂白算法研究
2010-12-07 18:05 3830你的手机是不是 ... -
理解使用static import 机制
2010-11-09 08:48 3237J2SE 1.5里引入了“Sta ... -
理解多线程设计模式
2010-11-08 17:43 10547多线程设计模式:1.Single Threaded Execu ... -
理解ThreadLocal
2010-11-03 17:04 1967ThreadLocal是什么 早在JDK 1 ... -
经验总结:高性能的数据同步
2010-11-03 10:03 6457最近在做一个银行的生产数据脱敏系统,今天写代码时遇到 ... -
用JSSE实现网络安全通信
2010-06-25 15:11 3898在网络上信息由源主机到目标主机要经过很多路由和计算机, ... -
Java实时监控日志文件并输出
2010-06-19 17:21 61329最近有一个银行数据漂白系统,要求操作人员在页面调用远端 ... -
Junit测试private方法
2010-04-28 14:09 8077package com.bill99.junit; pu ... -
保护眼睛的豆沙色
2010-03-19 09:46 3602作我们IT这行的,一天要盯着电脑看,时间长了眼睛会感觉发酸 ... -
中国联通短信网关接入程序源代码(SGIP1.2协议)
2010-01-11 12:23 43306自从我发了博文“中国联通SP业务开发总结”后有很多的朋友问 ... -
Class.isAssignableFrom(Class clz)方法 与 instanceof 关键字的区别
2009-12-24 13:14 67560原地址:http://topic.csdn.net/t/200 ... -
非阻塞通信
2009-12-03 11:43 4670对于用ServerSocket和Socket写的服务 ... -
处理线程泄露
2009-12-01 15:10 8629当一个单线程化 ... -
在Timer和ScheduledExecutorService间决择
2009-11-27 10:25 13430java.util.Timer计时器有管理任务延迟执行(& ... -
ASCII码对照表
2009-11-12 11:26 2600ASCII表 ASCII值 控制字符 ASC ... -
java.net.SocketException: Software caused connection abort: recv failed 异常分析
2009-11-12 11:01 15826java.net.SocketException: Softw ... -
用State模式减少if..elseif语句
2009-11-03 17:20 7130我们在写程序的过 ... -
HttpURLConnection设置网络超时
2009-10-29 17:30 9513Java中可以使用HttpURLConnection来请 ...
相关推荐
在C++编程中,多线程SOCKET收发是一项重要的技术,它允许程序同时处理多个网络连接,提高系统的并发性能。下面将详细讲解这个主题,包括C++中的多线程概念、SOCKET基础以及如何结合两者实现数据的收发。 首先,让...
6. **多线程通信中的同步与互斥**:为了防止线程间的资源竞争,需要使用锁(如互斥量mutex)或条件变量来实现同步。例如,当多个线程同时访问共享资源时,必须保证一次只有一个线程访问,以避免数据冲突。 7. **...
这可能涉及到线程同步机制,如互斥量(mutex)、信号量或事件对象,以避免资源竞争和数据不一致。 4. **网络协议**:TCP/IP协议族是互联网的基础,其中TCP(传输控制协议)负责建立和维护可靠的连接,IP(网际协议...
在Linux操作系统中,基于Socket的多线程编程是网络编程中的一个重要领域,它结合了Socket通信和多线程技术,可以实现高效的并发服务。Socket是网络通信的基本接口,TCP(Transmission Control Protocol)协议则提供...
线程的创建通常使用`CreateThread()`函数,而线程间的同步机制如互斥量(Mutex)、信号量(Semaphore)或事件(Event)则用于防止资源竞争,确保数据一致性。 多线程Socket编程需要注意几个关键点: 1. **线程安全...
在实际的C/C++代码中,可以看到如何使用socket API与线程API结合,实现高效、可靠的TCP通信。这个项目对于理解和实践网络编程以及多线程编程技巧非常有帮助,可以加深对TCP协议及多线程模型的理解。通过分析和学习此...
在IT行业中,网络编程是不可或缺的一部分,而Socket编程则是实现客户端与服务器通信的基础。本教程将探讨如何在Java中利用Socket实现多线程,以提高应用程序的并发性能。我们将主要关注`SubThread.java`、`Client....
总的来说,这个项目涵盖了Socket编程中的基础概念和高级特性,如多线程、网络通信、错误处理等,对于理解和掌握网络编程具有很高的实践价值。通过深入学习和实践,开发者能够运用这些知识开发更复杂、更高效的网络...
不过,多线程也带来了同步和互斥的问题,如数据竞争。为了解决这些问题,可以使用锁(Lock)、条件变量(Condition)等同步机制。在Python中,`threading.Lock()`和`threading.Condition()`等工具可以帮助我们确保...
总之,"一个Linux下的socket线程池实现"是一个综合性的网络编程项目,涵盖了socket通信、多线程管理和资源调度等多个重要知识点。对于想要深入学习Linux网络编程和并发控制的开发者来说,这是一个极好的实践案例。
线程间的同步机制,如互斥锁或者条件变量,可能需要用来确保对共享资源(如socket对象)的安全访问。 服务器端则更复杂一些,通常需要监听多个客户端连接。服务器启动后,主线程会监听特定端口,当有新的客户端连接...
在IT领域,网络通信是不可或缺的一部分,而Socket编程则是实现网络通信的一种基础方式。本项目“基于socket的聊天室”就是一个很好的学习实例,它涵盖了客户端和服务端的基本设计和实现,适用于初学者掌握网络编程的...
例如,一个线程专门用于监听新的客户端连接,一旦接收到新的连接,则创建一个新的线程来处理后续的数据收发。这种方式能够有效地分散处理负载。 - **线程池**:线程池是一种更为高效的线程管理机制,它预先创建一定...
综上所述,"多线程文件传输系统"的实现涉及到多线程编程、文件I/O操作、网络Socket通信和可能的MFC应用,这些都是计算机科学尤其是软件工程中的基础但重要的知识点。通过学习和实践这样的系统,开发者可以深入理解...
这可能涉及到线程同步技术,例如互斥锁或事件对象,以保证并发安全。 此外,为了识别不同的客户端,每个连接的Socket都会有自己的IP和端口号,这些信息可以在接收连接时获取。客户端的标识可以通过在数据包中包含...
线程管理是关键,包括同步、互斥以及避免死锁,以确保数据安全和程序的正确运行。 2. **Socket编程**:Java的Socket类和ServerSocket类用于实现客户端-服务器通信。ServerSocket在服务器端监听特定端口,等待客户端...
4. **数据收发**:每个线程负责与一个客户端进行通信。服务器端通过`send()`和`recv()`函数来完成数据的发送和接收。为了保证数据传输的正确性和完整性,还需要考虑错误处理机制,如超时重传、数据校验等。 #### ...
3. **并发与同步**:讨论多线程和多进程编程,以及ACE提供的线程池、信号量、互斥量和条件变量等同步机制。 4. **事件驱动编程**:介绍ACE的Reactor模式,用于处理并发事件,如I/O完成、定时器触发和信号处理。 5....
4. **线程同步**:由于双线程操作同一资源(套接字),为了防止数据竞争和死锁,必须使用线程同步机制,如互斥锁(Mutex)、信号量(Semaphore)或条件变量(Condition Variable)。这些机制确保在同一时间只有一个...
5. **线程同步与互斥**:在多线程环境下,线程同步和互斥是避免数据竞争和保证数据一致性的关键。VB.NET中的Monitor、Mutex、Semaphore、Lock等机制可以用来实现这些功能,确保线程安全地访问共享资源。 6. **事件...