`
Donald_Draper
  • 浏览: 981236 次
社区版块
存档分类
最新评论

SocketChannelImpl 解析一(通道连接,发送数据)

    博客分类:
  • NIO
nio 
阅读更多
ThreadLocal解析 :http://donald-draper.iteye.com/blog/2368159
Java NIO ByteBuffer详解:http://donald-draper.iteye.com/blog/2357084
DirectByteBuffer简介:http://donald-draper.iteye.com/blog/2372351
SelectorProvider定义:http://donald-draper.iteye.com/blog/2369615
ServerSocketChannelImpl解析:http://donald-draper.iteye.com/blog/2370912
SocketChannel接口定义:http://donald-draper.iteye.com/blog/2371218

引言:
在SocketChannel接口定义这篇文章中,我们看了socket的连接,完成连接,是否正在建立连接,读缓冲区写到通道,聚集写,读通道写缓冲区,分散读等方法。在NIO包中TCP发送接受字节序列通过SocketChannel。今天我们来看一下SocketChannel的具体
实现。我们从SocketChannel的open方法开始。
//SocketChannel
 public static SocketChannel open() throws IOException {
        return SelectorProvider.provider().openSocketChannel();
    }

SelectorProvider.provider()这个过程我们就不详说了实际是加载系统默认的SelectorProvider
实例,则个我们在SelectorProvider定义有提过,简单看一下:
//SelectorProviderImpl
public abstract class SelectorProviderImpl extends SelectorProvider
{
  public SocketChannel openSocketChannel()
        throws IOException
    {
        return new SocketChannelImpl(this);
    }
}

从上面可以看出,SocketChannel的默认实现为SocketChannelImpl。再来看SocketChannelImpl的变量
声明和相关方法的实现。
class SocketChannelImpl extends SocketChannel
    implements SelChImpl
{  
    private static NativeDispatcher nd = new SocketDispatcher();//socket的分发器
    private final FileDescriptor fd;//文件描述
    private final int fdVal;//文件描述id
    private volatile long readerThread;//读线程
    private volatile long writerThread;//写线程
    private final Object readLock;//读锁
    private final Object writeLock;//写锁
    private final Object stateLock;//状态锁
    private static final int ST_UNINITIALIZED = -1;//未初始化
    private static final int ST_UNCONNECTED = 0;//未连接
    private static final int ST_PENDING = 1;//正在连接
    private static final int ST_CONNECTED = 2;//已连接
    private static final int ST_KILLPENDING = 3;//正在关闭
    private static final int ST_KILLED = 4;//关闭
    private int state;//通道状态
    private SocketAddress localAddress;//socket本地地址
    private SocketAddress remoteAddress;//socket远端地址
    private boolean isInputOpen;//输入流是否打开
    private boolean isOutputOpen;//输出流是否打开
    private boolean readyToConnect;//是否正在准备连接
    private Socket socket;//通道套接字
    static final boolean $assertionsDisabled = !sun/nio/ch/SocketChannelImpl.desiredAssertionStatus();
    static 
    {
       //加载nio,net资源库
        Util.load();
    }
  }

  SocketChannelImpl的构造方法有三种分别如下
  1.
  
 SocketChannelImpl(SelectorProvider selectorprovider)
        throws IOException
    {
        super(selectorprovider);
        readerThread = 0L;
        writerThread = 0L;
	//初始化读写及状态锁
        readLock = new Object();
        writeLock = new Object();
        stateLock = new Object();
        state = -1;//状态默认为未初始化
        isInputOpen = true;
        isOutputOpen = true;
        readyToConnect = false;
        fd = Net.socket(true);//初始化文件描述符
        fdVal = IOUtil.fdVal(fd);//获取文件描述的值
        state = 0;//已初始化,未连接
    }

2.
 
  SocketChannelImpl(SelectorProvider selectorprovider, FileDescriptor filedescriptor, boolean flag)
        throws IOException
    {
        super(selectorprovider);
        readerThread = 0L;
        writerThread = 0L;
        readLock = new Object();
        writeLock = new Object();
        stateLock = new Object();
        state = -1;
        isInputOpen = true;
        isOutputOpen = true;
        readyToConnect = false;
        fd = filedescriptor;
        fdVal = IOUtil.fdVal(filedescriptor);
        state = 0;//已初始化,未连接
        if(flag)
	    //初始化本地地址
            localAddress = Net.localAddress(filedescriptor);
    }

3.
 
  SocketChannelImpl(SelectorProvider selectorprovider, FileDescriptor filedescriptor, InetSocketAddress inetsocketaddress)
        throws IOException
    {
        super(selectorprovider);
        readerThread = 0L;
        writerThread = 0L;
        readLock = new Object();
        writeLock = new Object();
        stateLock = new Object();
        state = -1;
        isInputOpen = true;
        isOutputOpen = true;
        readyToConnect = false;
        fd = filedescriptor;
        fdVal = IOUtil.fdVal(filedescriptor);
        state = 2;//已连接
        localAddress = Net.localAddress(filedescriptor);
        remoteAddress = inetsocketaddress;
    }

我们需要关注的是这两点,
a.fd = Net.socket(true);//初始化文件描述符

//Net
 static FileDescriptor socket(boolean flag)
        throws IOException
    {
        return socket(UNSPEC, flag);
    }
    static FileDescriptor socket(ProtocolFamily protocolfamily, boolean flag)
        throws IOException
    {
        boolean flag1 = isIPv6Available() && protocolfamily != StandardProtocolFamily.INET;
        return IOUtil.newFD(socket0(flag1, flag, false));
    }
    private static native int socket0(boolean flag, boolean flag1, boolean flag2);

//IOUtil
 static FileDescriptor newFD(int i)
    {
        FileDescriptor filedescriptor = new FileDescriptor();
        setfdVal(filedescriptor, i);
        return filedescriptor;
    }

这个我们在ServerSocketChannelImpl解析这篇文章接触过Net和IOUtil,这里不具体的解释了
,看一下即可,很容易理解。
b.localAddress = Net.localAddress(filedescriptor);

//Net
 static InetSocketAddress localAddress(FileDescriptor filedescriptor)
        throws IOException
    {
        return new InetSocketAddress(localInetAddress(filedescriptor), localPort(filedescriptor));
    }
      private static native int localPort(FileDescriptor filedescriptor)
        throws IOException;

    private static native InetAddress localInetAddress(FileDescriptor filedescriptor)
        throws IOException;

从上面可以看出SocketChannelImpl构造主要是初始化读写及状态锁和通道socket文件描述。
来看SocketChannelImpl的其他方法
//连接socket地址
 public boolean connect(SocketAddress socketaddress)
        throws IOException
    {
        boolean flag = false;
        Object obj = readLock;//同步读锁
        JVM INSTR monitorenter ;//try
        Object obj1 = writeLock;//同步写锁
        JVM INSTR monitorenter ;
        InetSocketAddress inetsocketaddress;
	//确保socket通道处于打开状态,没有连接
        ensureOpenAndUnconnected();
	//检查socketAddress正确与合法性
        inetsocketaddress = Net.checkAddress(socketaddress);
        SecurityManager securitymanager = System.getSecurityManager();
        if(securitymanager != null)
	    //检查当前线程是否有Connect方法的访问控制权限
            securitymanager.checkConnect(inetsocketaddress.getAddress().getHostAddress(), inetsocketaddress.getPort());
        //同步regLock锁,Lock for registration and configureBlocking operations
	//这个在AbstractSelectableChannel中定义
	Object obj2 = blockingLock();
        JVM INSTR monitorenter ;
        int i = 0;
	//Marks the begin/end of an I/O operation that might block indefinitely.
        begin();//与end协调使用,用于可能阻塞IO操作
        boolean flag1;
	//同步状态锁
        synchronized(stateLock)
        {
            if(isOpen())
                break MISSING_BLOCK_LABEL_149;
            flag1 = false;
        }
	//清除Reader线程
        readerCleanup();
        end(i > 0 || i == -2);
	//断言连接结果大于-2,则连接失败,抛出断言异常
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        return flag1;
        if(localAddress == null)
	    //beforeTcpConnect为静态空方法体,这个我们在ServerSocketChannelImpl中有说
            NetHooks.beforeTcpConnect(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
        //初始化读线程
	readerThread = NativeThread.current();
        obj3;
        JVM INSTR monitorexit ;
        do
        {
            InetAddress inetaddress = inetsocketaddress.getAddress();
            if(inetaddress.isAnyLocalAddress())
                inetaddress = InetAddress.getLocalHost();
            //尝试连接socket地址
            i = Net.connect(fd, inetaddress, inetsocketaddress.getPort());
        } while(i == -3 && isOpen());
        readerCleanup();
        end(i > 0 || i == -2);
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        break MISSING_BLOCK_LABEL_358;
        Exception exception1;
        exception1;
        readerCleanup();
        end(i > 0 || i == -2);
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        else
            throw exception1;
        IOException ioexception;
        ioexception;
	//出现IO异常,则关闭通道
        close();
        throw ioexception;
        Object obj4 = stateLock;
        JVM INSTR monitorenter ;
        remoteAddress = inetsocketaddress;
        if(i <= 0) goto _L2; else goto _L1
_L1:
        state = 2;
        if(isOpen())
            localAddress = Net.localAddress(fd);
        true;
        obj2;
        JVM INSTR monitorexit ;//退出同步
        obj1;
        JVM INSTR monitorexit ;
        obj;
        JVM INSTR monitorexit ;
        return;
_L2:
        if(!isBlocking())
            state = 1;
        else
        if(!$assertionsDisabled)
            throw new AssertionError();
        obj4;
        JVM INSTR monitorexit ;
          goto _L3
        Exception exception2;
        exception2;
        obj4;
        JVM INSTR monitorexit ;
        throw exception2;
_L3:
        obj2;
        JVM INSTR monitorexit ;
          goto _L4
        Exception exception3;
        exception3;
        obj2;
        JVM INSTR monitorexit ;
        throw exception3;
_L4:
        false;
        obj1;
        JVM INSTR monitorexit ;
        obj;
        JVM INSTR monitorexit ;
        return;
        Exception exception4;
        exception4;
        throw exception4;
        Exception exception5;
        exception5;
        throw exception5;
    }

connect连接方法有几点要看:
1.
//确保socket通道处于打开状态,没有连接
ensureOpenAndUnconnected();

2.
//清除Reader线程
readerCleanup();

3.尝试连接socket地址
do
{
    InetAddress inetaddress = inetsocketaddress.getAddress();
    if(inetaddress.isAnyLocalAddress())
        inetaddress = InetAddress.getLocalHost();
    //尝试连接socket地址
    i = Net.connect(fd, inetaddress, inetsocketaddress.getPort());
} while(i == -3 && isOpen());

4.检查连接结果
if(!$assertionsDisabled && !IOStatus.check(i))
       throw new AssertionError();
   else
       throw exception1;
   IOException ioexception;
   ioexception;
   //出现IO异常,则关闭通道
   close();

下面分别来看这四点:
1.
//确保socket通道处于打开状态,没有连接
ensureOpenAndUnconnected();

 void ensureOpenAndUnconnected()
        throws IOException
    {
        synchronized(stateLock)
        {
            if(!isOpen())//通道关闭
                throw new ClosedChannelException();
            if(state == 2)//已经连接
                throw new AlreadyConnectedException();
            if(state == 1)//正在来接
                throw new ConnectionPendingException();
        }
    }

2.
//清除Reader线程
readerCleanup();

 private void readerCleanup()
        throws IOException
    {
        synchronized(stateLock)
        {
            readerThread = 0L;
	    //连接正在关闭,则调用kill完成实际关闭工作
            if(state == 3)
                kill();
        }
    }

3.尝试连接socket地址
do
{
    InetAddress inetaddress = inetsocketaddress.getAddress();
    if(inetaddress.isAnyLocalAddress())
        inetaddress = InetAddress.getLocalHost();
    //尝试连接socket地址,这里为什么是循序,因为连接操作有可能被中断,及i为-3,
    //当中断位消除时,继续尝试连接
    i = Net.connect(fd, inetaddress, inetsocketaddress.getPort());
} while(i == -3 && isOpen());

//Net
  
static int connect(FileDescriptor filedescriptor, InetAddress inetaddress, int i)
        throws IOException
    {
        return connect(UNSPEC, filedescriptor, inetaddress, i);
    }

    static int connect(ProtocolFamily protocolfamily, FileDescriptor filedescriptor, InetAddress inetaddress, int i)
        throws IOException
    {
        boolean flag = isIPv6Available() && protocolfamily != StandardProtocolFamily.INET;
        return connect0(flag, filedescriptor, inetaddress, i);
    }

    private static native int connect0(boolean flag, FileDescriptor filedescriptor, InetAddress inetaddress, int i)
        throws IOException;

4.检查连接结果
if(!$assertionsDisabled && !IOStatus.check(i))
       throw new AssertionError();
   else
       throw exception1;
   IOException ioexception;
   ioexception;
   //出现IO异常,则关闭通道
   close();

这一点我们需要关注的是IOStatus.check(i)这句:
//IOStatus
package sun.nio.ch;
final class IOStatus
{
    static final int EOF = -1;//结束
    static final int UNAVAILABLE = -2;//不可用
    static final int INTERRUPTED = -3;//操作中断
    static final int UNSUPPORTED = -4;//不支持
    static final int THROWN = -5;//异常
    static final int UNSUPPORTED_CASE = -6;
    private IOStatus()
    {
    }
    static int normalize(int i)
    {
        if(i == -2)
            return 0;
        else
            return i;
    }
    //连接结果i大于等于-2,即连接失败
    static boolean check(int i)
    {
        return i >= -2;
    }
    static long normalize(long l)
    {
        if(l == -2L)
            return 0L;
        else
            return l;
    }
    static boolean check(long l)
    {
        return l >= -2L;
    }
    static boolean checkAll(long l)
    {
        return l > -1L || l < -6L;
    }
}

从上面可以看出,connect连接方法首先同步读锁和写锁,确保socket通道打开,并没有连接;
然后检查socket地址的正确性与合法性,然后检查当前线程是否有Connect方法的访问控制权限,
最后尝试连接socket地址。
再来看地址绑定方法bind
 public SocketChannel bind(SocketAddress socketaddress)
        throws IOException
    {
       //同步读锁,写锁,状态锁
        synchronized(readLock)
        {
            synchronized(writeLock)
            {
                synchronized(stateLock)
                {
                    if(!isOpen())//通道关闭
                        throw new ClosedChannelException();
                    if(state == 1)//正在连接
                        throw new ConnectionPendingException();
                    if(localAddress != null)
                        throw new AlreadyBoundException();
		    //检查地址
                    InetSocketAddress inetsocketaddress = socketaddress != null ? Net.checkAddress(socketaddress) : new InetSocketAddress(0);
                    NetHooks.beforeTcpBind(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
                    //绑定地址,这个在ServerSocketChannelImpl篇,一看过不在重复。
		    Net.bind(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
                    //初始化localAddress
		    localAddress = Net.localAddress(fd);
                }
            }
        }
        return this;
    }

下面来看SocketChannelImpl的几个读写方法
先来看从缓冲区读取数据,写到通道
public int write(ByteBuffer bytebuffer)
        throws IOException
    {
        if(bytebuffer == null)
            throw new NullPointerException();
        Object obj = writeLock;//同步写锁
        JVM INSTR monitorenter ;//进入同步
        int i;
	//确保没有关闭输出流
        ensureWriteOpen();
        i = 0;
        begin();//end,
        int k;
        synchronized(stateLock)
        {
            if(isOpen())
                break MISSING_BLOCK_LABEL_140;
            k = 0;
        }
	//清除写线程
        writerCleanup();
        end(i > 0 || i == -2);
	//同步状态锁,如果通道输出流关闭或写异常,则抛出AsynchronousCloseException
        synchronized(stateLock)
        {
            if(i <= 0 && !isOutputOpen)
                throw new AsynchronousCloseException();
        }
	//断言,检查写结果
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        return k;
	//初始化线程
        writerThread = NativeThread.current();
        obj1;
        JVM INSTR monitorexit ;
        int j;
        do
	    //写字节流,为什么是循环写,如果字节序列太多,发送缓冲区一次写不完,需要分多次写
            i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock);
        while(i == -3 && isOpen());
        j = IOStatus.normalize(i);
        writerCleanup();
        end(i > 0 || i == -2);
        synchronized(stateLock)
        {
            if(i <= 0 && !isOutputOpen)
                throw new AsynchronousCloseException();
        }
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        obj;
        JVM INSTR monitorexit ;
        return j;
        Exception exception3;
        exception3;
        writerCleanup();
        end(i > 0 || i == -2);
        synchronized(stateLock)
        {
            if(i <= 0 && !isOutputOpen)
                throw new AsynchronousCloseException();
        }
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        else
            throw exception3;
        Exception exception5;
        exception5;
        throw exception5;
    }

写操作需要关注一下几点,
1.
//确保没有关闭输出流
 ensureWriteOpen();

2.
 //写字节流
 do
    //写字节流
     i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock);
 while(i == -3 && isOpen())

3.
//清除写线程
writerCleanup();

下面分别来看这三点
1.
//确保没有关闭输出流
 ensureWriteOpen();
  private void ensureWriteOpen()
     throws ClosedChannelException
 {
     synchronized(stateLock)
     {
         if(!isOpen())//通道关闭
             throw new ClosedChannelException();
         if(!isOutputOpen)//输出流关闭
             throw new ClosedChannelException();
         if(!isConnected())//还没连接
             throw new NotYetConnectedException();
     }
 }

2.
//写字节流
 do
    //写字节流,为什么是循环写,如果字节序列太多,发送缓冲区一次写不完,需要分多次写
     i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock);
 while(i == -3 && isOpen())

//IOUtil
static int write(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj)
        throws IOException
    {
        int i;
        ByteBuffer bytebuffer1;
	//如果ByteBffer为DirectBuffer,则调用writeFromNativeBuffer
        if(bytebuffer instanceof DirectBuffer)
            return writeFromNativeBuffer(filedescriptor, bytebuffer, l, nativedispatcher, obj);
        //获取缓冲区的当前位置
	i = bytebuffer.position();
	//获取缓冲区limit位置
        int j = bytebuffer.limit();
	//断言position是否大于limit,是抛出AssertionError
        if(!$assertionsDisabled && i > j)
            throw new AssertionError();
        int k = i > j ? 0 : j - i;//需要些的字节数
	//获取k个字节的临时DirectBuffer
        bytebuffer1 = Util.getTemporaryDirectBuffer(k);
        int j1;
	写缓冲区到临时内存缓冲区DirectBuffer-bytebuffer1
        bytebuffer1.put(bytebuffer);
	//转换bytebuffer1写模式,为读模式
        bytebuffer1.flip();
        bytebuffer.position(i);//重新定位bytebuffer的position位置
	//从本地缓冲空间写字节流,i1为已写的字节数
        int i1 = writeFromNativeBuffer(filedescriptor, bytebuffer1, l, nativedispatcher, obj);
        if(i1 > 0)
	    //重新定位bytebuffer的position位置
	    //为什么重新定位bytebuffer的position位,
	    //如果字节序列太多,发送缓冲区一次写不完,需要分多次写
	    //将position向前移动i1位置,避免重复写即已写过的字节序列。
            bytebuffer.position(i + i1);
        j1 = i1;
	//将byteBuffer内存写到当前线程的缓存区
        Util.offerFirstTemporaryDirectBuffer(bytebuffer1);
        return j1;
        Exception exception;
        exception;
        Util.offerFirstTemporaryDirectBuffer(bytebuffer1);
        throw exception;
    }

这一步我们有几点要关注:
a.
//获取k个字节的临时DirectBuffer
bytebuffer1 = Util.getTemporaryDirectBuffer(k);

想要理解这点,先看一下Util的定义
//Util
class Util
{
    private static final int TEMP_BUF_POOL_SIZE;//临时缓冲区大小
    private static ThreadLocal localSelector = new ThreadLocal();
    private static ThreadLocal localSelectorWrapper = new ThreadLocal();
    private static Unsafe unsafe = Unsafe.getUnsafe();
    private static int pageSize = -1;
    private static volatile Constructor directByteBufferConstructor = null;
    private static volatile Constructor directByteBufferRConstructor = null;
    private static volatile String bugLevel = null;
    private static boolean loaded = false;
    static final boolean $assertionsDisabled = !sun/nio/ch/Util.desiredAssertionStatus();
    static 
    {
        //初始化临时缓冲区大小,为IOUtil的IOV_MAX,及系统默认最大IO缓冲区大小
	//static final int IOV_MAX = iovMax();
	//static native int iovMax();
        TEMP_BUF_POOL_SIZE = IOUtil.IOV_MAX;
    }
    //线程本地缓存区
    private static ThreadLocal bufferCache = new ThreadLocal() {

        protected BufferCache initialValue()
        {
            return new BufferCache();
        }

        protected volatile Object initialValue()
        {
            return initialValue();
        }

    };
}

//IOUtil,变量IOV_MAX
static native int iovMax();
static final int IOV_MAX = iovMax();

再来看Util的缓冲区的定义BufferCache
//Util
 private static class BufferCache
    {
        //存放字节序列的缓存数组,可以这么理解buffers为
	//当前缓冲区存放的字节序列ByteBuffer
	//buffers的size,即为当前缓冲区可以接受写多少个字节序列ByteBuffer
        private ByteBuffer buffers[];
        private int count;//当前缓冲区中,有数据的字节序列ByteBuffer的个数,即buffers计数器
        private int start;//缓冲区buffers的开始索引,即头部
        static final boolean $assertionsDisabled = !sun/nio/ch/Util.desiredAssertionStatus();
        BufferCache()
        {
	    //初始化缓冲区
            buffers = new ByteBuffer[Util.TEMP_BUF_POOL_SIZE];
        }
	//向缓冲区的头部添加一个字节序列bytebuffer,即写字节序列到缓存区
	 boolean offerFirst(ByteBuffer bytebuffer)
        {
            if(count >= Util.TEMP_BUF_POOL_SIZE)
            {
	        //如果当前缓冲区已满,则返回false,即当前不能写字节序列到缓存区
                return false;
            } else
            {
	        //获取缓冲区byteBuffers的当前头部索引start的前一个索引
                start = ((start + Util.TEMP_BUF_POOL_SIZE) - 1) % Util.TEMP_BUF_POOL_SIZE;
                //写字节序列到缓存区的索引start对应的ByteBuffer
		buffers[start] = bytebuffer;
                count++;//缓冲区bytebuffer计数器+1
                return true;//写字节序列到缓存区成功
            }
        }
	//这个与offerFirst恰好相反,写字节序列到缓冲区的尾部(索引start + count)
        boolean offerLast(ByteBuffer bytebuffer)
        {
            if(count >= Util.TEMP_BUF_POOL_SIZE)
            {
                return false;
            } else
            {
                int i = (start + count) % Util.TEMP_BUF_POOL_SIZE;
                buffers[i] = bytebuffer;
                count++;
                return true;
            }
        }
	//缓冲区buffers,索引向后移动
        private int next(int i)
        {
            return (i + 1) % Util.TEMP_BUF_POOL_SIZE;
        }
	//注意这个i不是索引的意思,是需要写的字节序列的字节个数,
	//这个在IOUtil的write方法中调用,如下面两行代码
	//获取k个字节的临时DirectBuffer
        //bytebuffer1 = Util.getTemporaryDirectBuffer(k);
        ByteBuffer get(int i)
        {
	    //如果缓存区当前可用的可用的ByteBuffer,返回null
            if(count == 0)
                return null;
            ByteBuffer abytebuffer[] = buffers;
            ByteBuffer bytebuffer = abytebuffer[start];
	    //如果当前缓冲区start索引对应的bytebuffer,不够用,即容量不够存放要写的字节序列
	    //则遍历当前buffers,找到可以存放的bytebuffer
            if(bytebuffer.capacity() < i)
            {
                bytebuffer = null;
                int j = start;
                do
                {
                    if((j = next(j)) == start)
		        //只有一个bytebuffer,break
                        break;
                    ByteBuffer bytebuffer1 = abytebuffer[j];
                    if(bytebuffer1 == null)
		        //下一个bytebuffer为null,break
                        break;
                    if(bytebuffer1.capacity() < i)
		         //容量不够用,continue
                        continue;
		    //找到可以存放i个字节序列的bytebuffer
                    bytebuffer = bytebuffer1;
                    break;
                } while(true);
                if(bytebuffer == null)
                    return null;
                abytebuffer[j] = abytebuffer[start];
            }
	    //清空
            abytebuffer[start] = null;
            start = next(start);
            count--;//缓冲区bytebuffer计数器-1
	    //调用rewind,为了从开始位置写字节流
            bytebuffer.rewind();
            bytebuffer.limit(i);//限制bytebuffer的可用空间limit
            return bytebuffer;
        }
	//缓冲区是否为空
        boolean isEmpty()
        {
            return count == 0;
        }
	//移除缓冲区头部的bytebuffer
        ByteBuffer removeFirst()
        {
	   //如果断言开启, 缓冲区为空,抛出断言异常
            if(!$assertionsDisabled && count <= 0)
            {
                throw new AssertionError();
            } else
            {
	       //有了上面几个方法,下面应该很好理解,就不说了
                ByteBuffer bytebuffer = buffers[start];
                buffers[start] = null;
                start = next(start);
                count--;
                return bytebuffer;
            }
        }
    }

从上面可以看出BufferCache用一个ByteBuffer数组buffers存放写到缓冲区的字节流序列,每次写字节流对应一个ByteBuffer,用count记录当前缓冲区中的有数据或可用的ByteBuffer数量,start记录当前缓冲区buffers的头部;offerFirst方法向缓冲区的头部添加一个字节序列bytebuffer,即写字节序列到缓存区;offerLast与offerFirst恰好相反,写字节序列到缓冲区的尾部(索引start + count);next方法为向后移动缓冲区buffers索引;get(int i)方法为从缓冲区获取可以存放i个字节序列的ByteBuffer,并rewind字节缓冲区ByteBuffer,
限制孔勇空间为ByteBuffer。removeFirst为移除缓冲区头部的bytebuffer,并返回。

看过Util的BufferCache的定义,我们再回到
//获取k个字节的临时DirectBuffer
bytebuffer1 = Util.getTemporaryDirectBuffer(k);

//Util
static ByteBuffer getTemporaryDirectBuffer(int i)
{
    //获取当前线程的缓冲区(ThreadLocal-bufferCache)
    BufferCache buffercache = (BufferCache)bufferCache.get();
    //从缓冲区获取容量第一个大于i的ByteBuffer
    ByteBuffer bytebuffer = buffercache.get(i);
    //如果缓冲区存在容量大于i个字节的bytebuffer,直接返回
    if(bytebuffer != null)
        return bytebuffer;
    //如果缓冲区中不存在容量大于i的bytebuffer,且不为空;
    //则移除缓冲区头部的bytebuffer
    if(!buffercache.isEmpty())
    {
        ByteBuffer bytebuffer1 = buffercache.removeFirst();
	//释放bytebuffer1
        free(bytebuffer1);
    }
    //ByteBuffer直接分配一个DirectByteBuffer,存放字节序列
    return ByteBuffer.allocateDirect(i);
}

获取临时DirectByteBuffer有两点要看
a.1
//释放bytebuffer1
free(bytebuffer1);

//Util
 private static void free(ByteBuffer bytebuffer)
    {
        //实际委托给DirectBuffer的clean,这个我们在DirectByteBuffer有说,
	//即释放分配的实际物理内存
        ((DirectBuffer)bytebuffer).cleaner().clean();
    }

//DirectBuffer
package sun.nio.ch;
import sun.misc.Cleaner;
public interface DirectBuffer
{
    public abstract long address();
    public abstract Object attachment();
    public abstract Cleaner cleaner();
}

a.2
 
public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
    }

b.
//从本地缓冲空间写字节流,i1为已写的字节数
int i1 = writeFromNativeBuffer(filedescriptor, bytebuffer1, l, nativedispatcher, obj);

//nativedispatcher参数实际为SocketDispatcher
private static int writeFromNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj)
        throws IOException
    {
        int i = bytebuffer.position();
        int j = bytebuffer.limit();
        if(!$assertionsDisabled && i > j)
            throw new AssertionError();
        int k = i > j ? 0 : j - i;
        int i1 = 0;
        if(k == 0)
            return 0;
        if(l != -1L)
	    //这个方法在Nativedispatcher定义,在SocketDispatcher并没有实现,obj为writeLock
            i1 = nativedispatcher.pwrite(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);
        else
	    //默认的写操作
            i1 = nativedispatcher.write(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);
        if(i1 > 0)
	    //将position向前移动i1位置,避免重复写即已写过的字节序列
            bytebuffer.position(i + i1);
        return i1;
    }

来看两种方式的写
b.1
 if(l != -1L)
    //这个在Nativedispatcher,在SocketDispatcher并没有实现
    i1 = nativedispatcher.pwrite(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);

//Nativedispatcher
 int pwrite(FileDescriptor filedescriptor, long l, int i, long l1, Object obj)
        throws IOException
    {
       //操作当前JDK,不支持,留待以后扩展用吧,我的JDK为1.7.0.17
        throw new IOException("Operation Unsupported");
    }

b.2
else
    //默认的写操作
    i1 = nativedispatcher.write(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);

//SocketDispatcher
int write(FileDescriptor filedescriptor, long l, int i)
        throws IOException
    {
        return write0(filedescriptor, l, i);
    }
  static native int write0(FileDescriptor filedescriptor, long l, int i)
        throws IOException;

从缓冲读取字节序列,写到通道中,实际是通过SocketDispatcher完成实际的写工作,当前默认的写方法为write(FileDescriptor filedescriptor, long l, int i)。
c.
//添加bytebuffer到线程当前缓冲区
Util.offerFirstTemporaryDirectBuffer(bytebuffer1);

static void offerFirstTemporaryDirectBuffer(ByteBuffer bytebuffer)
   {
       if(!$assertionsDisabled && bytebuffer == null)
           throw new AssertionError();
       //获取当前线程缓冲区
       BufferCache buffercache = (BufferCache)bufferCache.get();
       //将bytebuffer添加到缓冲区
       if(!buffercache.offerFirst(bytebuffer))
           free(bytebuffer);
   }

3.
//清除写线程
writerCleanup();
 private void writerCleanup()
        throws IOException
    {
        synchronized(stateLock)
        {
            writerThread = 0L;
            if(state == 3)
	        //这个kill操作,我们会在后面再讲
                kill();
        }
    }

从以上分析可以看出,从缓冲区读取字节序列写到通道,首先确保通道打开,且输出流没有关闭,然后委托给IOUtil写字节序列;IOUtil写字节流过程为首先通过Util从当前线程的缓冲区获取可以容下字节序列的临时缓冲区(DirectByteBuffer),如果没有则创建一个DirectByteBuffer,将字节序列写到临时的DirectByteBuffer中,然后将写操作委托给nativedispatcher(SocketDispatcher),将DirectByteBuffer添加到当前线程的缓冲区,
以便重用,因为DirectByteBuffer实际上是存在物理内存中,频繁的分配将会消耗更多的资源。

总结:

SocketChannelImpl构造主要是初始化读写及状态锁和通道socket文件描述。
connect连接方法首先同步读锁和写锁,确保socket通道打开,并没有连接;然后检查socket地址的正确性与合法性,然后检查当前线程是否有Connect方法的访问控制权限,最后尝试连接socket地址。从缓冲区读取字节序列写到通道write(ByteBuffer),首先确保通道打开,且输出流没有关闭,然后委托给IOUtil写字节序列;IOUtil写字节流过程为首先通过Util从当前线程的缓冲区获取可以容下字节序列的临时缓冲区(DirectByteBuffer),如果没有则创建一个DirectByteBuffer,将字节序列写到临时的DirectByteBuffer中,然后将写操作委托给nativedispatcher(SocketDispatcher),将DirectByteBuffer添加到当前线程的缓冲区,
以便重用,因为DirectByteBuffer实际上是存在物理内存中,频繁的分配将会消耗更多的资源。

SocketChannelImpl 解析二(发送数据后续):http://donald-draper.iteye.com/blog/2372548
附:
权限检查:SecurityManager为系统的默认安全检查管理器,主要用于检查当前线程是否拥有
某个权限的访问控制权限,比如socket连接,监听,获取类加载等。
//SecurityManager
//检查socket连接权限
 public void checkConnect(String host, int port) {
        if (host == null) {
            throw new NullPointerException("host can't be null");
        }
        if (!host.startsWith("[") && host.indexOf(':') != -1) {
            host = "[" + host + "]";
        }
        if (port == -1) {
            checkPermission(new SocketPermission(host,
                SecurityConstants.SOCKET_RESOLVE_ACTION));
        } else {
	    //检查是否socket连接访问控制权限
            checkPermission(new SocketPermission(host+":"+port,
                SecurityConstants.SOCKET_CONNECT_ACTION));
        }
    }
     public void checkPermission(Permission perm) {
        //检查是否perm的访问控制权限
        java.security.AccessController.checkPermission(perm);
    }

//SecurityConstants,安全权限常量
public final class SecurityConstants
{   
    //AWT为创建图形界面相关权限
    public static class AWT
    {
        private static PermissionFactory permissionFactory()
        {
            Class class1;
            class1 = (Class)AccessController.doPrivileged(new PrivilegedAction() {

                public Class run()
                {
                    return Class.forName("sun.awt.AWTPermissionFactory", true, null);
                    ClassNotFoundException classnotfoundexception;
                    classnotfoundexception;
                    return null;
                }

                public volatile Object run()
                {
                    return run();
                }

            });
            if(class1 == null)
                break MISSING_BLOCK_LABEL_52;
            return (PermissionFactory)class1.newInstance();
            Object obj;
            obj;
            throw new InternalError(((InstantiationException) (obj)).getMessage());
            obj;
            throw new InternalError(((IllegalAccessException) (obj)).getMessage());
            return new FakeAWTPermissionFactory();
        }
        private static Permission newAWTPermission(String s)
        {
            return factory.newPermission(s);
        }
        private static final String AWTFactory = "sun.awt.AWTPermissionFactory";
        private static final PermissionFactory factory = permissionFactory();
        public static final Permission TOPLEVEL_WINDOW_PERMISSION = newAWTPermission("showWindowWithoutWarningBanner");
        public static final Permission ACCESS_CLIPBOARD_PERMISSION = newAWTPermission("accessClipboard");//访问粘贴板
        public static final Permission CHECK_AWT_EVENTQUEUE_PERMISSION = newAWTPermission("accessEventQueue");
        public static final Permission TOOLKIT_MODALITY_PERMISSION = newAWTPermission("toolkitModality");
        public static final Permission READ_DISPLAY_PIXELS_PERMISSION = newAWTPermission("readDisplayPixels");
        public static final Permission CREATE_ROBOT_PERMISSION = newAWTPermission("createRobot");
        public static final Permission WATCH_MOUSE_PERMISSION = newAWTPermission("watchMousePointer");
        public static final Permission SET_WINDOW_ALWAYS_ON_TOP_PERMISSION = newAWTPermission("setWindowAlwaysOnTop");
        public static final Permission ALL_AWT_EVENTS_PERMISSION = newAWTPermission("listenToAllAWTEvents");
        public static final Permission ACCESS_SYSTEM_TRAY_PERMISSION = newAWTPermission("accessSystemTray");


        private AWT()
        {
        }
    }
    private static class FakeAWTPermission extends BasicPermission
    {

        public String toString()
        {
            return (new StringBuilder()).append("(\"java.awt.AWTPermission\" \"").append(getName()).append("\")").toString();
        }

        private static final long serialVersionUID = -1L;

        public FakeAWTPermission(String s)
        {
            super(s);
        }
    }
    private static class FakeAWTPermissionFactory
        implements PermissionFactory
    {

        public FakeAWTPermission newPermission(String s)
        {
            return new FakeAWTPermission(s);
        }

        public volatile Permission newPermission(String s)
        {
            return newPermission(s);
        }

        private FakeAWTPermissionFactory()
        {
        }

    }
    private SecurityConstants()
    {
    }
    public static final String FILE_DELETE_ACTION = "delete";//文件删除
    public static final String FILE_EXECUTE_ACTION = "execute";//文件执行
    public static final String FILE_READ_ACTION = "read";//文件读
    public static final String FILE_WRITE_ACTION = "write";//写文件
    public static final String FILE_READLINK_ACTION = "readlink";
    public static final String SOCKET_RESOLVE_ACTION = "resolve";
    public static final String SOCKET_CONNECT_ACTION = "connect";//socket连接
    public static final String SOCKET_LISTEN_ACTION = "listen";//socket监听
    public static final String SOCKET_ACCEPT_ACTION = "accept";//socket接受连接
    public static final String SOCKET_CONNECT_ACCEPT_ACTION = "connect,accept";//socket连接,接受连接
    public static final String PROPERTY_RW_ACTION = "read,write";//读写属性
    public static final String PROPERTY_READ_ACTION = "read";//读属性
    public static final String PROPERTY_WRITE_ACTION = "write";//写属性
    public static final AllPermission ALL_PERMISSION = new AllPermission();
    public static final NetPermission SPECIFY_HANDLER_PERMISSION = new NetPermission("specifyStreamHandler");
    public static final NetPermission SET_PROXYSELECTOR_PERMISSION = new NetPermission("setProxySelector");
    public static final NetPermission GET_PROXYSELECTOR_PERMISSION = new NetPermission("getProxySelector");
    public static final NetPermission SET_COOKIEHANDLER_PERMISSION = new NetPermission("setCookieHandler");
    public static final NetPermission GET_COOKIEHANDLER_PERMISSION = new NetPermission("getCookieHandler");
    public static final NetPermission SET_RESPONSECACHE_PERMISSION = new NetPermission("setResponseCache");
    public static final NetPermission GET_RESPONSECACHE_PERMISSION = new NetPermission("getResponseCache");
    //创建类加载器
    public static final RuntimePermission CREATE_CLASSLOADER_PERMISSION = new RuntimePermission("createClassLoader");
    public static final RuntimePermission CHECK_MEMBER_ACCESS_PERMISSION = new RuntimePermission("accessDeclaredMembers");
    //修改线程
    public static final RuntimePermission MODIFY_THREAD_PERMISSION = new RuntimePermission("modifyThread");
    //修改线程分组信息
    public static final RuntimePermission MODIFY_THREADGROUP_PERMISSION = new RuntimePermission("modifyThreadGroup");
    public static final RuntimePermission GET_PD_PERMISSION = new RuntimePermission("getProtectionDomain");
    //获取类加载器
    public static final RuntimePermission GET_CLASSLOADER_PERMISSION = new RuntimePermission("getClassLoader");
    public static final RuntimePermission STOP_THREAD_PERMISSION = new RuntimePermission("stopThread");
    public static final RuntimePermission GET_STACK_TRACE_PERMISSION = new RuntimePermission("getStackTrace");
    public static final SecurityPermission CREATE_ACC_PERMISSION = new SecurityPermission("createAccessControlContext");
    public static final SecurityPermission GET_COMBINER_PERMISSION = new SecurityPermission("getDomainCombiner");
    public static final SecurityPermission GET_POLICY_PERMISSION = new SecurityPermission("getPolicy");
    public static final SocketPermission LOCAL_LISTEN_PERMISSION = new SocketPermission("localhost:1024-", "listen");

}
0
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics