`
liangguanhui
  • 浏览: 112895 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

Java网络编程的那些事儿

阅读更多
我在这里用Java写一个非常简单的网络传输程序。

public class MyClient {

	public static void main(String[] args) throws IOException {
		Socket s = new Socket("127.0.0.1", 9817);
		try {
			OutputStream os = s.getOutputStream();
			byte[] t = new byte[1024 * 1024];
			ByteArrayInputStream bais = new ByteArrayInputStream(t);
			byte[] buf = new byte[1024];
			int readed = 0;
			while ((readed = bais.read(buf)) >= 0) {
				os.write(buf, 0, readed); 
				os.flush();
			}
		} finally {
			s.close();
		}
	}
	
}


例子比较简单,就是把1M的数据内容,发送到远程服务器。

这里我们很自然就会用上一个while循环来发送。在循环体里每次发送1k的数据内容。并且每次write之后还加了个flush,确保发送过去。

这是一个看上去很自然的代码,但就是这么自然简单的代码就隐藏了一些比较诡异的东西。

首先,我们必须要知道os到底是一个什么类?有人说事OutputStream,这是废话。但这是一个abstract类,总会有一个实现类的。我们print一下它的class,发现是java.net.SocketOutputStream。

在这个SocketOutputStream类我们会发现这样两件事情:

(1)flush方法没有被重写
(2)write方法最终会调用socketWrite0这个native方法

第一个问题比较搞笑,SocketOutputStream的父类是FileOutputStream,这个类也没有覆盖flush。而FileOutputStream的父类就是OutputStream,这里的flush方法是空的。于是我们可以得出一个搞笑的结论,在socket里,flush是没用的。

不过发现这个结论一点都不能值得骄傲,而且还算是后知后觉。因为早在2000年的时候就有人提过这个“bug”,大家可以看看下面的回答。



回答得比较清楚:TCP流不是可缓冲的,所以不需要实现这个flush。这里还提到了nagle算法和setTcpNoDelay方法,有兴趣的可以google一下。Nagle算法其实就是通过一些算法把一些小包串起来发送以尽可能减少发送包的数量,优化网络。

第二个问题其实就是关于那个循环write。我们之所以把代码写成这个样子,实际上是因为受到了c Socket的影响,因为在C,我们一般都会把数据尽可能的分成小段发送,例如4096。

但如果我们深入socketWrite0这个native这个方法就会法相一些不一样的东西了。在正常情况下我们是看不到这个方法的源代码,还好我们有OpenJDK。这里我参考的是openjdk-7-ea-src-b133-10_mar_2011,各位可以根据自己喜欢选择。

在OpenJDK有很多Native的代码,这里我选择Windows平台的,大家熟悉嘛。打开openjdk\jdk\src\windows\native\java\net\SocketOutputStream.c,这里的Java_java_net_SocketOutputStream_socketWrite0就是对应的socketWrite0方法。

/*
 * Use stack allocate buffer if possible. For large sizes we allocate
 * an intermediate buffer from the heap (up to a maximum). If heap is
 * unavailable just use our stack buffer.
 */
if (len <= MAX_BUFFER_LEN) {
    bufP = BUF;
    buflen = MAX_BUFFER_LEN;
} else {
    buflen = min(MAX_HEAP_BUFFER_LEN, len);
    bufP = (char *)malloc((size_t)buflen);
    if (bufP == NULL) {
        bufP = BUF;
        buflen = MAX_BUFFER_LEN;
    }
}

while(len > 0) {
    int loff = 0;
    int chunkLen = min(buflen, len);
    int llen = chunkLen;
    int retry = 0;

    (*env)->GetByteArrayRegion(env, data, off, chunkLen, (jbyte *)bufP);

    while(llen > 0) {
        int n = send(fd, bufP + loff, llen, 0);
        if (n > 0) {
            llen -= n;
            loff += n;
            continue;
        }

        /*
         * Due to a bug in Windows Sockets (observed on NT and Windows
         * 2000) it may be necessary to retry the send. The issue is that
         * on blocking sockets send/WSASend is supposed to block if there
         * is insufficient buffer space available. If there are a large
         * number of threads blocked on write due to congestion then it's
         * possile to hit the NT/2000 bug whereby send returns WSAENOBUFS.
         * The workaround we use is to retry the send. If we have a
         * large buffer to send (>2k) then we retry with a maximum of
         * 2k buffer. If we hit the issue with <=2k buffer then we backoff
         * for 1 second and retry again. We repeat this up to a reasonable
         * limit before bailing out and throwing an exception. In load
         * conditions we've observed that the send will succeed after 2-3
         * attempts but this depends on network buffers associated with
         * other sockets draining.
         */
        if (WSAGetLastError() == WSAENOBUFS) {
            if (llen > MAX_BUFFER_LEN) {
                buflen = MAX_BUFFER_LEN;
                chunkLen = MAX_BUFFER_LEN;
                llen = MAX_BUFFER_LEN;
                continue;
            }
            if (retry >= 30) {
                JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                    "No buffer space available - exhausted attempts to queue buffer");
                if (bufP != BUF) {
                    free(bufP);
                }
                return;
            }
            Sleep(1000);
            retry++;
            continue;
        }

        /*
         * Send failed - can be caused by close or write error.
         */
        if (WSAGetLastError() == WSAENOTSOCK) {
            JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "Socket closed");
        } else {
            NET_ThrowCurrent(env, "socket write error");
        }
        if (bufP != BUF) {
            free(bufP);
        }
        return;
    }
    len -= chunkLen;
    off += chunkLen;
}

if (bufP != BUF) {
    free(bufP);
}


实际上大家可能会被一大堆的代码淹没,其实重点就是那两个while循环,哈哈,无语吧,原来Java内部其实已经实现了这个容错机制。这里实际上是每次取一块数据(chunk)出来,然后循环把这块数据发完(调用send函数有可能不能一次性发完)。

So,我们会发现我们一开始的那个while处理是显得如此的多余……

于是,我们发现其实我们的代码可以这样写:

public class MyClient {

	public static void main(String[] args) throws IOException {
		Socket s = new Socket("127.0.0.1", 9817);
		try {
			OutputStream os = s.getOutputStream();
			byte[] t = new byte[1024 * 1024];
			os.write(t);
		} finally {
			s.close();
		}
	}
	
}


干脆明了。

实际上如果我们能足够仔细就会发现一些问题:Socket的send函数是会返回一个int的类型,表示发送了多长数据,而java的socket的write的返回类型是void。两者的区别其实应经能够很好地说明问题了。


如果我们把这个问题在延伸一下,普通文件的写操作,我们应该也会很自然地写出类似前面的循环小数据量写入,或者使用JDK的BufferedOutputStream,实际上情况跟Socket类似,JDK本身已经提供了容错性:

JNIEXPORT void JNICALL Java_java_io_FileOutputStream_writeBytes(JNIEnv *env,
    jobject this, jbyteArray bytes, jint off, jint len, jboolean append)
{
    writeBytes(env, this, bytes, off, len, append, fos_fd);
}

void writeBytes(JNIEnv *env, jobject this, jbyteArray bytes,
           jint off, jint len, jboolean append, jfieldID fid)
{
    jint n;
    char stackBuf[BUF_SIZE];
    char *buf = NULL;
    FD fd;

    if (IS_NULL(bytes)) {
        JNU_ThrowNullPointerException(env, NULL);
        return;
    }

    if (outOfBounds(env, off, len, bytes)) {
        JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", NULL);
        return;
    }

    if (len == 0) {
        return;
    } else if (len > BUF_SIZE) {
        buf = malloc(len);
        if (buf == NULL) {
            JNU_ThrowOutOfMemoryError(env, NULL);
            return;
        }
    } else {
        buf = stackBuf;
    }

    (*env)->GetByteArrayRegion(env, bytes, off, len, (jbyte *)buf);

    if (!(*env)->ExceptionOccurred(env)) {
        off = 0;
        while (len > 0) {
            fd = GET_FD(this, fid);
            if (fd == -1) {
                JNU_ThrowIOException(env, "Stream Closed");
                break;
            }
            if (append == JNI_TRUE) {
                n = (jint)IO_Append(fd, buf+off, len);
            } else {
                n = (jint)IO_Write(fd, buf+off, len);
            }
            if (n == JVM_IO_ERR) {
                JNU_ThrowIOExceptionWithLastError(env, "Write error");
                break;
            } else if (n == JVM_IO_INTR) {
                JNU_ThrowByName(env, "java/io/InterruptedIOException", NULL);
                break;
            }
            off += n;
            len -= n;
        }
    }
    if (buf != stackBuf) {
        free(buf);
    }
}

#define IO_Append handleAppend
#define IO_Write handleWrite

JNIEXPORT size_t handleWrite(jlong fd, const void *buf, jint len) {
    return writeInternal(fd, buf, len, JNI_FALSE);
}

static size_t writeInternal(jlong fd, const void *buf, jint len, jboolean append)
{
    BOOL result = 0;
    DWORD written = 0;
    HANDLE h = (HANDLE)fd;
    if (h != INVALID_HANDLE_VALUE) {
        OVERLAPPED ov;
        LPOVERLAPPED lpOv;
        if (append == JNI_TRUE) {
            ov.Offset = (DWORD)0xFFFFFFFF;
            ov.OffsetHigh = (DWORD)0xFFFFFFFF;
            ov.hEvent = NULL;
            lpOv = &ov;
        } else {
            lpOv = NULL;
        }
        result = WriteFile(h,                /* File handle to write */
                           buf,              /* pointers to the buffers */
                           len,              /* number of bytes to write */
                           &written,         /* receives number of bytes written */
                           lpOv);            /* overlapped struct */
    }
    if ((h == INVALID_HANDLE_VALUE) || (result == 0)) {
        return -1;
    }
    return (size_t)written;
}


本来最后想来几句结束语的,不过后来想了一下还是算了,毕竟这里牛人太多了,还是别乱说废话,以免被喷。
  • 大小: 23.8 KB
分享到:
评论
12 楼 sunqitang 2011-04-20  
楼主, 我现在对javaNIO的缓存机制 跟 普通IO中的字节数组之间的区别不是太明白。 能帮忙解释一下吗?
11 楼 pan_java 2011-04-20  
精神值的学习
10 楼 alosin 2011-04-20  
谢谢lz,写代码就是要一减再减,不要做那些脱裤子打屁的事
9 楼 janeky 2011-04-19  
写得很详细,支持。

其实可以接着写其他方面的内容

比如多线程的socket

NIO ……
8 楼 sunqy 2011-04-19  
yangyi 写道
是,即使是buffered stream也没有必要每次flush,因为buffer满了后就会flush,只有最后的一次write需要flush

对,没有必要每次都flush,最后可以显示的flush一次。
7 楼 nitianyi 2011-04-19  
深入钻研的精神值得学习!
6 楼 dwbin 2011-04-19  
流操作习惯于关闭以前flush。
5 楼 AlwenS 2011-04-19  
tcp-stream is non-buffered ?

nagale算法的前提就是必须buffered住小块的tcp data,这两个观点不是冲突了?

莫非:
此buffered指的是应用层/库的buffer而非os内核里从属于该tcp端点的buffer?

4 楼 i2534 2011-04-19  
文件读写的话NIO中的FileChanel似乎可以工作的更好.
当然了,大家习惯上还是用循环并且flush的.
毕竟这样用之四海而皆准,不用考虑每个实现是不是优化过了.
3 楼 yangyi 2011-04-19  
是,即使是buffered stream也没有必要每次flush,因为buffer满了后就会flush,只有最后的一次write需要flush
2 楼 tracyhuyan 2011-04-19  
lz很有钻研精神,赞下
1 楼 biAji 2011-04-19  
android用的实现木有这个机制。  写程序还是不能依赖jvm实现啊。

相关推荐

    Java编程那些事儿

    "Java编程那些事儿"无疑是对这个强大语言的深入探讨,旨在帮助开发人员提升技能,拓宽视野。这份资料可能是由一系列章节或主题组成的文档,比如基础语法、面向对象编程、异常处理、集合框架、多线程、IO流、网络编程...

    java编程那些事儿

    ### Java编程那些事儿 #### 程序设计概述与核心要素 **程序设计**,作为一项专业技能,涉及将现实世界的问题转化为计算机可执行的指令序列。它不仅仅是一门技术,更是一种思维方式的体现。本章节将从三个方面展开...

    Java编程那些事儿系列文章.pdf(全)

    《Java编程那些事儿》系列文章,由IT教育专家陈跃峰撰写,旨在通俗易懂地讲解Java编程的基础知识与实践技巧,特别强调了程序设计的基本概念及其对初学者的重要性。以下是从该系列文章中提炼出的关键知识点: ### ...

    java编程那些事儿-陈跃峰71-102

    《Java编程那些事儿》是陈跃峰先生的一本关于Java编程技术的著作,该书深入浅出地介绍了Java编程中的重要概念和实践技巧。这里我们主要聚焦于压缩包中的部分内容,涉及Java集合框架、时间日期处理、文件操作、多线程...

    Java编程那些事儿_java_

    本文将围绕“Java编程那些事儿”,探讨JVM的工作原理,诊断工具的使用以及性能优化策略。 1. JVM原理: JVM是一种虚拟机,它为Java程序提供了跨平台的执行环境。它负责解析.class文件,执行字节码,并管理内存区域...

    java程序员的那些事儿

    首先,学习Java编程语言是基础。Java以其“一次编写,到处运行”的特性,成为跨平台开发的首选。初学者应该掌握基本语法、面向对象编程概念(如封装、继承和多态)、异常处理、集合框架(如ArrayList、LinkedList、...

Global site tag (gtag.js) - Google Analytics