- 浏览: 77594 次
- 性别:
- 来自: 上海
最新评论
-
rockythd:
视界这个概念终于搞清楚了,谢谢!
Java关于Scala的“视界(view bound)”的模拟 -
regular:
写了一个更通用的方法:ObjectUtils.cast。目的是 ...
Java关于Scala的“视界(view bound)”的模拟 -
lrztiancai:
谢谢分享。正在找这个!
Parsley+SpiceLib 2.4 Developer Manual -
kraft:
第二版什么时候出啊
Programming In Scala 翻译 -
justjavac:
xpf7622 写道haixu.huang@live @前的名 ...
Programming In Scala 翻译
Socket通信比较常见的问题有如下几种:
1、设置收发超时;
2、正确的每一个bit的收发;
3、物理线路故障的保护;
4、始终能正常工作;
5、尽量少占系统资源;
n、……
而Socket编程有一个共性,尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是:通信。
为此,通过学习dnsjava的通信代码,加上自己在一些项目中的实践,现在给出TCP通信的例子实现如下,希望能够给想偷懒的人一个简单的解决方案。
本方案在正常的局域网连接中测试过几百万次没什么问题。缺乏更艰苦的环境,所以如果使用这些代码发生任何风险的话……
(TcpChannel代码为Brian Wellington所做,原名为TCPClient,经本人稍作改动)
使用演示见以下代码。
大致说明一下,Server端开5656侦听,Client端开若干线程测试Socket通信。每次发送240字节信息+16字节MD5校验。服务端收到信息之后做MD5检查,正确的,发送“.xxxx”表示认可,否则发送“?xxxx”表示故障。
正式应用中可以再设置tryout尝试n次。
Server端,代码演示:
Client端,代码演示:
重点说明:
发多少,收多少。要么固定发送和接收的字节数,要么在发送的时候带有发送字节数的信息,接收的时候根据该信息接收完整然后再处理。
1、设置收发超时;
2、正确的每一个bit的收发;
3、物理线路故障的保护;
4、始终能正常工作;
5、尽量少占系统资源;
n、……
而Socket编程有一个共性,尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是:通信。
为此,通过学习dnsjava的通信代码,加上自己在一些项目中的实践,现在给出TCP通信的例子实现如下,希望能够给想偷懒的人一个简单的解决方案。
本方案在正常的局域网连接中测试过几百万次没什么问题。缺乏更艰苦的环境,所以如果使用这些代码发生任何风险的话……
(TcpChannel代码为Brian Wellington所做,原名为TCPClient,经本人稍作改动)
// Copyright (c) 2005 Brian Wellington (bwelling@xbill.org) package asynchronizedchannel; import java.io.*; import java.net.*; import java.nio.*; import java.nio.channels.*; final class TcpChannel { private long endTime; private SelectionKey key; public TcpChannel(SelectableChannel channel, long endTime, int op) throws IOException { boolean done = false; Selector selector = null; this.endTime = endTime; try { selector = Selector.open(); channel.configureBlocking(false); key = channel.register(selector, op); done = true; } finally { if (!done && selector != null) { selector.close(); } if (!done) { channel.close(); } } } static void blockUntil(SelectionKey key, long endTime) throws IOException { long timeout = endTime - System.currentTimeMillis(); int nkeys = 0; if (timeout > 0) { nkeys = key.selector().select(timeout); } else if (timeout == 0) { nkeys = key.selector().selectNow(); } if (nkeys == 0) { throw new SocketTimeoutException(); } } void cleanup() { try { key.selector().close(); key.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } } void bind(SocketAddress addr) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); channel.socket().bind(addr); } void connect(SocketAddress addr) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT); try { if (!key.isConnectable()) { blockUntil(key, endTime); } if (!channel.connect(addr) && !channel.finishConnect()) { throw new ConnectException(); } } finally { if (key.isValid()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); } } } void send(ByteBuffer buffer) throws IOException { Send.operate(key, buffer, endTime); } void recv(ByteBuffer buffer) throws IOException { Recv.operate(key, buffer, endTime); } } interface Operator { class Operation { static void operate(final int op, final SelectionKey key, final ByteBuffer buffer, final long endTime, final Operator optr) throws IOException { final SocketChannel channel = (SocketChannel) key.channel(); final int total = buffer.capacity(); key.interestOps(op); try { while (buffer.position() < total) { if (System.currentTimeMillis() > endTime) { throw new SocketTimeoutException(); } if ((key.readyOps() & op) != 0) { if (optr.io(channel, buffer) < 0) { throw new EOFException(); } } else { TcpChannel.blockUntil(key, endTime); } } } finally { if (key.isValid()) { key.interestOps(0); } } } } int io(SocketChannel channel, ByteBuffer buffer) throws IOException; } class Send implements Operator { public int io(SocketChannel channel, ByteBuffer buffer) throws IOException { return channel.write(buffer); } public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException { Operation.operate(SelectionKey.OP_WRITE, key, buffer, endTime, operator); } public static final Send operator = new Send(); } class Recv implements Operator { public int io(SocketChannel channel, ByteBuffer buffer) throws IOException { return channel.read(buffer); } public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException { Operation.operate(SelectionKey.OP_READ, key, buffer, endTime, operator); } public static final Recv operator = new Recv(); }
使用演示见以下代码。
大致说明一下,Server端开5656侦听,Client端开若干线程测试Socket通信。每次发送240字节信息+16字节MD5校验。服务端收到信息之后做MD5检查,正确的,发送“.xxxx”表示认可,否则发送“?xxxx”表示故障。
正式应用中可以再设置tryout尝试n次。
Server端,代码演示:
package asynchronizedchannel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.security.MessageDigest; import java.util.Iterator; public class Server { /** * 服务端通信范例程序主函数 * * @param args * @throws IOException */ public static void main(String[] args) throws IOException { // Create the selector final Selector selector = Selector.open(); final ServerSocketChannel server = ServerSocketChannel.open(); server.configureBlocking(false); server.socket().bind(new InetSocketAddress("xx.xx.xx.xx", 5656), 5); // Register both channels with selector server.register(selector, SelectionKey.OP_ACCEPT); new Thread(new Daemon(selector)).start(); } } class Daemon implements Runnable { private final Selector selector; Daemon(Selector selector) { this.selector = selector; } public void run() { while (true) { try { // Wait for an event selector.select(); // Get list of selection keys with pending events Iterator<SelectionKey> it = selector.selectedKeys().iterator(); // Process each key while (it.hasNext()) { // Get the selection key SelectionKey selKey = it.next(); // Remove it from the list to indicate that it is being processed it.remove(); // Check if it's a connection request if (selKey.isAcceptable()) { // Get channel with connection request ServerSocketChannel server = (ServerSocketChannel) selKey.channel(); // Accept the connection request. // If serverSocketChannel is blocking, this method blocks. // The returned channel is in blocking mode. SocketChannel channel = server.accept(); // If serverSocketChannel is non-blocking, sChannel may be null if (channel != null) { // Use the socket channel to communicate with the client new Thread(new ServerHandler(channel)).start(); } else { System.out.println("---No Connection---"); // There were no pending connection requests; try again later. // To be notified of connection requests, } } } } catch (Exception ex) { ex.printStackTrace(); } } } } class ServerHandler implements Runnable { private static final long timeout = 30 * 1000; // 设置超时时间为30秒 private static int counter = 0; private final TcpChannel channel; private final MessageDigest md; ServerHandler(SocketChannel channel) throws Exception { this.channel = new TcpChannel(channel, System.currentTimeMillis() + timeout, SelectionKey.OP_READ); md = MessageDigest.getInstance("md5"); } public void run() { try { while (true) { work(); synchronized (ServerHandler.class) { if ((++counter & 65535) == 0) { System.out.println(counter); } } } } catch (Exception e) { e.printStackTrace(); } finally { channel.cleanup(); } } private void work() throws IOException { // 模拟工作流程 byte[] cache = new byte[256], reply = new byte[5]; read(cache, reply); } private void read(byte[] cache, byte[] reply) throws IOException { // 从套接字读入数据 channel.recv(ByteBuffer.wrap(cache)); md.reset(); md.update(cache, 0, 240); byte[] md5 = md.digest(); // 使用前240字节产生MD5校验码 if (!ExtArrays.partialEquals(md5, 0, cache, 240, 16)) { // 与后16字节比较 reply[0] = '?'; System.out.println("MISMATCH!"); } else { reply[0] = '.'; } channel.send(ByteBuffer.wrap(reply)); // 返回接收结果 } } final class ExtArrays { private ExtArrays() { } public static boolean partialEquals(byte[] a, int offset_a, byte[] b, int offset_b, int len) { // 字节数组的部分比较 if (a == null || b == null) { return false; } if (offset_a + len > a.length || offset_b + len > b.length) { return false; } for (int i = offset_a, j = offset_b, k = len; k > 0; i++, j++, k--) { if (a[i] != b[j]) { return false; } } return true; } }
Client端,代码演示:
package asynchronizedchannel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.security.DigestException; import java.security.MessageDigest; import java.util.Random; public class Client { private static int id = 0; /** * 客户端通信范例程序主函数 * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); } } class ClientHandler implements Runnable { private static final long timeout = 30 * 1000; // 设置超时时间为30秒 private final TcpChannel channel; private final int id; private final MessageDigest md; private final Random rand; ClientHandler(int id) throws Exception { this.id = id; channel = new TcpChannel(SocketChannel.open(), System.currentTimeMillis() + timeout, SelectionKey.OP_WRITE); md = MessageDigest.getInstance("md5"); rand = new Random(); } @Override public void run() { try { channel.connect(new InetSocketAddress("xx.xx.xx.xx", 5656)); int i = 0; while (true) { work(); if ((++i & 16383) == 0) { System.out.println(String.format("client(%1$d): %2$d", id, i)); } Thread.yield(); } } catch (Exception e) { e.printStackTrace(); } finally { channel.cleanup(); } } private void work() throws IOException, DigestException { byte[] cache = new byte[256], reply = new byte[5]; write(cache, reply); } private void write(byte[] cache, byte[] reply) throws DigestException, IOException { rand.nextBytes(cache); // 只用前面的240字节 md.reset(); md.update(cache, 0, 240); md.digest(cache, 240, 16); // MD5校验码占后面16字节 ByteBuffer buffer = ByteBuffer.wrap(cache); channel.send(buffer); buffer = ByteBuffer.wrap(reply); channel.recv(buffer); if (reply[0] != '.') { // 若接收的结果不正确,可以考虑尝试再次发送 System.out.println("MISMATCH!"); } } }
重点说明:
发多少,收多少。要么固定发送和接收的字节数,要么在发送的时候带有发送字节数的信息,接收的时候根据该信息接收完整然后再处理。
评论
1 楼
regular
2010-07-16
事实证明,NIO在不同操作系统下使用不同的连接建立流程。
因此,如果调用Selector.select()方法检测OP_CONNECT事件不能在不同操作系统之间兼容。
具体bug参见:http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4960791
要实现跨平台的兼容性,我目前采用的方法是:首先采用阻塞方式连接,之后转为非阻塞方式。
因此,如果调用Selector.select()方法检测OP_CONNECT事件不能在不同操作系统之间兼容。
具体bug参见:http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4960791
要实现跨平台的兼容性,我目前采用的方法是:首先采用阻塞方式连接,之后转为非阻塞方式。
发表评论
-
把Spring容器中的bean绑定到通过代码创建的对象
2012-04-26 16:17 2267Spring提供了对配置中创建对象的字段实例注入。但如果是通过 ... -
动态注册消息类型及处理函数
2011-10-01 15:56 992内容略。参见代码演示。 -
代码实例
2011-02-14 17:17 1007代码实例文件 -
如何在类外部调用被子类覆盖的父类方法
2011-01-20 14:46 1888题目比较绕。以下用一个简单的例子说明: public cl ... -
SWT应用的开发实例:没有使用到OSGi
2011-01-14 11:27 1518添加音效,以及中奖名单回看功能。 SWT应用一枚。具体方法见 ... -
运行期代码问题检查技术的研究
2010-11-29 13:30 1137以下用我之前代码中的一个bug作为说明,解释如何实现代码在运行 ... -
代码潜在故障的动态分析
2010-11-16 12:24 1499引子 大家都听说过FindBugs的大名。这是一款静态代码分析 ... -
打算研究学习一下OSGi和Equinox
2010-02-10 11:26 1139看到一本很直接讨论这个题目的书,不过要等到3月1日才出来。 ... -
关键应用服务的集群技术模拟
2010-01-08 14:41 1076集群技术,也就是俗称的Cluster,是为了保证某种应用不间断 ... -
JarSpur 检查引用包归属的小工具
2009-12-25 17:31 1076图形化的界面,允许你导入任意多的在项目中可能需要的Jar包。 ... -
class.getResourceAsStream()与ClassLoader.getResourceAsStream()的区别
2009-11-11 17:33 2045在jar包里获得流形式的资源有两种方法,一个是Class.ge ... -
MultiKeyedMap方案的实现
2009-11-10 11:55 3751方案背景 所谓“MultiKey ... -
Java2D: 硬件加速 - 第二部分 - 缓冲策略:Buffer Strategies
2009-11-02 12:52 2934原文地址:Java2D: Hardware ... -
Java2D: 硬件加速 - 第一部分 - 非恒定图像类:Volatile Image
2009-10-30 16:19 4291原文地址:Java2D: Hareware Accelerat ... -
自建的MiniChart库,目前实现了点图、折线图、柱状图和饼图
2009-07-15 11:08 1228花了大约一个星期时间做的MiniChart库。 由于现在的免费 ... -
BM方案模式匹配的Java代码实现
2009-06-17 13:47 1565速度还算快,例子里比较的文件一共371个,3,293,472字 ... -
对于经典模式匹配算法的一些改动
2009-06-12 12:44 1485从一个很长的字符串(或者数组)中,查找某个子串(模式串)是否存 ... -
读写进程的互斥锁
2009-03-16 15:27 1588以下的代码完成了对某个资源的读写互斥锁,具体说明如下:1. 若 ... -
Object数组到泛型数组转换的伪解决方案
2009-01-23 10:44 4325闲来无事,想要用目前的Java技术模拟一个对象数据库。最初只是 ...
相关推荐
一个 Java SocketChannel 实现,它使用提供的 Proxy 实例通过提供的代理建立网络连接。 SocketChannel 是通过表面下的 Socket 实例实现的。 限制 此实现目前仅支持阻塞模式。 请注意,这是 SocketChannel 实例的默认...
与传统的Socket相比,SocketChannel提供了异步操作的能力,这意味着我们可以在不等待I/O操作完成的情况下继续执行其他代码。通过注册Selector并监听特定事件(如读就绪、写就绪),我们可以管理多个SocketChannel,...
总的来说,`ServerSocketChannel`和`SocketChannel`是Java NIO中处理网络通信的核心工具,它们提供了异步、非阻塞的I/O操作,这对于构建高并发、低延迟的网络服务至关重要。通过熟练掌握这两者,开发者可以设计出更...
`SocketSelectorServer`和`SocketSelectorClient`是源代码文件,它们展示了如何使用SocketChannel和Selector实现服务器端和客户端的通信。服务器端通常会创建一个Selector,然后注册SocketChannel,监听客户端的连接...
首先,`Service.java`可能是服务器端的核心服务类,它利用Java NIO的Selector和Channel来实现异步I/O。Selector是一个多路复用器,它可以监视多个通道的事件(如连接请求、数据到达等)。当某个通道准备好进行读写...
总之,使用Java NIO实现异步连接池,不仅优化了多服务访问的性能瓶颈,还通过非阻塞I/O技术显著提升了系统的并发处理能力和健壮性。在高负载、多服务交互的应用场景中,异步连接池成为了提升系统性能的关键技术。
5. **实现异步通信核心**: 实现一个简单的异步通信核心,首先需要创建一个Selector实例。然后,将感兴趣的通道(如ServerSocketChannel)注册到这个选择器,指定关注的事件(通常为OP_ACCEPT、OP_READ或OP_WRITE)...
在这个案例中,我们将讨论如何使用Socket来实现一个简单的聊天室功能,包括服务器端和客户端的实现。 首先,我们要理解Socket的基本概念。Socket是网络通信中的一个抽象接口,可以看作是两台计算机之间的通信通道。...
通过Java NIO技术实现的异步连接池,有效解决了传统连接池在处理多个后台服务时的性能瓶颈问题,提高了系统的并发处理能力和健壮性。这种方式特别适合于需要与多个服务交互的应用场景,能够显著提升整体应用的性能...
在Java NIO(非阻塞I/O)框架中,SocketChannel、ServerSocketChannel和Selector扮演着关键角色。本文将深入探讨这些组件,并通过一个实际案例——"NServer.java"和"NClient.java"来阐述它们的工作原理和用法。 ...
NIO SSL 与阻塞IO不同,JVM不提供扩展基本套接字通道类的标准SSLSocketChannel和SSLServerSocketChannel类。 相反,必须使用手动编排SSL交换。 该项目提供了和,可以像和一样使用。入门直接下载您可以直接下载并将其...
在Java、C++等语言中,通常使用事件驱动或回调函数来实现异步Socket通信。这种方式在处理大量并发连接时特别有效,因为它避免了线程阻塞,减少了系统资源的消耗。 在实现"socket异步传文件和信息传输"的过程中,...
本文将深入探讨如何利用异步I/O进行socket通信,重点关注在Java中如何实现这一机制,同时会涉及到socket通信的抽象、字符串通信以及对象的序列化与反序列化。 首先,我们要理解什么是异步I/O。传统的同步I/O操作会...
在实际的Java异步Socket通信应用中,程序员需要定义如何管理不同的连接和它们的状态,例如接受新的连接请求时,需要为每个新的SocketChannel创建新的线程,或者使用事件驱动模型来管理所有活动的SocketChannel。...
通过阅读和理解`MultiPortEcho.java`的源码,你可以更深入地了解NIO在多端口服务器中的实际应用,以及如何通过Selector实现异步网络通信。 **工具** 在实际开发中,除了Java标准库外,还有一些优秀的第三方库,如...
这个"LongSocketDemo"应该包含了实现异步长连接的基本步骤,你可以根据这个示例进一步学习和优化,以适应你的具体需求。记得在实际项目中,还要考虑安全性、性能优化以及代码的可维护性等因素。
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。整合Spring Boot与Netty,可以利用Spring的便捷性和Netty的高效性来构建实时通信系统,例如基于Socket的服务...
例如,读取文件、接收网络数据等操作都会触发事件,当这些事件发生时,回调函数会被调用,从而实现异步处理。 在源码层面,理解同步和异步IO的关键在于了解底层系统调用,如Linux中的read、write、epoll等。在Java...
在标题中提到的"JAVA nio异步长连接服务端与客户端",我们可以理解为使用Java NIO实现的TCP长连接通信。TCP长连接是指在客户端和服务端之间保持一个持久的连接,可以多次收发数据,而不必每次通信都建立新的连接。这...