论坛首页 Java企业应用论坛

Java网络编程的那些事儿

浏览 8710 次
精华帖 (0) :: 良好帖 (10) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-04-18   最后修改:2011-04-19
我在这里用Java写一个非常简单的网络传输程序。

public class MyClient {

	public static void main(String[] args) throws IOException {
		Socket s = new Socket("127.0.0.1", 9817);
		try {
			OutputStream os = s.getOutputStream();
			byte[] t = new byte[1024 * 1024];
			ByteArrayInputStream bais = new ByteArrayInputStream(t);
			byte[] buf = new byte[1024];
			int readed = 0;
			while ((readed = bais.read(buf)) >= 0) {
				os.write(buf, 0, readed); 
				os.flush();
			}
		} finally {
			s.close();
		}
	}
	
}


例子比较简单,就是把1M的数据内容,发送到远程服务器。

这里我们很自然就会用上一个while循环来发送。在循环体里每次发送1k的数据内容。并且每次write之后还加了个flush,确保发送过去。

这是一个看上去很自然的代码,但就是这么自然简单的代码就隐藏了一些比较诡异的东西。

首先,我们必须要知道os到底是一个什么类?有人说事OutputStream,这是废话。但这是一个abstract类,总会有一个实现类的。我们print一下它的class,发现是java.net.SocketOutputStream。

在这个SocketOutputStream类我们会发现这样两件事情:

(1)flush方法没有被重写
(2)write方法最终会调用socketWrite0这个native方法

第一个问题比较搞笑,SocketOutputStream的父类是FileOutputStream,这个类也没有覆盖flush。而FileOutputStream的父类就是OutputStream,这里的flush方法是空的。于是我们可以得出一个搞笑的结论,在socket里,flush是没用的。

不过发现这个结论一点都不能值得骄傲,而且还算是后知后觉。因为早在2000年的时候就有人提过这个“bug”,大家可以看看下面的回答。



回答得比较清楚:TCP流不是可缓冲的,所以不需要实现这个flush。这里还提到了nagle算法和setTcpNoDelay方法,有兴趣的可以google一下。Nagle算法其实就是通过一些算法把一些小包串起来发送以尽可能减少发送包的数量,优化网络。

第二个问题其实就是关于那个循环write。我们之所以把代码写成这个样子,实际上是因为受到了c Socket的影响,因为在C,我们一般都会把数据尽可能的分成小段发送,例如4096。

但如果我们深入socketWrite0这个native这个方法就会法相一些不一样的东西了。在正常情况下我们是看不到这个方法的源代码,还好我们有OpenJDK。这里我参考的是openjdk-7-ea-src-b133-10_mar_2011,各位可以根据自己喜欢选择。

在OpenJDK有很多Native的代码,这里我选择Windows平台的,大家熟悉嘛。打开openjdk\jdk\src\windows\native\java\net\SocketOutputStream.c,这里的Java_java_net_SocketOutputStream_socketWrite0就是对应的socketWrite0方法。

/*
 * Use stack allocate buffer if possible. For large sizes we allocate
 * an intermediate buffer from the heap (up to a maximum). If heap is
 * unavailable just use our stack buffer.
 */
if (len <= MAX_BUFFER_LEN) {
    bufP = BUF;
    buflen = MAX_BUFFER_LEN;
} else {
    buflen = min(MAX_HEAP_BUFFER_LEN, len);
    bufP = (char *)malloc((size_t)buflen);
    if (bufP == NULL) {
        bufP = BUF;
        buflen = MAX_BUFFER_LEN;
    }
}

while(len > 0) {
    int loff = 0;
    int chunkLen = min(buflen, len);
    int llen = chunkLen;
    int retry = 0;

    (*env)->GetByteArrayRegion(env, data, off, chunkLen, (jbyte *)bufP);

    while(llen > 0) {
        int n = send(fd, bufP + loff, llen, 0);
        if (n > 0) {
            llen -= n;
            loff += n;
            continue;
        }

        /*
         * Due to a bug in Windows Sockets (observed on NT and Windows
         * 2000) it may be necessary to retry the send. The issue is that
         * on blocking sockets send/WSASend is supposed to block if there
         * is insufficient buffer space available. If there are a large
         * number of threads blocked on write due to congestion then it's
         * possile to hit the NT/2000 bug whereby send returns WSAENOBUFS.
         * The workaround we use is to retry the send. If we have a
         * large buffer to send (>2k) then we retry with a maximum of
         * 2k buffer. If we hit the issue with <=2k buffer then we backoff
         * for 1 second and retry again. We repeat this up to a reasonable
         * limit before bailing out and throwing an exception. In load
         * conditions we've observed that the send will succeed after 2-3
         * attempts but this depends on network buffers associated with
         * other sockets draining.
         */
        if (WSAGetLastError() == WSAENOBUFS) {
            if (llen > MAX_BUFFER_LEN) {
                buflen = MAX_BUFFER_LEN;
                chunkLen = MAX_BUFFER_LEN;
                llen = MAX_BUFFER_LEN;
                continue;
            }
            if (retry >= 30) {
                JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                    "No buffer space available - exhausted attempts to queue buffer");
                if (bufP != BUF) {
                    free(bufP);
                }
                return;
            }
            Sleep(1000);
            retry++;
            continue;
        }

        /*
         * Send failed - can be caused by close or write error.
         */
        if (WSAGetLastError() == WSAENOTSOCK) {
            JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "Socket closed");
        } else {
            NET_ThrowCurrent(env, "socket write error");
        }
        if (bufP != BUF) {
            free(bufP);
        }
        return;
    }
    len -= chunkLen;
    off += chunkLen;
}

if (bufP != BUF) {
    free(bufP);
}


实际上大家可能会被一大堆的代码淹没,其实重点就是那两个while循环,哈哈,无语吧,原来Java内部其实已经实现了这个容错机制。这里实际上是每次取一块数据(chunk)出来,然后循环把这块数据发完(调用send函数有可能不能一次性发完)。

So,我们会发现我们一开始的那个while处理是显得如此的多余……

于是,我们发现其实我们的代码可以这样写:

public class MyClient {

	public static void main(String[] args) throws IOException {
		Socket s = new Socket("127.0.0.1", 9817);
		try {
			OutputStream os = s.getOutputStream();
			byte[] t = new byte[1024 * 1024];
			os.write(t);
		} finally {
			s.close();
		}
	}
	
}


干脆明了。

实际上如果我们能足够仔细就会发现一些问题:Socket的send函数是会返回一个int的类型,表示发送了多长数据,而java的socket的write的返回类型是void。两者的区别其实应经能够很好地说明问题了。


如果我们把这个问题在延伸一下,普通文件的写操作,我们应该也会很自然地写出类似前面的循环小数据量写入,或者使用JDK的BufferedOutputStream,实际上情况跟Socket类似,JDK本身已经提供了容错性:

JNIEXPORT void JNICALL Java_java_io_FileOutputStream_writeBytes(JNIEnv *env,
    jobject this, jbyteArray bytes, jint off, jint len, jboolean append)
{
    writeBytes(env, this, bytes, off, len, append, fos_fd);
}

void writeBytes(JNIEnv *env, jobject this, jbyteArray bytes,
           jint off, jint len, jboolean append, jfieldID fid)
{
    jint n;
    char stackBuf[BUF_SIZE];
    char *buf = NULL;
    FD fd;

    if (IS_NULL(bytes)) {
        JNU_ThrowNullPointerException(env, NULL);
        return;
    }

    if (outOfBounds(env, off, len, bytes)) {
        JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", NULL);
        return;
    }

    if (len == 0) {
        return;
    } else if (len > BUF_SIZE) {
        buf = malloc(len);
        if (buf == NULL) {
            JNU_ThrowOutOfMemoryError(env, NULL);
            return;
        }
    } else {
        buf = stackBuf;
    }

    (*env)->GetByteArrayRegion(env, bytes, off, len, (jbyte *)buf);

    if (!(*env)->ExceptionOccurred(env)) {
        off = 0;
        while (len > 0) {
            fd = GET_FD(this, fid);
            if (fd == -1) {
                JNU_ThrowIOException(env, "Stream Closed");
                break;
            }
            if (append == JNI_TRUE) {
                n = (jint)IO_Append(fd, buf+off, len);
            } else {
                n = (jint)IO_Write(fd, buf+off, len);
            }
            if (n == JVM_IO_ERR) {
                JNU_ThrowIOExceptionWithLastError(env, "Write error");
                break;
            } else if (n == JVM_IO_INTR) {
                JNU_ThrowByName(env, "java/io/InterruptedIOException", NULL);
                break;
            }
            off += n;
            len -= n;
        }
    }
    if (buf != stackBuf) {
        free(buf);
    }
}

#define IO_Append handleAppend
#define IO_Write handleWrite

JNIEXPORT size_t handleWrite(jlong fd, const void *buf, jint len) {
    return writeInternal(fd, buf, len, JNI_FALSE);
}

static size_t writeInternal(jlong fd, const void *buf, jint len, jboolean append)
{
    BOOL result = 0;
    DWORD written = 0;
    HANDLE h = (HANDLE)fd;
    if (h != INVALID_HANDLE_VALUE) {
        OVERLAPPED ov;
        LPOVERLAPPED lpOv;
        if (append == JNI_TRUE) {
            ov.Offset = (DWORD)0xFFFFFFFF;
            ov.OffsetHigh = (DWORD)0xFFFFFFFF;
            ov.hEvent = NULL;
            lpOv = &ov;
        } else {
            lpOv = NULL;
        }
        result = WriteFile(h,                /* File handle to write */
                           buf,              /* pointers to the buffers */
                           len,              /* number of bytes to write */
                           &written,         /* receives number of bytes written */
                           lpOv);            /* overlapped struct */
    }
    if ((h == INVALID_HANDLE_VALUE) || (result == 0)) {
        return -1;
    }
    return (size_t)written;
}


本来最后想来几句结束语的,不过后来想了一下还是算了,毕竟这里牛人太多了,还是别乱说废话,以免被喷。
  • 大小: 23.8 KB
   发表时间:2011-04-19  
android用的实现木有这个机制。  写程序还是不能依赖jvm实现啊。
0 请登录后投票
   发表时间:2011-04-19  
lz很有钻研精神,赞下
0 请登录后投票
   发表时间:2011-04-19  
是,即使是buffered stream也没有必要每次flush,因为buffer满了后就会flush,只有最后的一次write需要flush
0 请登录后投票
   发表时间:2011-04-19  
文件读写的话NIO中的FileChanel似乎可以工作的更好.
当然了,大家习惯上还是用循环并且flush的.
毕竟这样用之四海而皆准,不用考虑每个实现是不是优化过了.
0 请登录后投票
   发表时间:2011-04-19   最后修改:2011-04-20
tcp-stream is non-buffered ?

nagale算法的前提就是必须buffered住小块的tcp data,这两个观点不是冲突了?

莫非:
此buffered指的是应用层/库的buffer而非os内核里从属于该tcp端点的buffer?

0 请登录后投票
   发表时间:2011-04-19  
流操作习惯于关闭以前flush。
0 请登录后投票
   发表时间:2011-04-19  
深入钻研的精神值得学习!
0 请登录后投票
   发表时间:2011-04-19  
yangyi 写道
是,即使是buffered stream也没有必要每次flush,因为buffer满了后就会flush,只有最后的一次write需要flush

对,没有必要每次都flush,最后可以显示的flush一次。
0 请登录后投票
   发表时间:2011-04-19  
写得很详细,支持。

其实可以接着写其他方面的内容

比如多线程的socket

NIO ……
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics