0 0

线程池实现socket 通信问题,不能长时间运行。3

1.创建一个监听

import java.io.IOException;
import java.net.*;
import java.util.concurrent.Future;

public class Listener extends ServerSocket implements Runnable {

	public Listener() throws IOException {
		super(Server.AppConfig.getLocalhost().getListenport());
	}

	@Override
	public void run() {
		while (true) {
			try {
				Socket socket = accept();
				CreateServer server = new CreateServer(socket, Server.pool);
				Future<Integer> result = Server.pool.submit(server);				
				Server.Results.add(result);
			} catch (Exception e) {
				Server.createMessage("Listener:"+e.getMessage());
			} finally {
			}
		}

	}
}


2. 创建一个解析socket的 服务

import java.net.Socket;
import java.util.Date;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.io.*;

import com.szcust.einfo.receiveEntity.serverEntity.DataText;

public class CreateServer implements  Callable<Integer> {
	private Socket client;
	private Scanner in;
	private PrintWriter out;
	private Resolve resolve;
	private int timeOut = 30;
	private Date lastTime;

	public CreateServer(Socket s, ExecutorService pool) {
		client = s;
		lastTime = new Date();
		resolve = new Resolve();
		try {
			client.setSoTimeout(30 * 60 * 1000);
			Server.ClientCount = Server.ClientCount + 1;
			in = new Scanner(client.getInputStream(), "GB2312");

			// in = new BufferedReader(new
			// InputStreamReader(client.getInputStream(), "GB2312"));
			out = new PrintWriter(client.getOutputStream(), true);
			out
					.println("--- Welcome To Universtar Science & Technology Softwear System ---");

		} catch (Exception ex) {
			Server.createMessage("Ex " + ex.getMessage());
		}
	}

	@Override
	public Integer call() {
		String line = "";
		while ((line = in.next()) != null) {
			try {
				if (check(line)) {
					DataText dataText = resolve.getDataTextBySoketString(line);
					if (dataText != null) {
						// Server.Data_Array.add(dataText);
						resolve.saveDataRun(dataText);//业务代码
					} else {
						Server.createMessage("Resolve error " + line);
					}
				} else {
					Server.createMessage("Check error " + line);
				}

			} catch (Exception ex) {
				Server.createMessage("Ex " + ex.getMessage());
				closeSocket(this.client	);
			}
		}
		Server.ClientCount--;
		return Server.ClientCount;
	}


3. 静态变量,用于保存各个线程之间的数据。

import java.util.Calendar;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.szcust.einfo.receiveBiz.FactorBiz;
import com.szcust.einfo.receiveBiz.StationBiz;
import com.szcust.einfo.receiveEntity.einfoEntity.Station;
import com.szcust.einfo.receiveEntity.serverEntity.DataText;
import com.szcust.einfo.receiveEntity.configEntity.Config;
import com.szcust.einfo.receiveEntity.configEntity.ExLog;

public class Server{


	public static  ConcurrentLinkedQueue<DataText> Data_Array = null;
	public static Vector<Station> Client_Stations = null;
	public static Vector<Station> Server_Stations = null;
	private static ConcurrentLinkedQueue<String> Message = null;
	public static ConcurrentLinkedQueue<String> ErrorText = null;
	public static ConcurrentLinkedQueue<Future<Integer>> Results = null;

	public static ExecutorService pool = Executors.newCachedThreadPool();
	public static Config AppConfig = null;
	public static int ClientCount = 0;
	public static boolean IsClear = true;

	public static void  init() {
		Data_Array =new ConcurrentLinkedQueue<DataText>();
		Client_Stations = new Vector<Station>();
		Server_Stations = new Vector<Station>();
		Message = new ConcurrentLinkedQueue<String>();
		ErrorText = new ConcurrentLinkedQueue<String>();
		Results = new ConcurrentLinkedQueue<Future<Integer>>();
		AppConfig = new Config();
		ClientCount = 0;
		IsClear = true;
		loadData();
	}

问题补充:
dwangel 写道
Blocking Socket不适合用线程池处理通讯。
单个线程会被挂起的。
用NIO才合适。

我这个是个长连接,还有心跳包不断的发上来。

我对那个NIO 不是很熟,但是我试用到时候 好像有新的连接时要断开原有的连接,不能长连接。

期待解决!!

问题补充:
dwangel 写道
Blocking Socket不适合用线程池处理通讯。
单个线程会被挂起的。
用NIO才合适。

我这个是个长连接,还有心跳包不断的发上来。

我对那个NIO 不是很熟,但是我试用到时候 好像有新的连接时要断开原有的连接,不能长连接。

期待解决!!
yx200404 写道
个人觉得没必要在accept后submit一个Callable

直接execute一个Runnable

同步好ClientCount就可以了吧


我仔细检查并写日志调试了,上面的好像没有问题。但是问题归于:

package com.szcust.einfo.receive;

import java.util.ArrayList;

import com.szcust.einfo.receiveBiz.BaseDataBiz;
import com.szcust.einfo.receiveEntity.serverEntity.BaseData;

public class DataSave extends Thread {
	private Resolve resolve;
	private BaseDataBiz baseDataBiz;

	public DataSave() {
		resolve = new Resolve();
		baseDataBiz = new BaseDataBiz();
	}

	@Override

	public void run() {
		while (true) {
			try {
				if (dataArrayCheck(Server.Data_Array)) {
					resolve.saveDataRun(Server.Data_Array.poll());
				} else {
					this.setPriority(MAX_PRIORITY);
					Thread.sleep(10 * 1000);
				}
			} catch (Exception ex) {
				Server.createMessage("DataSave: " + ex.getMessage());
			}
		}
	}


一直打印空错误:DataSave:null

问题补充:
dwangel 写道
你的项目可以用外部库吗?

你可以看看Netty这个NIO库。(或者apache mina,netty是mina的创始者写的,我用起来蛮方便的。)

NIO是基于SELECT模型的。就是注册一批关注对象socketChannel,
然后重复调用Select方法,当SocketChannel处于指定状态,比如 有数据,可写
等状态,再拿出来处理。
这样,可以分成主线程持续select,select出来需要处理的socketchannel放到一个队列,由threadpool里分线程取出来 处理。

因为socketchannel操作不会阻塞,所以可以充分利用多线程。

而你的代码用的是普通socket,普通socket的特性就是在read时,如果没有数据,则会阻塞住。而如果想利用线程池,应当是read没有数据时,继续往下走,把当前线程资源还回线程池。


我开始用了 那个mina 但是我发现我的连接有很多,他开了很多线程。但是我不知道怎么及时的结束这些线程。

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoAcceptorConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;

public class Service implements Runnable {

	private IoAcceptor acceptor;
	private IoAcceptorConfig config;

	public Service() {
		acceptor = new SocketAcceptor();
		config = new SocketAcceptorConfig();
		DefaultIoFilterChainBuilder chain = config.getFilterChain();
		// 使用字符串编码
		chain.addLast("codec", new ProtocolCodecFilter(
				new TextLineCodecFactory(Charset.forName( "UTF-8" ))));

	}

	public void run() {
		try{
		acceptor.bind(new InetSocketAddress(Server.AppConfig.getLocalhost()
				.getListenport()), new MinaHandler(), config);
		}catch (Exception e) {
			Server.createMessage("Server Ex: "+e.getMessage());
		}
	}

}



import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;

import com.szcust.einfo.receiveEntity.serverEntity.DataText;


public class MinaHandler  extends IoHandlerAdapter {
	/**
	 * 当有异常发生时触发
	 */
	public void exceptionCaught(IoSession ssn, Throwable cause) {
		//cause.printStackTrace();
		Server.createMessage("Handler Ex : " + cause.getMessage());
		ssn.getCloseFuture().getSession().close();
		ssn.close();
	}

	/**
	 * 有新连接时触发
	 */
	@Override
	public void sessionOpened(IoSession ssn) throws Exception {
		ssn.write("--- Welcome To Universtar Science & Technology Softwear System ---");
		//System.out.println("session open for " + ssn.getRemoteAddress());
	}

	/**
	 * 连接被关闭时触发
	 */
	@Override
	public void sessionClosed(IoSession ssn) throws Exception {
		ssn.close();
		//System.out.println("session closed from " + ssn.getRemoteAddress());		
	}

	/**
	 * 收到来自客户端的消息
	 */
	public void messageReceived(IoSession ssn, Object msg) throws Exception {
		//String ip = ssn.getRemoteAddress().toString();		
		//ssn.write("Hello " + msg);
		Resolve resolve = new Resolve();
		DataText dataText = resolve.getDataTextBySoketString(msg.toString());
		if (dataText != null) {
			// Server.Data_Array.add(dataText);
			resolve.saveDataRun(dataText);
		} else {
			Server.createMessage("Resolve error : " + msg);
		}
	}
}



问题补充:我对这个方面是第一次接触,期待解决。

问题补充:
dwangel 写道
不用你主动结束那些线程,那些线程被应该会被设为 daemon=true,即主线程结束时自动结束。平时,等待 主监听线程传需要处理的任务进来,如果没有,则自己wait。

那些就是线程池里的线程。


我的是长连接,现在是每读取一行字符串,他都生成一个线程。
2012年5月08日 17:54

5个答案 按时间排序 按投票排序

0 0

采纳的答案

哦,我不是很清楚 MINA的实现,Netty里,是用线程池模型的,可以控制生成线程的上限。

Mina里如果找不到控制线程数的地方,我建议还是用Netty吧。
至少Netty算是公司在维护 

2012年5月16日 11:28
0 0

不用你主动结束那些线程,那些线程被应该会被设为 daemon=true,即主线程结束时自动结束。平时,等待 主监听线程传需要处理的任务进来,如果没有,则自己wait。

那些就是线程池里的线程。

2012年5月15日 18:32
0 0

你的项目可以用外部库吗?

你可以看看Netty这个NIO库。(或者apache mina,netty是mina的创始者写的,我用起来蛮方便的。)

NIO是基于SELECT模型的。就是注册一批关注对象socketChannel,
然后重复调用Select方法,当SocketChannel处于指定状态,比如 有数据,可写
等状态,再拿出来处理。
这样,可以分成主线程持续select,select出来需要处理的socketchannel放到一个队列,由threadpool里分线程取出来 处理。

因为socketchannel操作不会阻塞,所以可以充分利用多线程。

而你的代码用的是普通socket,普通socket的特性就是在read时,如果没有数据,则会阻塞住。而如果想利用线程池,应当是read没有数据时,继续往下走,把当前线程资源还回线程池。

2012年5月15日 13:00
0 0

个人觉得没必要在accept后submit一个Callable

直接execute一个Runnable

同步好ClientCount就可以了吧

2012年5月11日 17:44
0 0

Blocking Socket不适合用线程池处理通讯。
单个线程会被挂起的。
用NIO才合适。

2012年5月09日 17:59

相关推荐

    socket 线程池实现(已经在项目中应用)

    在Java编程中,Socket通信是网络编程的基础,它允许两台计算机通过TCP/IP协议进行双向通信。线程池是多线程编程中的一个重要概念,它能够有效地管理并发任务,提高系统资源的利用率。本篇文章将深入讲解如何在Socket...

    简单实用,线程池+socket收发数据+解析字节格式报文

    在IT行业中,网络通信是不可或缺的一部分,而Socket编程和线程池则是实现高效网络通信的两个关键工具。本文将深入探讨如何结合线程池和Socket实现数据的收发,并解析字节格式的报文。 首先,让我们理解什么是Socket...

    Java实现Socket长连接和短连接

    综上所述,Java实现Socket长连接和短连接涉及网络通信基础、连接管理、异常处理等多个方面,开发者需要根据实际需求权衡选择合适的方式。通过深入理解这些概念和技术,可以有效地优化网络服务,提升应用的性能和用户...

    Android-Socket长连接通信心跳包消息回调Java服务端

    为了保持长连接,需要处理好网络异常、应用后台运行限制等问题。例如,当设备进入休眠模式时,可以使用Service或者WorkManager等手段确保Socket连接不被关闭。 2. **心跳包机制**: 长连接可能会遇到网络不稳定或...

    单线程与多线程socket通信

    本项目涵盖了单线程和多线程两种方式的Socket通信实现,帮助开发者理解这两种模式的差异和应用场景。 首先,我们来详细探讨单线程Socket通信。在单线程模型中,服务器端只有一个线程处理所有客户端的连接请求。当一...

    Android应用源码安卓与PC的Socket通信项目java版_串口通讯.zip

    本项目就是这样一个示例,它演示了如何使用Java实现Android与PC之间的Socket通信,实现串口通讯功能。Socket通信是网络编程中常见的一种方式,允许两台设备通过TCP/IP协议进行双向数据传输。 在Android端,你需要...

    C# socket通信的服务器和客户端

    在IT领域,网络通信是不可或缺的一部分,而Socket编程则是实现跨网络通信的一种基本技术。本文将深入探讨如何在C#环境中使用Socket进行服务器和客户端的通信。C#提供了丰富的类库,使得Socket编程变得相对简单,我们...

    Androidsocket通信加长连接(有心跳检测)

    在Android应用开发中,Socket通信是一种常见的数据传输方式,尤其在需要实现实时通信或长连接的场景下,如即时聊天、在线游戏等。本文将详细介绍如何在Android中实现带有心跳检测的Socket长连接。 首先,我们需要...

    socket短连接和长连接 多线程的应用

    这种方式减少了连接建立和释放的开销,提高了效率,但对于长时间无数据传输的连接,可能会占用较多的系统资源。 多线程(Multithreading)是并发执行多个任务的技术,特别是在Socket编程中,多线程能有效提高服务端...

    线程池技术在J2ME网络通信中的应用研究.

    综上所述,线程池技术在J2ME网络通信中的应用不仅可以有效解决频繁创建和销毁线程带来的资源消耗问题,还能提高系统的稳定性和响应性。通过合理的实现和优化,线程池技术将成为J2ME应用开发中不可或缺的一部分。

    Android的socket长连接(心跳检测)

    在Android开发中,Socket长连接是一种常见的通信方式,特别是在实时性要求高的应用中,如即时通讯、在线游戏等。为了保持连接的稳定性和检测网络状况,通常会采用心跳检测机制。下面将详细介绍Android中实现Socket长...

    UDP-socket.rar_Socket通信协议_udp 多线程_udp多线程_网络UDP

    为了不让接收线程被数据处理耗时过长而阻塞,我们可以使用单独的线程池来处理这些任务。 4. 错误处理和同步机制:在多线程环境下,必须考虑线程安全问题。例如,使用互斥锁(mutex)确保同一时间只有一个线程访问...

    同步异步多线程SOCKET通讯

    这种方式易于理解和实现,但可能会遇到“线程饥饿”问题,即某些线程可能长时间被阻塞,导致其他线程无法获得足够的CPU时间。 **异步多线程Socket通信** 异步多线程模式结合了异步I/O和多线程的优点。线程不再阻塞...

    smart-socket AIO通信框架 v1.6.6.zip

    5. **稳定性**:经过多次迭代优化,具备良好的健壮性和稳定性,能应对长时间运行的高负载环境。 6. **日志与监控**:可能内置了详细的日志记录和性能监控机制,方便排查问题和性能调优。 "说明.htm"可能是框架的...

    Socket说明文档(英文版)

    - **Socket池**:对于长时间运行的应用程序来说,重复创建和关闭 Socket 连接可能会消耗大量的系统资源。因此,可以采用 Socket 池的方式来管理 Socket 资源,减少连接建立和断开的时间。 ```java public class ...

    JAVA SOCKET实现CMPP2.0

    Java Socket实现CMPP2.0是中国移动互联网短信网关...总的来说,Java Socket实现CMPP2.0涉及到网络通信、协议解析、数据编码解码等多个技术点。理解并掌握这些知识点,能有效帮助开发者构建稳定、高效的短信服务平台。

    socket通讯框架例子

    可以设置超时机制,避免长时间的等待。此外,为了增强系统的健壮性,可以考虑使用心跳机制来检测连接状态,及时发现并恢复断开的连接。 总的来说,"极限Socket传输+大文件+多线程"的示例旨在教会我们如何在C#中构建...

    socket简单实现聊天功能

    在IT行业中,网络通信是不可或缺的一部分,而套接字(Socket)则是实现网络通信的基础工具。本篇文章将探讨如何利用套接字技术在Android Studio环境下简单实现聊天功能。通过理解并实践这一过程,开发者可以更好地...

    socket源码及资料

    Socket编程是计算机网络编程中的重要组成部分,主要用于实现进程间的通信,无论是本地的进程间通信(IPC)还是跨网络的通信。在本压缩包文件中,包含的“socket源码及资料”应该涵盖了关于socket编程的各种实例、源...

    socket长连接 demo

    综上所述,“socket长连接 demo”通过心跳机制实现了稳定可靠的网络连接,保证了多客户端服务的正常运行。通过学习和理解这个示例,开发者可以更好地掌握Socket编程,特别是长连接管理和心跳检测的实现技巧。

Global site tag (gtag.js) - Google Analytics