`
to_zoe_yang
  • 浏览: 143374 次
  • 性别: Icon_minigender_2
  • 来自: 01
社区版块
存档分类
最新评论

非阻塞通信

阅读更多
   最近看孙老师的《Java网络编程精解》,读到非阻塞通信!感觉框架的重要性!服务器端和客户端的开发可以有框架遵循!开始写歌简单的,然后逐渐添加功能!

package nonblock;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

public class MyServer {
	
	private ServerSocketChannel ssc ;
	private Selector selector ;
	private int port = 8000;
	private Charset charset = Charset.forName("GBK");
	public static int num = 1;
	
	public MyServer() throws IOException{
		selector = Selector.open();
		ssc = ServerSocketChannel.open();
		ssc.socket().setReuseAddress(true);
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(port));
		System.out.println("Serve 启动了!");
	}
	
	public void server() throws IOException{
		ssc.register(selector, SelectionKey.OP_ACCEPT);
		while(selector.select()>0){
			Set keySet = selector.selectedKeys();
			Iterator iter = keySet.iterator();
			while(iter.hasNext()){
				SelectionKey key = (SelectionKey)iter.next();
				//一定要remove,否则会一直存在
				iter.remove();
				if(key.isAcceptable()){
					ServerSocketChannel s = (ServerSocketChannel)key.channel();
					SocketChannel sc = (SocketChannel)s.accept();
					System.out.println("服务器端接收到连接...");
					System.out.println("客户端信息"+sc.socket().getLocalAddress()+":"+sc.socket().getPort());
					sc.configureBlocking(false);
					ByteBuffer buffer = ByteBuffer.allocate(1024);
					sc.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, buffer);
					System.out.println("服务器完成注册...");
				}
				if(key.isReadable()){
				 //        receive(key);
				//	System.out.println("服务器端可以读取信息...");
				}
				if(key.isWritable()){
				//	System.out.println("服务器端可以写入信息...");
				}
			}
		}
	}
	
	public void receive(SelectionKey key) throws IOException {
		ByteBuffer buffer = (ByteBuffer)key.attachment();
		SocketChannel socketChannel = (SocketChannel)key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(2);
		socketChannel.read(readBuffer);
		readBuffer.flip();
		
		System.out.println((num++)+"From client:"+charset.decode(readBuffer).toString());
		buffer.limit(readBuffer.capacity());
		buffer.put(readBuffer);
		
	}
	
	public void send(SelectionKey key) throws IOException {
		ByteBuffer buffer = (ByteBuffer)key.attachment();
		String str = charset.decode(buffer).toString();
		
		if(str.indexOf("\r\n")==-1)
			return ;
		String output = str.substring(0, str.indexOf("\r\n"));
		ByteBuffer outBuffer = charset.encode(output);
		SocketChannel socketChannel = (SocketChannel)key.channel();
		while (outBuffer.hasRemaining())
			socketChannel.write(outBuffer);
		
		if (output.equals("bye\r\n")) {
			key.cancel();
			socketChannel.close();
			System.out.println("关闭与客户的连接");
		}
	}
	
	public static void main(String[] arg){
		try {
			new MyServer().server();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}



客户端

package nonblock;
import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.io.*;
import java.nio.charset.*;
import java.util.*;

public class MyClient {
	private SocketChannel socketChannel = null;
	private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
	private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
	private Charset charset = Charset.forName("GBK");
	private Selector selector;
	
	public MyClient() throws IOException {
		socketChannel = SocketChannel.open();
		InetAddress ia = InetAddress.getLocalHost();
		InetSocketAddress isa = new InetSocketAddress(ia, 8000);
		socketChannel.connect(isa);
		socketChannel.configureBlocking(false);
		System.out.println("与服务器的连接建立成功");
		selector = Selector.open();
	}
	
	public static void main(String args[]) throws IOException {
		final MyClient client = new MyClient();
//		Thread receiver = new Thread() {
//			public void run() {
//				client.receiveFromUser();
//			}
//		};
//
//		receiver.start();
		client.talk();
	}

	private void talk() throws IOException {
		// TODO Auto-generated method stub
		socketChannel.register(selector, SelectionKey.OP_READ
				| SelectionKey.OP_WRITE);
		while(selector.select()>0){
			Set keySet = selector.selectedKeys();
			Iterator iter = keySet.iterator();
			while(iter.hasNext()){
				SelectionKey key = (SelectionKey)iter.next();
				iter.remove();
				if(key.isWritable()){
				//	send(key);
					System.out.println("客户端可写...");
				}
				if(key.isReadable()){
					System.out.println("客户端可读...");
				}		
			}			
		}
	}
	
	public void send(SelectionKey key) throws IOException {
		
		
		SocketChannel socketChannel = (SocketChannel)key.channel();
		String input ;
		BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
		
		Charset charset = Charset.forName("GBK");
		
		while((input=br.readLine())!=null){
			ByteBuffer buffer = ByteBuffer.allocate(2);
			ByteBuffer outBuffer = ByteBuffer.allocate(2);
			buffer= charset.encode(input+"\r\n");
		//	outBuffer.put(buffer);
		//	buffer.flip();
			socketChannel.write(buffer);
			buffer.flip();
			System.out.println("I say:"+ charset.decode(buffer).toString());
		}	
	}
}


运行后,服务器端显示:
Serve 启动了!
此时运行客户端,服务器端显示:
服务器端接收到连接...
客户端信息/172.30.0.8:1766
服务器完成注册...
服务器端可以写入信息...
服务器端可以写入信息...
.
.
.
而客户端一直循环显示:
客户端可写...
客户端可写...
客户端可写...
.
.
.

当服务器端运行后,便首先使用register方法,是的当前的ServerSocketChannel可以进行OP_ACCEPT,此时运行客户端,则客户端便SocketChannel使用register方法,是的可以Selector可以对OP-READ和OP_WRITE进行等待并执行,与此同时,服务器发现客户端有链接,激活了Selectot的OP_ACCEPT,便建立连接,使用accept接收了SocketChannel的连接,此后SocketChannel便成了服务器端与客户端操作的通道!此时服务器端和当前的客户端都有一个SocketChannel,而SocketChannel是联通的!而服务器端和客户端此时共同维持一个缓冲区,可以对这个缓冲区进行读写操作。而这个缓冲区就是SocketChannel的register操作时的产生的!
个人理解,如有不对的,望大家指出!
分享到:
评论
2 楼 to_zoe_yang 2011-03-27  
试验了下,都不执行操作的时候,服务器端和客户端都是 可写状态,不可写,
因为此时客户端循环显示:客户可写;服务器端显示:服务器可写

但是如果此时断了客户端,服务器轮流显示:服务器可写和服务器可读

现在我觉得,客户端和服务器端是全双工的,有两个通道!
服务器端一个负责写,对应的客户端一个负责读;
服务器端另一个负责读,对应的客户端一个负责写!
而register时的attachment则是用于自己内部通信的!

想明白了!
呵呵~~
不错
今天有成就!

当然,哪一天如果发现自己这个错误了,希望大家指正啊!
可不敢误导别人啊!
1 楼 to_zoe_yang 2011-03-27  
服务器端
package nonblock;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

public class MyServer {
	
	private ServerSocketChannel ssc ;
	private Selector selector ;
	private int port = 8000;
	private Charset charset = Charset.forName("GBK");
	public static int num = 1;
	
	public MyServer() throws IOException{
		selector = Selector.open();
		ssc = ServerSocketChannel.open();
		ssc.socket().setReuseAddress(true);
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(port));
		System.out.println("Serve 启动了!");
	}
	
	public void server() throws IOException{
		ssc.register(selector, SelectionKey.OP_ACCEPT);
		while(selector.select()>0){
			Set keySet = selector.selectedKeys();
			Iterator iter = keySet.iterator();
			while(iter.hasNext()){
				SelectionKey key = (SelectionKey)iter.next();
				//一定要remove,否则会一直存在
				iter.remove();
				if(key.isAcceptable()){
					ServerSocketChannel s = (ServerSocketChannel)key.channel();
					SocketChannel sc = (SocketChannel)s.accept();
					System.out.println("服务器端接收到连接...");
					System.out.println("客户端信息"+sc.socket().getLocalAddress()+":"+sc.socket().getPort());
					sc.configureBlocking(false);
					ByteBuffer buffer = ByteBuffer.allocate(1024);
					sc.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, buffer);
					System.out.println("服务器完成注册...");
				}
				if(key.isReadable()){
				//	receive(key);
				System.out.println("服务器端可以读取信息...");
				}
				if(key.isWritable()){
				//	send1(key);
					System.out.println("服务器端可以写入信息...");
				}
			}
		}
	}
	
	public void receive(SelectionKey key) throws IOException {
//		ByteBuffer buffer = (ByteBuffer)key.attachment();
//		SocketChannel socketChannel = (SocketChannel)key.channel();
//		ByteBuffer readBuffer = ByteBuffer.allocate(2);
//		socketChannel.read(readBuffer);
//		readBuffer.flip();
//		
//		System.out.println((num++)+"From client:"+charset.decode(readBuffer).toString());
//		buffer.limit(readBuffer.capacity());
//		buffer.put(readBuffer);
		System.out.println("Reading");
		
	}
	
	public void send1(SelectionKey key) throws IOException {
		
		
		SocketChannel socketChannel = (SocketChannel)key.channel();
		String input = "Hello,Client22";
		
			ByteBuffer buffer = ByteBuffer.allocate(200);
			
			buffer= charset.encode(input+"\r\n");
		//	outBuffer.put(buffer);
		//	buffer.flip();
			socketChannel.write(buffer);
			buffer.flip();
			System.out.println("I say:"+ charset.decode(buffer).toString()+","+buffer.position()+","+buffer.limit());
		
			buffer.flip();
	}
	
	public void send(SelectionKey key) throws IOException {
		ByteBuffer buffer = (ByteBuffer)key.attachment();
		String str = charset.decode(buffer).toString();
		
		if(str.indexOf("\r\n")==-1)
			return ;
		String output = str.substring(0, str.indexOf("\r\n"));
		ByteBuffer outBuffer = charset.encode(output);
		SocketChannel socketChannel = (SocketChannel)key.channel();
		while (outBuffer.hasRemaining())
			socketChannel.write(outBuffer);
		
		if (output.equals("bye\r\n")) {
			key.cancel();
			socketChannel.close();
			System.out.println("关闭与客户的连接");
		}
	}
	
	public static void main(String[] arg){
		try {
			new MyServer().server();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}



客户端

package nonblock;
import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.io.*;
import java.nio.charset.*;
import java.util.*;

public class MyClient {
	private SocketChannel socketChannel = null;
	private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
	private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
	private Charset charset = Charset.forName("GBK");
	private Selector selector;
	
	public MyClient() throws IOException {
		socketChannel = SocketChannel.open();
		InetAddress ia = InetAddress.getLocalHost();
		InetSocketAddress isa = new InetSocketAddress(ia, 8000);
		socketChannel.connect(isa);
		socketChannel.configureBlocking(false);
		System.out.println("与服务器的连接建立成功");
		selector = Selector.open();
	}
	
	public static void main(String args[]) throws IOException {
		final MyClient client = new MyClient();
//		Thread receiver = new Thread() {
//			public void run() {
//				client.receiveFromUser();
//			}
//		};
//
//		receiver.start();
		client.talk();
	}

	private void talk() throws IOException {
		// TODO Auto-generated method stub
		socketChannel.register(selector, SelectionKey.OP_READ
				| SelectionKey.OP_WRITE);
		while(selector.select()>0){
			Set keySet = selector.selectedKeys();
			Iterator iter = keySet.iterator();
			while(iter.hasNext()){
				SelectionKey key = (SelectionKey)iter.next();
				iter.remove();
				if(key.isWritable()){
			//		send(key);
					System.out.println("客户端可写...");
				}
				if(key.isReadable()){
			//		receive(key);
					System.out.println("客户端可读...");
				}		
			}			
		}
	}
	
	public void receive(SelectionKey key) throws IOException {
		
		SocketChannel socketChannel = (SocketChannel)key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(150);
		ByteBuffer buffer = ByteBuffer.allocate(20);
		socketChannel.read(readBuffer);
		readBuffer.flip();
		
		System.out.println(("From server:"+charset.decode(readBuffer).toString()));
		System.out.println("Cap:"+readBuffer.capacity());
		
		readBuffer.flip();
		
		buffer.put(readBuffer);
		System.out.println(("From"+charset.decode(buffer).toString()));
	}
	
	public void send(SelectionKey key) throws IOException {
		
		
		SocketChannel socketChannel = (SocketChannel)key.channel();
		String input ;
		BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
		
		Charset charset = Charset.forName("GBK");
		
		while((input=br.readLine())!=null){
			ByteBuffer buffer = ByteBuffer.allocate(2);
			ByteBuffer outBuffer = ByteBuffer.allocate(2);
			buffer= charset.encode(input+"\r\n");
		//	outBuffer.put(buffer);
		//	buffer.flip();
			socketChannel.write(buffer);
			buffer.flip();
			System.out.println("I say:"+ charset.decode(buffer).toString());
		}	
	}
}

相关推荐

    阻塞通信和非阻塞通信的区别

    阻塞通信和非阻塞通信的区别 阻塞通信和非阻塞通信是两种不同的通信模式,主要应用于Socket通信中。在Java中,通过使用java.nio包中的类可以实现非阻塞通信。 阻塞通信是指在发送或接收数据时,当前线程将被阻塞,...

    阻塞通信和非阻塞通信

    ### 阻塞通信与非阻塞通信:深入解析与应用 #### 1. 概述 在并行计算领域,尤其是使用MPI(Message Passing Interface)进行编程时,通信模式的选择对于程序性能至关重要。阻塞通信与非阻塞通信是两种基本的通信方式...

    java非阻塞通信研究

    Java非阻塞通信是现代Java开发中的一个重要概念,特别是在高并发和高性能的系统设计中。非阻塞通信允许程序在等待数据就绪时不会被挂起,而是继续执行其他任务,提高了系统的整体效率。这种技术主要应用于网络I/O...

    用Java实现非阻塞通信

    用Java实现非阻塞通信 java.nio包提供了支持非阻塞通信的类,主要包括: ● ServerSocketChannel:ServerSocket的替代类,支持阻塞通信与非阻塞通信。 ● SocketChannel:Socket的替代类,支持阻塞通信与非阻塞通信...

    基于JavaNIO的非阻塞通信的研究与实现

    ### 基于Java NIO的非阻塞通信的研究与实现 #### 摘要 本文探讨了Java NIO(New I/O)框架中的非阻塞通信机制,并对其原理及应用进行了深入研究。NIO是一种现代I/O处理方法,通过引入缓冲区、通道和选择器等新概念...

    基于WSAAsyncSelect非阻塞通信程序设计

    非阻塞通信模式下,当调用 socket 的读写操作时,如果数据未准备好,系统不会挂起线程,而是立即返回一个错误,允许线程继续执行其他任务。这种方式提高了并发性能,因为多个任务可以在单个线程中并行处理。 ### 3....

    阻塞及非阻塞通信

    阻塞与非阻塞通信是计算机网络编程中的两种基本通信方式,主要涉及到Java NIO(Non-blocking Input/Output,非阻塞输入/输出)框架。Java NIO 提供了一种新的方式来处理I/O操作,使得程序在进行读写操作时,不再必须...

    非阻塞通信例子【nonblocking】示例

    非阻塞通信是一种高效、灵活的网络编程方式,它与传统的阻塞通信模式不同,能够显著提高系统的并发处理能力。在Java中,非阻塞通信主要通过NIO(Non-blocking Input/Output,非阻塞输入/输出)实现,这是一种基于...

    JAVA非阻塞通信技术原理研究与实现.pdf

    为此,Java非阻塞通信技术应运而生,它基于Java NIO(New I/O)包的全新设计理念,为服务端的高并发处理提供了新的解决方案。 Java NIO的非阻塞通信机制,与传统阻塞式通信的最大区别在于它采用了事件驱动的方式...

    java网络编程socket非阻塞通信

    通过java网络编程深入理解socket阻塞通信和非阻塞通信的在网络中的应用 源码包每一行都有注释,在代码里面每一个类都有详细的注释来解释这个类的功能这个方法的功能,调用哪一个类的哪一个功能等等。 压缩包包含实验...

    利用Python中SocketServer 实现客户端与服务器间非阻塞通信

    利用SocketServer模块来实现网络客户端与服务器并发连接非阻塞通信。 首先,先了解下SocketServer模块中可供使用的类: BaseServer:包含服务器的核心功能与混合(mix-in)类挂钩;这个类只用于派生,所以不会生成这...

    JavaSocket学习---NIO实现非阻塞的通信

    接下来是`EchoClient.java`,客户端通常采用单线程阻塞I/O模型,但在NIO模式下,也可以实现非阻塞通信。客户端创建一个`SocketChannel`连接到服务器,然后同样注册到`Selector`,设置感兴趣的读事件。客户端发送数据...

    ePump是一个基于I,O事件通知、非阻塞通信、多路复用、多线程等机制开发的事件驱动模型的C语言应用开发框架,利用该框架可以很容易地开发出高性能、大并发连接的服务器程序 .zip

    ePump - 一个事件驱动的多线程 c 框架英文自述文件基于I/O事件通知、非阻塞通信和多线程事件驱动模型的C语言框架帮助您开发高性能、大量并发连接的服务器。ePump是一个基于I/O事件通知、非阻塞通信、多路复用、多...

    ePump是一个基于IO事件通知、非阻塞通信、多路复用、多线.zip

    ePump是一个基于IO事件通知、非阻塞通信、多路复用、多线

    1、C++SOCKET同步阻塞、异步非阻塞通信服务端、客户端代码,支持多个客户端连接 2、断线重连(服务端或客户端没有启动顺

    1、C++SOCKET同步阻塞、异步非阻塞通信服务端、客户端代码,支持多个客户端连接。 2、断线重连(服务端或客户端没有启动顺序要求,先开启的等待另一端连接); 3、服务端支持同时连接多个客户端; 4、阅读代码就...

    send_isend_C语言_MPI阻塞非阻塞_MPI通信模式_非阻塞_nan_

    然而,非阻塞通信引入了复杂性,需要管理异步通信的状态,并确保正确完成(通过调用`MPI_Wait`或`MPI_Test`)。 3. **非阻塞通信的完成和同步** 在使用`MPI_Isend`后,发送进程需要确认消息是否已经被接收。这通常...

    非阻塞java通信代码

    在Java中实现非阻塞通信,主要依赖于Java NIO(Non-blocking Input/Output)库。 1. Java NIO简介: Java NIO是Java 1.4引入的新I/O API,是对传统BIO(Blocking I/O)模型的补充。NIO的核心概念包括通道(Channel...

Global site tag (gtag.js) - Google Analytics