`
Donald_Draper
  • 浏览: 987823 次
社区版块
存档分类
最新评论

DatagramChannelImpl 解析二(报文发送与接收)

    博客分类:
  • NIO
nio 
阅读更多
DatagramChannelImpl 解析一(初始化):http://donald-draper.iteye.com/blog/2373245
引言:
     DatagramChannelImpl主要成员有报文socket分发器,这个与SocketChannleImpl中的socket分发器原理基本相同,报文socket分发器可以理解为报文通道的静态代理;网络协议family表示当前报文通道的网络协议family;多播关系注册器MembershipRegistry,主要是通过一个Map-HashMap<InetAddress,LinkedList<MembershipKeyImpl>>来管理多播组和多播组成员关系key的映射(关系);通道本地读写线程记录器,及读写锁控制通道读写,一个状态锁,当通道状态改变时,需要获取状态锁。DatagramChannelImpl构造方法,主要是初始化读写线程,及读写锁和状态锁,初始化网络协议family,及报文通道描述符和文件描述id。DatagramChannelImpl(SelectorProvider selectorprovider)与其他两个不同的是构造时更新当前报文socket的数量。
    今天这篇我们来看报文通道的具体实现我们需要关注的方法drop,block,unblock,join,
send,receive,read和write。
我们先来看一下send方法
//发送报文到指定的socketaddress
 public int send(ByteBuffer bytebuffer, SocketAddress socketaddress)
   throws IOException
   //buffer为null,则抛出空指针异常
   if(bytebuffer == null)
       throw new NullPointerException();
   Object obj = writeLock;//同步写锁
   JVM INSTR monitorenter ;//进入同步,try
   InetSocketAddress inetsocketaddress;
   InetAddress inetaddress;
   ensureOpen();//确保通道打开
   //检查socketaddress
   inetsocketaddress = Net.checkAddress(socketaddress);
   inetaddress = inetsocketaddress.getAddress();
   if(inetaddress == null)
       throw new IOException("Target address not resolved");
   Object obj1 = stateLock;//同步状态锁
   JVM INSTR monitorenter ;//try
   if(!isConnected())
   {
       if(socketaddress == null)
           throw new NullPointerException();
       SecurityManager securitymanager = System.getSecurityManager();
       if(securitymanager != null)
           //如果地址为多播地址,则检查是否具有多播权限
           if(inetaddress.isMulticastAddress())
               securitymanager.checkMulticast(inetaddress);
           else
	   //否则检查是否具有连接inetaddress.getHostAddress()和相应端口的权限
               securitymanager.checkConnect(inetaddress.getHostAddress(), inetsocketaddress.getPort());
       break MISSING_BLOCK_LABEL_156;
   }
   //socketaddress不为报文socket的远端地址,则抛出IllegalArgumentException
   if(!socketaddress.equals(remoteAddress))
       throw new IllegalArgumentException("Connected address not equal to target address");
   ...
    int i = 0;
    int j;
    begin();//与end方法协同,记录中断器,控制中断。
    ...
    //获取本地写线程
    writerThread = NativeThread.current();
    do
        //委托给send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
        i = send(fd, bytebuffer, inetsocketaddress);
    while(i == -3 && isOpen());
    ...
}

send(ByteBuffer bytebuffer, SocketAddress socketaddress)方法,我们
需要关注的是一下几点:
1.
 
 ensureOpen();//确保通道打开

 private void ensureOpen()
     throws ClosedChannelException
 {
     if(!isOpen())
         throw new ClosedChannelException();
     else
         return;
 }

2.
 //如果地址为多播地址,则检查是否具有多播权限
if(inetaddress.isMulticastAddress())

    securitymanager.checkMulticast(inetaddress);
//SecurityManager,检查多播权限
  public void checkMulticast(InetAddress maddr) {
        String host = maddr.getHostAddress();
        if (!host.startsWith("[") && host.indexOf(':') != -1) {
            host = "[" + host + "]";
        }
        checkPermission(new SocketPermission(host,
            SecurityConstants.SOCKET_CONNECT_ACCEPT_ACTION));
    }

//SecurityConstants
public static final String SOCKET_CONNECT_ACCEPT_ACTION = "connect,accept";

3.
 do
     //委托给send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
     i = send(fd, bytebuffer, inetsocketaddress);
 while(i == -3 && isOpen());

这个之所以是循环为了,当发送报文操作因某种原因中断,但报文没有发送完,当中断位消除时,
继续发送报文。
我们来看send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
private int send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
    throws IOException
{
    int i;
    ByteBuffer bytebuffer1;
    //如果buffer为direct类型,则直接调用sendFromNativeBuffer
    if(bytebuffer instanceof DirectBuffer)
        return sendFromNativeBuffer(filedescriptor, bytebuffer, inetsocketaddress);
    //获取当前buffer的实际容量,即remaining
    i = bytebuffer.position();
    int j = bytebuffer.limit();
    if(!$assertionsDisabled && i > j)
        throw new AssertionError();
    int k = i > j ? 0 : j - i;//remaining
    //从当前线程缓存区获取临时DirectByteBuffer
    bytebuffer1 = Util.getTemporaryDirectBuffer(k);
    int i1;
    //读取buffer字节序列,写到临时DirectByteBuffer中
    bytebuffer1.put(bytebuffer);
    //读写模式转换,以便发送报文
    bytebuffer1.flip();
    bytebuffer.position(i);//重新定位buffer的position为原始位置,以便一次发送不完,再次发送
    //委托个sendFromNativeBuffer,发送已写的字节数
    int l = sendFromNativeBuffer(filedescriptor, bytebuffer1, inetsocketaddress);
    if(l > 0)
        //buffer的position向前移动i个位置
        bytebuffer.position(i + l);
    i1 = l;
    //添加DirectByteBuffer到当前线程缓存区,以便重用,因为DirectByteBuffer是直接操作系统内存
    //频繁的分配内存,将消耗过多的系统资源。
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    return i1;
    Exception exception;
    exception;
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    throw exception;
}

再来看sendFromNativeBuffer方法
private int sendFromNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
    throws IOException
{
    //获取当前buffer的实际容量,即remaining
    int i = bytebuffer.position();
    int j = bytebuffer.limit();
    if(!$assertionsDisabled && i > j)
        throw new AssertionError();
    int k = i > j ? 0 : j - i;//remaining
    boolean flag = family != StandardProtocolFamily.INET;
    int l;
    try
    {
        //委托个send0,返回已发送的字节数
        l = send0(flag, filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
    }
    catch(PortUnreachableException portunreachableexception)
    {
        if(isConnected())
            throw portunreachableexception;
        l = k;
    }
    if(l > 0)
         //buffer的position向前移动i个位置
        bytebuffer.position(i + l);
    return l;
}

private native int send0(boolean flag, FileDescriptor filedescriptor, long l, int i, InetAddress inetaddress, int j)
        throws IOException;

从上面来看send方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
再来看receive方法:
 public SocketAddress receive(ByteBuffer bytebuffer)
        throws IOException
    {
        //如果buffer为只读,则抛出IllegalArgumentException
        if(bytebuffer.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");
        if(bytebuffer == null)
            throw new NullPointerException();
        Object obj = readLock;//同步读锁
        JVM INSTR monitorenter ;
        int i;
        ByteBuffer bytebuffer1;
        ensureOpen();//确保通道打开
	//如果本地地址为null,则绑定空地址
        if(localAddress() == null)
            bind(null);
        i = 0;
        bytebuffer1 = null;
        Object obj1;
        begin();//与end方法协同,记录中断器,控制中断。
        ...
        readerThread = NativeThread.current();
        if(!isConnected() && obj1 != null)
            break MISSING_BLOCK_LABEL_248;
        do
	    //读取报文,写到buffer
            i = receive(fd, bytebuffer);
        while(i == -3 && isOpen());
        ...
    }

接收报文方法,有以下几点要关注
1.
//如果本地地址为null,则绑定空地址
if(localAddress() == null)
    bind(null);

//获取通道本地socket地址
 public SocketAddress localAddress()
    {
        Object obj = stateLock;
        JVM INSTR monitorenter ;
        return localAddress;
        Exception exception;
        exception;
        throw exception;
    }

//通道绑定socket地址
  public volatile NetworkChannel bind(SocketAddress socketaddress)
        throws IOException
    {
        return bind(socketaddress);
    }

 public DatagramChannel bind(SocketAddress socketaddress)
        throws IOException
    {
        synchronized(readLock)
        {
            synchronized(writeLock)
            {
                synchronized(stateLock)
                {
                   //同步读写锁,及状态锁
		    ensureOpen();
                    if(localAddress != null)
                        throw new AlreadyBoundException();
                    InetSocketAddress inetsocketaddress;
                    if(socketaddress == null)
                    {
		        //地址为空,则获取本地地址
                        if(family == StandardProtocolFamily.INET)
                            inetsocketaddress = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
                        else
                            inetsocketaddress = new InetSocketAddress(0);
                    } else
                    {
                        inetsocketaddress = Net.checkAddress(socketaddress);
                        if(family == StandardProtocolFamily.INET)
                        {
                            InetAddress inetaddress = inetsocketaddress.getAddress();
                            if(!(inetaddress instanceof Inet4Address))
                                throw new UnsupportedAddressTypeException();
                        }
                    }
                    SecurityManager securitymanager = System.getSecurityManager();
                    if(securitymanager != null)
		        //检查地址端口监听权限
                        securitymanager.checkListen(inetsocketaddress.getPort());
		    //委托net完成实际的绑定工作
                    Net.bind(family, fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
		    //初始化本地地址
                    localAddress = Net.localAddress(fd);
                }
            }
        }
        return this;
    }

2.
do                              
    //读取报文,写到buffer      
    i = receive(fd, bytebuffer);
while(i == -3 && isOpen()); 
 
这个之所以是循环为了,当接收报文操作因某种原因中断,但报文没有读取完,当中断位消除时,继续读取报文。
private int receive(FileDescriptor filedescriptor, ByteBuffer bytebuffer)
    throws IOException
{
    int k;
    int l;
    ByteBuffer bytebuffer1;
    //获取buffer当前可用空间remaining
    int i = bytebuffer.position();
    int j = bytebuffer.limit();
    if(!$assertionsDisabled && i > j)
        throw new AssertionError();
    k = i > j ? 0 : j - i;//remaining
    //如果buffer为direct类型,则直接接收报文
    if((bytebuffer instanceof DirectBuffer) && k > 0)
        return receiveIntoNativeBuffer(filedescriptor, bytebuffer, k, i);
    l = Math.max(k, 1);
    //从当前线程缓冲区获取临时DirectByteBuffer
    bytebuffer1 = Util.getTemporaryDirectBuffer(l);
    int j1;
    //接收报文
    int i1 = receiveIntoNativeBuffer(filedescriptor, bytebuffer1, l, 0);
    //切换读写模式
    bytebuffer1.flip();
    if(i1 > 0 && k > 0)
        //读取临时DirectByteBuffer,写到buffer中
        bytebuffer.put(bytebuffer1);
    j1 = i1;
    //释放临时DirectByteBuffer,即,添加DirectByteBuffer到当前线程缓存区,以便重用
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    return j1;
    Exception exception;
    exception;
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    throw exception;
}

private int receiveIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, int i, int j)
    throws IOException
{
    //读取报文,写到buffer中
    int k = receive0(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)j, i, isConnected());
    if(k > 0)
        //buffer的position向前移动k个位置
        bytebuffer.position(j + k);
    return k;
}

private native int receive0(FileDescriptor filedescriptor, long l, int i, boolean flag)
        throws IOException;

receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
发送报文和接受报文方法看完,我们来看一下需要通道建立连接,才能进行使用的读写操作方法:先来看写操作,读取buffer,写到输出流
  public int write(ByteBuffer bytebuffer)
        throws IOException
    {
        if(bytebuffer == null)
            throw new NullPointerException();
        Object obj = writeLock;
        JVM INSTR monitorenter ;
        int i;
        synchronized(stateLock)
        {
            ensureOpen();
            if(!isConnected())
                throw new NotYetConnectedException();
        }
        i = 0;
        int j;
        begin();
        if(isOpen())
            break MISSING_BLOCK_LABEL_123;
        j = 0;
        writerThread = 0L;
        end(i > 0 || i == -2);
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        return j;
        writerThread = NativeThread.current();
        do
	    //关键在这,委托给IOUtil
            i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock);
        while(i == -3 && isOpen());
       ...
    }

再来看读写buffer组,写到输出流
public long write(ByteBuffer abytebuffer[], int i, int j)
        throws IOException
    {
        ...
        writerThread = NativeThread.current();
        do
	    //关键在这,委托给IOUtil
            l = IOUtil.write(fd, abytebuffer, i, j, nd);
        while(l == -3L && isOpen());
	...
   }

再来读操作,从输入流读取报文,写到buffer中
 public int read(ByteBuffer bytebuffer)
        throws IOException
    {
        if(bytebuffer == null)
            throw new NullPointerException();
        Object obj = readLock;
        JVM INSTR monitorenter ;
        int i;
        synchronized(stateLock)
        {
            ensureOpen();
            if(!isConnected())
                throw new NotYetConnectedException();
        }
        i = 0;
        int j;
        begin();
        if(isOpen())
            break MISSING_BLOCK_LABEL_123;
        j = 0;
        readerThread = 0L;
        end(i > 0 || i == -2);
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        return j;
        readerThread = NativeThread.current();
        do
	    //关键点在这,委托给IOUtil
            i = IOUtil.read(fd, bytebuffer, -1L, nd, readLock);
        while(i == -3 && isOpen());
	...
    }

从输入流读取报文,写到buffer数组中,
 public long read(ByteBuffer abytebuffer[], int i, int j)
        throws IOException
    {
       ...
        readerThread = NativeThread.current();
        do
	    //关键点在这,委托给IOUtil
            l = IOUtil.read(fd, abytebuffer, i, j, nd);
        while(l == -3L && isOpen());
       ...
    }

read和write形式的方法关键的读写操作都是委托给IOUtil来完成,这个与SocketChannelImpl中读写操作基本思路相同,这里就不再介绍。
总结:
      send(发送报文)方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
      receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
     send(发送报文)和receive(接收报文)方法不需要通道已经处于连接状态,而read和write需要通道建立连接状态,这种方式与SocketChannel的读写操作相同,这样与SocketChannel无异,如果需要不如使用SocketChannel。如果使用DatagramChannel,建议使用send和recieve方法进行报文的发送和接收。

DatagramChannelImpl 解析三(多播):http://donald-draper.iteye.com/blog/2373507
0
1
分享到:
评论

相关推荐

    TCP报文发送接收测试工具

    TCP报文发送接收测试工具,正如其标题所言,是用于检验TCP通信的有效性,帮助开发者调试TCP报文传输过程中的问题。这款工具可以模拟客户端与服务器端的角色,实现双向通信,从而对TCP协议的各个方面进行深度测试。 ...

    linux下使用RAW SOCKET接收LLDP报文并解析LLDP报文

    本话题主要关注的是使用RAW SOCKET来接收和解析LLDP(Link Layer Discovery Protocol)报文。LLDP是一种网络邻接发现协议,用于交换设备间关于自身身份和能力的信息。RAW SOCKET允许程序员直接与网络层交互,获取...

    北斗短报文发送与接收,Android串口编程

    北斗短报文发送与接收是北斗卫星导航系统中的一项特色服务,它允许用户在没有移动通信网络覆盖的区域,如海洋、山区、荒漠等地,通过卫星进行信息传递。这项技术在应急通信、海洋渔业、野外探险等领域具有广泛应用。...

    java UDP报文的发送与接收

    接下来,我们将详细讨论如何在Java中发送和接收UDP报文。 一、发送UDP报文 发送UDP报文主要涉及以下几个步骤: 1. **创建DatagramSocket**:首先,我们需要创建一个`DatagramSocket`实例,这将代表我们的UDP...

    解析swift报文所需jar包

    java解析swift报文所需jar,可以用来解析Swift报文等客户对账单信息 java解析swift报文所需jar,可以用来解析Swift报文等客户对账单信息 java解析swift报文所需jar,可以用来解析Swift报文等客户对账单信息 java解析...

    linux网络报文接收发送过程分析

    Linux 网络报文接收发送过程分析 Linux 网络报文接收发送过程分析是 Linux 操作系统中网络通信的核心部分。网络报文的接收和发送是通过网络设备和网络协议栈之间的交互来实现的。在本文中,我们将详细介绍 Linux ...

    Labview解析CAN报文与发送DBC格式数据的解决方案(包含调用dll的使用指南和跨版本兼容修改方案),Labview 用DBC文件解析CAN报文以及DBC格式发送CAN,调用的dll有说明文档

    Labview解析CAN报文与发送DBC格式数据的解决方案(包含调用dll的使用指南和跨版本兼容修改方案),Labview 用DBC文件解析CAN报文以及DBC格式发送CAN,调用的dll有说明文档。 2013,2016,2019版本。 参考程序后续可以...

    vs2010 mfc ARP报文发送

    在本文中,我们将深入探讨如何使用Visual Studio 2010和MFC(Microsoft Foundation Classes)库来构建一个ARP(地址解析协议)报文发送器。ARP是TCP/IP协议栈中的一个重要组成部分,它用于将IP地址映射到物理(MAC)...

    国南网报文解析V9.9.9.exe(698.45报文解析,南网规约报文解析,376报文解析,101/104报文解析)

    本人写的电力行业报文解析工具,单文件免安装,随存随用,详细解析到每个字节。支持如下规约: 1.698.45报文解析;2.南网规约报文解析;3.376.2报文解析(茜茜写的);4.376.1规约帧结构解析;5.645.07表规约帧结构解析...

    串口调试工具 可以发送和接收 报文数据

    串口调试工具是一种用于测试和调试串行通信接口(如RS-232、RS-485等)的应用程序,它允许用户发送和接收报文数据,以便于验证通信协议、设备功能或者排查硬件或软件问题。在IT行业中,这类工具是开发者、工程师和...

    TCP报文段发送接收模拟

    下面将详细讨论如何使用Java来模拟TCP报文段的发送和接收过程。 首先,模拟TCP报文段的发送和接收,我们需要理解TCP的工作原理。TCP通过三次握手建立连接,然后进行数据传输,最后通过四次挥手释放连接。在Java中,...

    iso8583实例解析银联报文(java)

    二、Java解析ISO8583报文 在Java中,解析ISO8583报文通常需要自定义解析器或者使用已有的开源库。报文通常由固定长度和可变长度的字段组成,解析过程包括以下几个步骤: 1. **位图解析**:位图(Bitmap)是ISO8583...

    Goose报文发送模拟工具

    4. **验证与调试**:通过发送和接收GOOSE报文,工具可以帮助验证变电站设备对GOOSE报文的处理是否符合IEC 61850标准,从而确保系统的正确性和可靠性。 5. **兼容性测试**:工具可能还支持多种不同厂商的设备,帮助...

    解析8583报文工具源码

    2. **报文构建与解构**:源码中应包含功能,允许用户通过GUI界面输入字段值,然后构造一个完整的8583报文,或者从接收到的报文数据中提取各个字段。 3. **错误处理**:由于8583报文的规范性要求较高,源码应该包含...

    用java实现ARP报文的发送

    在 ARP 报文的发送和接收中,我们使用 jpcap 库来调用 winpcap,以实现 ARP 报文的发送和接收。jpcap 库提供了一个接口来调用 winpcap,从而实现了 ARP 报文的发送和接收。 四、所使用到的类 在本程序中,我们使用...

    CAN报文的解析与应用

    CAN报文的解析与应用是汽车电子领域中非常关键的技术之一,尤其在电动汽车的电子控制单元(ECU)通信中扮演着重要的角色。本文将详细探讨CAN报文的基本概念、接收方法、结构组成以及实际应用,特别是在电动汽车充电...

    tcp报文发送工具SocketTool.rar

    软件介绍: 一款小巧但功能强大的tcp报文发送工具,可创建TCP服务端和客户端,UDP服务端和客户端,UDP工作组,发送指定报文数据,并显示数据接收及提示窗口。

    快充报文解析.rar_BMS通讯协议_快充_报文解析_电池管理_电池管理系统

    充电设备接收到后,根据报文内容决定充电策略,并返回确认报文。 2. **状态信息交换**:在充电过程中,BMS不断监测电池状态,如单体电池电压、总电压、总电流、温度等,并将这些数据编码为报文发送给充电设备。充电...

    cantest软件,报文发送,接收

    二、普通发送模式 高级操作可直接在发送列表设置数据帧,发送列表最多可设置100条数据,勾选每条数据左边勾可以将这条数据添加到发送列表;右边可以设置发送方式,包括:正常发送、单次发送、自发自收、单次自发自...

Global site tag (gtag.js) - Google Analytics