`
Donald_Draper
  • 浏览: 981606 次
社区版块
存档分类
最新评论

DatagramChannelImpl 解析二(报文发送与接收)

    博客分类:
  • NIO
nio 
阅读更多
DatagramChannelImpl 解析一(初始化):http://donald-draper.iteye.com/blog/2373245
引言:
     DatagramChannelImpl主要成员有报文socket分发器,这个与SocketChannleImpl中的socket分发器原理基本相同,报文socket分发器可以理解为报文通道的静态代理;网络协议family表示当前报文通道的网络协议family;多播关系注册器MembershipRegistry,主要是通过一个Map-HashMap<InetAddress,LinkedList<MembershipKeyImpl>>来管理多播组和多播组成员关系key的映射(关系);通道本地读写线程记录器,及读写锁控制通道读写,一个状态锁,当通道状态改变时,需要获取状态锁。DatagramChannelImpl构造方法,主要是初始化读写线程,及读写锁和状态锁,初始化网络协议family,及报文通道描述符和文件描述id。DatagramChannelImpl(SelectorProvider selectorprovider)与其他两个不同的是构造时更新当前报文socket的数量。
    今天这篇我们来看报文通道的具体实现我们需要关注的方法drop,block,unblock,join,
send,receive,read和write。
我们先来看一下send方法
//发送报文到指定的socketaddress
 public int send(ByteBuffer bytebuffer, SocketAddress socketaddress)
   throws IOException
   //buffer为null,则抛出空指针异常
   if(bytebuffer == null)
       throw new NullPointerException();
   Object obj = writeLock;//同步写锁
   JVM INSTR monitorenter ;//进入同步,try
   InetSocketAddress inetsocketaddress;
   InetAddress inetaddress;
   ensureOpen();//确保通道打开
   //检查socketaddress
   inetsocketaddress = Net.checkAddress(socketaddress);
   inetaddress = inetsocketaddress.getAddress();
   if(inetaddress == null)
       throw new IOException("Target address not resolved");
   Object obj1 = stateLock;//同步状态锁
   JVM INSTR monitorenter ;//try
   if(!isConnected())
   {
       if(socketaddress == null)
           throw new NullPointerException();
       SecurityManager securitymanager = System.getSecurityManager();
       if(securitymanager != null)
           //如果地址为多播地址,则检查是否具有多播权限
           if(inetaddress.isMulticastAddress())
               securitymanager.checkMulticast(inetaddress);
           else
	   //否则检查是否具有连接inetaddress.getHostAddress()和相应端口的权限
               securitymanager.checkConnect(inetaddress.getHostAddress(), inetsocketaddress.getPort());
       break MISSING_BLOCK_LABEL_156;
   }
   //socketaddress不为报文socket的远端地址,则抛出IllegalArgumentException
   if(!socketaddress.equals(remoteAddress))
       throw new IllegalArgumentException("Connected address not equal to target address");
   ...
    int i = 0;
    int j;
    begin();//与end方法协同,记录中断器,控制中断。
    ...
    //获取本地写线程
    writerThread = NativeThread.current();
    do
        //委托给send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
        i = send(fd, bytebuffer, inetsocketaddress);
    while(i == -3 && isOpen());
    ...
}

send(ByteBuffer bytebuffer, SocketAddress socketaddress)方法,我们
需要关注的是一下几点:
1.
 
 ensureOpen();//确保通道打开

 private void ensureOpen()
     throws ClosedChannelException
 {
     if(!isOpen())
         throw new ClosedChannelException();
     else
         return;
 }

2.
 //如果地址为多播地址,则检查是否具有多播权限
if(inetaddress.isMulticastAddress())

    securitymanager.checkMulticast(inetaddress);
//SecurityManager,检查多播权限
  public void checkMulticast(InetAddress maddr) {
        String host = maddr.getHostAddress();
        if (!host.startsWith("[") && host.indexOf(':') != -1) {
            host = "[" + host + "]";
        }
        checkPermission(new SocketPermission(host,
            SecurityConstants.SOCKET_CONNECT_ACCEPT_ACTION));
    }

//SecurityConstants
public static final String SOCKET_CONNECT_ACCEPT_ACTION = "connect,accept";

3.
 do
     //委托给send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
     i = send(fd, bytebuffer, inetsocketaddress);
 while(i == -3 && isOpen());

这个之所以是循环为了,当发送报文操作因某种原因中断,但报文没有发送完,当中断位消除时,
继续发送报文。
我们来看send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
private int send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
    throws IOException
{
    int i;
    ByteBuffer bytebuffer1;
    //如果buffer为direct类型,则直接调用sendFromNativeBuffer
    if(bytebuffer instanceof DirectBuffer)
        return sendFromNativeBuffer(filedescriptor, bytebuffer, inetsocketaddress);
    //获取当前buffer的实际容量,即remaining
    i = bytebuffer.position();
    int j = bytebuffer.limit();
    if(!$assertionsDisabled && i > j)
        throw new AssertionError();
    int k = i > j ? 0 : j - i;//remaining
    //从当前线程缓存区获取临时DirectByteBuffer
    bytebuffer1 = Util.getTemporaryDirectBuffer(k);
    int i1;
    //读取buffer字节序列,写到临时DirectByteBuffer中
    bytebuffer1.put(bytebuffer);
    //读写模式转换,以便发送报文
    bytebuffer1.flip();
    bytebuffer.position(i);//重新定位buffer的position为原始位置,以便一次发送不完,再次发送
    //委托个sendFromNativeBuffer,发送已写的字节数
    int l = sendFromNativeBuffer(filedescriptor, bytebuffer1, inetsocketaddress);
    if(l > 0)
        //buffer的position向前移动i个位置
        bytebuffer.position(i + l);
    i1 = l;
    //添加DirectByteBuffer到当前线程缓存区,以便重用,因为DirectByteBuffer是直接操作系统内存
    //频繁的分配内存,将消耗过多的系统资源。
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    return i1;
    Exception exception;
    exception;
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    throw exception;
}

再来看sendFromNativeBuffer方法
private int sendFromNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
    throws IOException
{
    //获取当前buffer的实际容量,即remaining
    int i = bytebuffer.position();
    int j = bytebuffer.limit();
    if(!$assertionsDisabled && i > j)
        throw new AssertionError();
    int k = i > j ? 0 : j - i;//remaining
    boolean flag = family != StandardProtocolFamily.INET;
    int l;
    try
    {
        //委托个send0,返回已发送的字节数
        l = send0(flag, filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
    }
    catch(PortUnreachableException portunreachableexception)
    {
        if(isConnected())
            throw portunreachableexception;
        l = k;
    }
    if(l > 0)
         //buffer的position向前移动i个位置
        bytebuffer.position(i + l);
    return l;
}

private native int send0(boolean flag, FileDescriptor filedescriptor, long l, int i, InetAddress inetaddress, int j)
        throws IOException;

从上面来看send方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
再来看receive方法:
 public SocketAddress receive(ByteBuffer bytebuffer)
        throws IOException
    {
        //如果buffer为只读,则抛出IllegalArgumentException
        if(bytebuffer.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");
        if(bytebuffer == null)
            throw new NullPointerException();
        Object obj = readLock;//同步读锁
        JVM INSTR monitorenter ;
        int i;
        ByteBuffer bytebuffer1;
        ensureOpen();//确保通道打开
	//如果本地地址为null,则绑定空地址
        if(localAddress() == null)
            bind(null);
        i = 0;
        bytebuffer1 = null;
        Object obj1;
        begin();//与end方法协同,记录中断器,控制中断。
        ...
        readerThread = NativeThread.current();
        if(!isConnected() && obj1 != null)
            break MISSING_BLOCK_LABEL_248;
        do
	    //读取报文,写到buffer
            i = receive(fd, bytebuffer);
        while(i == -3 && isOpen());
        ...
    }

接收报文方法,有以下几点要关注
1.
//如果本地地址为null,则绑定空地址
if(localAddress() == null)
    bind(null);

//获取通道本地socket地址
 public SocketAddress localAddress()
    {
        Object obj = stateLock;
        JVM INSTR monitorenter ;
        return localAddress;
        Exception exception;
        exception;
        throw exception;
    }

//通道绑定socket地址
  public volatile NetworkChannel bind(SocketAddress socketaddress)
        throws IOException
    {
        return bind(socketaddress);
    }

 public DatagramChannel bind(SocketAddress socketaddress)
        throws IOException
    {
        synchronized(readLock)
        {
            synchronized(writeLock)
            {
                synchronized(stateLock)
                {
                   //同步读写锁,及状态锁
		    ensureOpen();
                    if(localAddress != null)
                        throw new AlreadyBoundException();
                    InetSocketAddress inetsocketaddress;
                    if(socketaddress == null)
                    {
		        //地址为空,则获取本地地址
                        if(family == StandardProtocolFamily.INET)
                            inetsocketaddress = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
                        else
                            inetsocketaddress = new InetSocketAddress(0);
                    } else
                    {
                        inetsocketaddress = Net.checkAddress(socketaddress);
                        if(family == StandardProtocolFamily.INET)
                        {
                            InetAddress inetaddress = inetsocketaddress.getAddress();
                            if(!(inetaddress instanceof Inet4Address))
                                throw new UnsupportedAddressTypeException();
                        }
                    }
                    SecurityManager securitymanager = System.getSecurityManager();
                    if(securitymanager != null)
		        //检查地址端口监听权限
                        securitymanager.checkListen(inetsocketaddress.getPort());
		    //委托net完成实际的绑定工作
                    Net.bind(family, fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
		    //初始化本地地址
                    localAddress = Net.localAddress(fd);
                }
            }
        }
        return this;
    }

2.
do                              
    //读取报文,写到buffer      
    i = receive(fd, bytebuffer);
while(i == -3 && isOpen()); 
 
这个之所以是循环为了,当接收报文操作因某种原因中断,但报文没有读取完,当中断位消除时,继续读取报文。
private int receive(FileDescriptor filedescriptor, ByteBuffer bytebuffer)
    throws IOException
{
    int k;
    int l;
    ByteBuffer bytebuffer1;
    //获取buffer当前可用空间remaining
    int i = bytebuffer.position();
    int j = bytebuffer.limit();
    if(!$assertionsDisabled && i > j)
        throw new AssertionError();
    k = i > j ? 0 : j - i;//remaining
    //如果buffer为direct类型,则直接接收报文
    if((bytebuffer instanceof DirectBuffer) && k > 0)
        return receiveIntoNativeBuffer(filedescriptor, bytebuffer, k, i);
    l = Math.max(k, 1);
    //从当前线程缓冲区获取临时DirectByteBuffer
    bytebuffer1 = Util.getTemporaryDirectBuffer(l);
    int j1;
    //接收报文
    int i1 = receiveIntoNativeBuffer(filedescriptor, bytebuffer1, l, 0);
    //切换读写模式
    bytebuffer1.flip();
    if(i1 > 0 && k > 0)
        //读取临时DirectByteBuffer,写到buffer中
        bytebuffer.put(bytebuffer1);
    j1 = i1;
    //释放临时DirectByteBuffer,即,添加DirectByteBuffer到当前线程缓存区,以便重用
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    return j1;
    Exception exception;
    exception;
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    throw exception;
}

private int receiveIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, int i, int j)
    throws IOException
{
    //读取报文,写到buffer中
    int k = receive0(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)j, i, isConnected());
    if(k > 0)
        //buffer的position向前移动k个位置
        bytebuffer.position(j + k);
    return k;
}

private native int receive0(FileDescriptor filedescriptor, long l, int i, boolean flag)
        throws IOException;

receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
发送报文和接受报文方法看完,我们来看一下需要通道建立连接,才能进行使用的读写操作方法:先来看写操作,读取buffer,写到输出流
  public int write(ByteBuffer bytebuffer)
        throws IOException
    {
        if(bytebuffer == null)
            throw new NullPointerException();
        Object obj = writeLock;
        JVM INSTR monitorenter ;
        int i;
        synchronized(stateLock)
        {
            ensureOpen();
            if(!isConnected())
                throw new NotYetConnectedException();
        }
        i = 0;
        int j;
        begin();
        if(isOpen())
            break MISSING_BLOCK_LABEL_123;
        j = 0;
        writerThread = 0L;
        end(i > 0 || i == -2);
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        return j;
        writerThread = NativeThread.current();
        do
	    //关键在这,委托给IOUtil
            i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock);
        while(i == -3 && isOpen());
       ...
    }

再来看读写buffer组,写到输出流
public long write(ByteBuffer abytebuffer[], int i, int j)
        throws IOException
    {
        ...
        writerThread = NativeThread.current();
        do
	    //关键在这,委托给IOUtil
            l = IOUtil.write(fd, abytebuffer, i, j, nd);
        while(l == -3L && isOpen());
	...
   }

再来读操作,从输入流读取报文,写到buffer中
 public int read(ByteBuffer bytebuffer)
        throws IOException
    {
        if(bytebuffer == null)
            throw new NullPointerException();
        Object obj = readLock;
        JVM INSTR monitorenter ;
        int i;
        synchronized(stateLock)
        {
            ensureOpen();
            if(!isConnected())
                throw new NotYetConnectedException();
        }
        i = 0;
        int j;
        begin();
        if(isOpen())
            break MISSING_BLOCK_LABEL_123;
        j = 0;
        readerThread = 0L;
        end(i > 0 || i == -2);
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        return j;
        readerThread = NativeThread.current();
        do
	    //关键点在这,委托给IOUtil
            i = IOUtil.read(fd, bytebuffer, -1L, nd, readLock);
        while(i == -3 && isOpen());
	...
    }

从输入流读取报文,写到buffer数组中,
 public long read(ByteBuffer abytebuffer[], int i, int j)
        throws IOException
    {
       ...
        readerThread = NativeThread.current();
        do
	    //关键点在这,委托给IOUtil
            l = IOUtil.read(fd, abytebuffer, i, j, nd);
        while(l == -3L && isOpen());
       ...
    }

read和write形式的方法关键的读写操作都是委托给IOUtil来完成,这个与SocketChannelImpl中读写操作基本思路相同,这里就不再介绍。
总结:
      send(发送报文)方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
      receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
     send(发送报文)和receive(接收报文)方法不需要通道已经处于连接状态,而read和write需要通道建立连接状态,这种方式与SocketChannel的读写操作相同,这样与SocketChannel无异,如果需要不如使用SocketChannel。如果使用DatagramChannel,建议使用send和recieve方法进行报文的发送和接收。

DatagramChannelImpl 解析三(多播):http://donald-draper.iteye.com/blog/2373507
0
1
分享到:
评论

相关推荐

    TCP报文发送接收测试工具

    TCP报文发送接收测试工具,正如其标题所言,是用于检验TCP通信的有效性,帮助开发者调试TCP报文传输过程中的问题。这款工具可以模拟客户端与服务器端的角色,实现双向通信,从而对TCP协议的各个方面进行深度测试。 ...

    linux下使用RAW SOCKET接收LLDP报文并解析LLDP报文

    本话题主要关注的是使用RAW SOCKET来接收和解析LLDP(Link Layer Discovery Protocol)报文。LLDP是一种网络邻接发现协议,用于交换设备间关于自身身份和能力的信息。RAW SOCKET允许程序员直接与网络层交互,获取...

    北斗短报文发送与接收,Android串口编程

    北斗短报文发送与接收是北斗卫星导航系统中的一项特色服务,它允许用户在没有移动通信网络覆盖的区域,如海洋、山区、荒漠等地,通过卫星进行信息传递。这项技术在应急通信、海洋渔业、野外探险等领域具有广泛应用。...

    java UDP报文的发送与接收

    接下来,我们将详细讨论如何在Java中发送和接收UDP报文。 一、发送UDP报文 发送UDP报文主要涉及以下几个步骤: 1. **创建DatagramSocket**:首先,我们需要创建一个`DatagramSocket`实例,这将代表我们的UDP...

    解析swift报文所需jar包

    java解析swift报文所需jar,可以用来解析Swift报文等客户对账单信息 java解析swift报文所需jar,可以用来解析Swift报文等客户对账单信息 java解析swift报文所需jar,可以用来解析Swift报文等客户对账单信息 java解析...

    vs2010 mfc ARP报文发送

    在本文中,我们将深入探讨如何使用Visual Studio 2010和MFC(Microsoft Foundation Classes)库来构建一个ARP(地址解析协议)报文发送器。ARP是TCP/IP协议栈中的一个重要组成部分,它用于将IP地址映射到物理(MAC)...

    国南网报文解析V9.9.9.exe(698.45报文解析,南网规约报文解析,376报文解析,101/104报文解析)

    本人写的电力行业报文解析工具,单文件免安装,随存随用,详细解析到每个字节。支持如下规约: 1.698.45报文解析;2.南网规约报文解析;3.376.2报文解析(茜茜写的);4.376.1规约帧结构解析;5.645.07表规约帧结构解析...

    串口调试工具 可以发送和接收 报文数据

    串口调试工具是一种用于测试和调试串行通信接口(如RS-232、RS-485等)的应用程序,它允许用户发送和接收报文数据,以便于验证通信协议、设备功能或者排查硬件或软件问题。在IT行业中,这类工具是开发者、工程师和...

    TCP报文段发送接收模拟

    下面将详细讨论如何使用Java来模拟TCP报文段的发送和接收过程。 首先,模拟TCP报文段的发送和接收,我们需要理解TCP的工作原理。TCP通过三次握手建立连接,然后进行数据传输,最后通过四次挥手释放连接。在Java中,...

    Goose报文发送模拟工具

    4. **验证与调试**:通过发送和接收GOOSE报文,工具可以帮助验证变电站设备对GOOSE报文的处理是否符合IEC 61850标准,从而确保系统的正确性和可靠性。 5. **兼容性测试**:工具可能还支持多种不同厂商的设备,帮助...

    iso8583实例解析银联报文(java)

    二、Java解析ISO8583报文 在Java中,解析ISO8583报文通常需要自定义解析器或者使用已有的开源库。报文通常由固定长度和可变长度的字段组成,解析过程包括以下几个步骤: 1. **位图解析**:位图(Bitmap)是ISO8583...

    解析8583报文工具源码

    2. **报文构建与解构**:源码中应包含功能,允许用户通过GUI界面输入字段值,然后构造一个完整的8583报文,或者从接收到的报文数据中提取各个字段。 3. **错误处理**:由于8583报文的规范性要求较高,源码应该包含...

    用java实现ARP报文的发送

    在 ARP 报文的发送和接收中,我们使用 jpcap 库来调用 winpcap,以实现 ARP 报文的发送和接收。jpcap 库提供了一个接口来调用 winpcap,从而实现了 ARP 报文的发送和接收。 四、所使用到的类 在本程序中,我们使用...

    tcp报文发送工具SocketTool.rar

    软件介绍: 一款小巧但功能强大的tcp报文发送工具,可创建TCP服务端和客户端,UDP服务端和客户端,UDP工作组,发送指定报文数据,并显示数据接收及提示窗口。

    CAN报文的解析与应用

    CAN报文的解析与应用是汽车电子领域中非常关键的技术之一,尤其在电动汽车的电子控制单元(ECU)通信中扮演着重要的角色。本文将详细探讨CAN报文的基本概念、接收方法、结构组成以及实际应用,特别是在电动汽车充电...

    TCP socket 请求报文发送工具

    在这个场景中,"TCP socket请求报文发送工具"是一种实用的软件工具,它能够帮助用户模拟TCP请求或者执行简单的socket请求,无需安装,属于绿色便携版。 首先,TCP请求报文的结构包括以下几个主要部分: 1. **源...

    快充报文解析.rar_BMS通讯协议_快充_报文解析_电池管理_电池管理系统

    充电设备接收到后,根据报文内容决定充电策略,并返回确认报文。 2. **状态信息交换**:在充电过程中,BMS不断监测电池状态,如单体电池电压、总电压、总电流、温度等,并将这些数据编码为报文发送给充电设备。充电...

    客户端报文发送模拟器,服务端响应模拟器

    fileName:发送的报文文件名称,把需要发送的报文存入文件中,fileName就是这个文件的名称,需要指定路径; 服务端模拟:在软件所在目录打开命令窗口,执行:nc -l -p port -o request 表示监听本地端口,模拟...

    RIP报文的发送和接收.doc

    1、是否接收对端rip协议更新报文的判断标准为:能否学习到对端发送的更新报文中携带的路由信息,即RIP路由表中有路由信息 2、关于RIP的版本,端口视图配置的RIP 版本优先级要高于RIP进程视图下配置的RIP版本优先级 ...

Global site tag (gtag.js) - Google Analytics