`

DatagramChannelImpl 解析三(多播)

    博客分类:
  • NIO
nio 
阅读更多
DatagramChannelImpl 解析一(初始化):http://donald-draper.iteye.com/blog/2373245
DatagramChannelImpl 解析二(报文发送与接收):http://donald-draper.iteye.com/blog/2373281
引言:
上一篇看了报文的发送和接收,先来回顾一下,
   send(发送报文)方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
      receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
     send(发送报文)和receive(接收报文)方法不需要通道已经处于连接状态,而read和write需要通道建立连接状态,这种方式与SocketChannel的读写操作相同,这样与SocketChannel无异,如果需要不如使用SocketChannel。如果使用DatagramChannel,建议使用send和recieve方法进行报文的发送和接收。
今天我们来看一下多播相关的方法为drop,block,unblock,join。
先看join方法
//添加到多播组inetaddress
 public MembershipKey join(InetAddress inetaddress, NetworkInterface networkinterface)
        throws IOException
    {
        return innerJoin(inetaddress, networkinterface, null);
    }
//添加到多播组,只接受源地址为inetaddress1的报文
    public MembershipKey join(InetAddress inetaddress, NetworkInterface networkinterface, InetAddress inetaddress1)
        throws IOException
    {
        if(inetaddress1 == null)
            throw new NullPointerException("source address is null");
        else
            return innerJoin(inetaddress, networkinterface, inetaddress1);
    }

从上面可以看出加入多播组实际上的操作是由innerJoin来完成
 private MembershipKey innerJoin(InetAddress inetaddress, NetworkInterface networkinterface, InetAddress inetaddress1)
     throws IOException
 {
     //非多播地址抛出异常
     if(!inetaddress.isMulticastAddress())
         throw new IllegalArgumentException("Group not a multicast address");
     //如果地址为ip6,但加入的多播组地址为ip4,则抛出参数异常
     if(inetaddress instanceof Inet4Address)
     {
         if(family == StandardProtocolFamily.INET6 && !Net.canIPv6SocketJoinIPv4Group())
             throw new IllegalArgumentException("IPv6 socket cannot join IPv4 multicast group");
     } else
     if(inetaddress instanceof Inet6Address)
     {
         //如果多播地址为ip6,协议非INET6,抛出异常
         if(family != StandardProtocolFamily.INET6)
             throw new IllegalArgumentException("Only IPv6 sockets can join IPv6 multicast group");
     } else
     {
         throw new IllegalArgumentException("Address type not supported");
     }
     //如果多播组源地址不为空,则校验源地址
     if(inetaddress1 != null)
     {
         if(inetaddress1.isAnyLocalAddress())//源地址含通配符,address == 0;
             throw new IllegalArgumentException("Source address is a wildcard address");
         if(inetaddress1.isMulticastAddress())//源地址为多播地址
             throw new IllegalArgumentException("Source address is multicast address");
         if(inetaddress1.getClass() != inetaddress.getClass())//源地址与多播地址类型不同
             throw new IllegalArgumentException("Source address is different type to group");
     }
     SecurityManager securitymanager = System.getSecurityManager();
     if(securitymanager != null)
         //检查多播地址权限,接受和连接权限
         securitymanager.checkMulticast(inetaddress);
     Object obj = stateLock;
     JVM INSTR monitorenter ;
     Object obj1;
     if(!isOpen())//确保通道打开
         throw new ClosedChannelException();
     if(registry == null)
     {
         //多播关系注册器为null,则创建
         registry = new MembershipRegistry();
         break MISSING_BLOCK_LABEL_229;
     }
     //检查多播成员关系注册器中是否存在多播地址为inetaddress,网络接口为networkinterface,
     //源地址为inetaddress1,多播成员关系key
     obj1 = registry.checkMembership(inetaddress, networkinterface, inetaddress1);
     if(obj1 != null)
         //有则直接返回
         return ((MembershipKey) (obj1));
     //否则根据网络协议族family,网络接口,源地址构造MembershipKeyImpl
     if(family == StandardProtocolFamily.INET6 && ((inetaddress instanceof Inet6Address) || Net.canJoin6WithIPv4Group()))
     {//Ip6
         int i = networkinterface.getIndex();
         if(i == -1)
             throw new IOException("Network interface cannot be identified");
         byte abyte0[] = Net.inet6AsByteArray(inetaddress);
         byte abyte1[] = inetaddress1 != null ? Net.inet6AsByteArray(inetaddress1) : null;
	 //加入多播组
         int l = Net.join6(fd, abyte0, i, abyte1);
         if(l == -2)
             throw new UnsupportedOperationException();
         obj1 = new MembershipKeyImpl.Type6(this, inetaddress, networkinterface, inetaddress1, abyte0, i, abyte1);
     } else
     {//Ip4
         Inet4Address inet4address = Net.anyInet4Address(networkinterface);
         if(inet4address == null)
             throw new IOException("Network interface not configured for IPv4");
         int j = Net.inet4AsInt(inetaddress);
         int k = Net.inet4AsInt(inet4address);
         int i1 = inetaddress1 != null ? Net.inet4AsInt(inetaddress1) : 0;
	 //加入多播组
         int j1 = Net.join4(fd, j, k, i1);
         if(j1 == -2)
             throw new UnsupportedOperationException();
         obj1 = new MembershipKeyImpl.Type4(this, inetaddress, networkinterface, inetaddress1, j, k, i1);
     }
     //添加多播成员关系key到注册器
     registry.add(((MembershipKeyImpl) (obj1)));
     obj1;
     obj;
     JVM INSTR monitorexit ;
     return;
     Exception exception;
     exception;
     throw exception;
 }

以上方法有两点要关注
1.
if(inetaddress1.isAnyLocalAddress())//源地址含通配符,address == 0;
     throw new IllegalArgumentException("Source address is a wildcard address");
 if(inetaddress1.isMulticastAddress())//源地址为多播地址
     throw new IllegalArgumentException("Source address is multicast address");

//Inet4Address
public final class Inet4Address extends InetAddress {
    final static int INADDRSZ = 4;
     /**是否为多播地址
     * Utility routine to check if the InetAddress is an
     * IP multicast address. IP multicast address is a Class D
     * address i.e first four bits of the address are 1110.
     * @return a <code>boolean</code> indicating if the InetAddress is
     * an IP multicast address
     * @since   JDK1.1
     */
    public boolean isMulticastAddress() {
        return ((address & 0xf0000000) == 0xe0000000);
    }

    /**
    是否为统配符地址
     * Utility routine to check if the InetAddress in a wildcard address.
     * @return a <code>boolean</code> indicating if the Inetaddress is
     *         a wildcard address.
     * @since 1.4
     */
    public boolean isAnyLocalAddress() {
        return address == 0;
    }

    /**
     * Utility routine to check if the InetAddress is a loopback address.
     *是否为环路地址
     * @return a <code>boolean</code> indicating if the InetAddress is
     * a loopback address; or false otherwise.
     * @since 1.4
     */
    private static final int loopback = 2130706433; /* 127.0.0.1 */
    public boolean isLoopbackAddress() {
        /* 127.x.x.x */
        byte[] byteAddr = getAddress();
        return byteAddr[0] == 127;
    }
    ...
}

//InetAddress
public class InetAddress implements java.io.Serializable {
    /**
     * Specify the address family: Internet Protocol, Version 4
     * @since 1.4
     */
    static final int IPv4 = 1;

    /**
     * Specify the address family: Internet Protocol, Version 6
     * @since 1.4
     */
    static final int IPv6 = 2;

    /* Specify address family preference */
    static transient boolean preferIPv6Address = false;

    /**
     * @serial
     */
    String hostName;

    /**
     * Holds a 32-bit IPv4 address.
     *
     * @serial
     */
    int address;

    /**
     * Specifies the address family type, for instance, '1' for IPv4
     * addresses, and '2' for IPv6 addresses.
     *
     * @serial
     */
    int family;
}

2.
//加入多播组
int j1 = Net.join4(fd, j, k, i1);

//Net
static int join4(FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException
    {
        return joinOrDrop4(true, filedescriptor, i, j, k);
    }
 private static native int joinOrDrop4(boolean flag, FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException;


//加入多播组
int l = Net.join6(fd, abyte0, i, abyte1);

//Net
static int join6(FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException
    {
        return joinOrDrop6(true, filedescriptor, abyte0, i, abyte1);
    }
 private static native int joinOrDrop6(boolean flag, FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException;

从上面可以看出,报文通道加入多播组,首先检查加入的多播组地址是否正确,然后校验源地址,检查多播成员关系注册器中是否存在多播地址为inetaddress,网络接口为networkinterface,源地址为inetaddress1的多播成员关系key,有则直接返回,否则根据网络协议族family,网络接口,源地址构造多播成员关系MembershipKeyImpl,添加到注册器MembershipRegistry。
再来看block方法

void block(MembershipKeyImpl membershipkeyimpl, InetAddress inetaddress)
        throws IOException
    {
        //如果断言,开启,则判断多播关系key通道是否为本通道
        if(!$assertionsDisabled && membershipkeyimpl.channel() != this)
            throw new AssertionError();
	//断言源地址是否为null
        if(!$assertionsDisabled && membershipkeyimpl.sourceAddress() != null)
            throw new AssertionError();
        synchronized(stateLock)
        {
	    //如果多播成员关系无效
            if(!membershipkeyimpl.isValid())
                throw new IllegalStateException("key is no longer valid");
            if(inetaddress.isAnyLocalAddress())//如果源地址为统配地址
                throw new IllegalArgumentException("Source address is a wildcard address");
            if(inetaddress.isMulticastAddress())//如果源地址为多播地址
                throw new IllegalArgumentException("Source address is multicast address");
            if(inetaddress.getClass() != membershipkeyimpl.group().getClass())//如果多播地址与源地址类型不同
                throw new IllegalArgumentException("Source address is different type to group");
            int i;
	    //如果为多播组为IP6
            if(membershipkeyimpl instanceof MembershipKeyImpl.Type6)
            {
                MembershipKeyImpl.Type6 type6 = (MembershipKeyImpl.Type6)membershipkeyimpl;
		//委托给net
                i = Net.block6(fd, type6.groupAddress(), type6.index(), Net.inet6AsByteArray(inetaddress));
            } else
            {
	        //如果为多播组为IP4
                MembershipKeyImpl.Type4 type4 = (MembershipKeyImpl.Type4)membershipkeyimpl;
		//委托给net
                i = Net.block4(fd, type4.groupAddress(), type4.interfaceAddress(), Net.inet4AsInt(inetaddress));
            }
            if(i == -2)
                throw new UnsupportedOperationException();
        }
    }

上面一个方法需要关注的为
1.
i = Net.block4(fd, type4.groupAddress(), type4.interfaceAddress(), Net.inet4AsInt(inetaddress));

//Net
static int block4(FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException
    {
        return blockOrUnblock4(true, filedescriptor, i, j, k);
    }
 private static native int blockOrUnblock4(boolean flag, FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException;

2.
i = Net.block6(fd, type6.groupAddress(), type6.index(), Net.inet6AsByteArray(inetaddress));

//Net
static int block6(FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException
    {
        return blockOrUnblock6(true, filedescriptor, abyte0, i, abyte1);
    }
 static native int blockOrUnblock6(boolean flag, FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException;

再来看unblock方法
 void unblock(MembershipKeyImpl membershipkeyimpl, InetAddress inetaddress)
    {
        //如果断言,开启,则判断多播关系key通道是否为本通道
        if(!$assertionsDisabled && membershipkeyimpl.channel() != this)
            throw new AssertionError();
	//断言源地址是否为null
        if(!$assertionsDisabled && membershipkeyimpl.sourceAddress() != null)
            throw new AssertionError();
        synchronized(stateLock)
        {
            if(!membershipkeyimpl.isValid())//如果多播成员关系无效
                throw new IllegalStateException("key is no longer valid");
            try
            {
                if(membershipkeyimpl instanceof MembershipKeyImpl.Type6)
                {
		    //如果为多播组为IP6
                    MembershipKeyImpl.Type6 type6 = (MembershipKeyImpl.Type6)membershipkeyimpl;
                    Net.unblock6(fd, type6.groupAddress(), type6.index(), Net.inet6AsByteArray(inetaddress));
                } else
                {
		   //如果为多播组为IP4
                    MembershipKeyImpl.Type4 type4 = (MembershipKeyImpl.Type4)membershipkeyimpl;
                    Net.unblock4(fd, type4.groupAddress(), type4.interfaceAddress(), Net.inet4AsInt(inetaddress));
                }
            }
            catch(IOException ioexception)
            {
                throw new AssertionError(ioexception);
            }
        }
    }

上面方法我们需要关注的是
1.
Net.unblock4(fd, type4.groupAddress(), type4.interfaceAddress(), Net.inet4AsInt(inetaddress));

//Net
static void unblock4(FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException
    {
        blockOrUnblock4(false, filedescriptor, i, j, k);
    }

2.
Net.unblock6(fd, type6.groupAddress(), type6.index(), Net.inet6AsByteArray(inetaddress));

//Net
static void unblock6(FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException
    {
        blockOrUnblock6(false, filedescriptor, abyte0, i, abyte1);
    }

从上面可以看出阻塞源地址报文与解除源地址报文阻塞,首先检查源地址,再将实际的阻塞与解除阻塞工作委托给Net完成。
再来看drop方法
//drop报文通道多播成员关系key
void drop(MembershipKeyImpl membershipkeyimpl)
    {
label0:
        {
	    //如果断言,开启,则判断多播关系key通道是否为本通道
            if(!$assertionsDisabled && membershipkeyimpl.channel() != this)
                throw new AssertionError();
            synchronized(stateLock)
            {
	        //如果多播成员关系key无效,调到label0
                if(membershipkeyimpl.isValid())
                    break label0;
            }
            return;
        }
        try
        {
            if(membershipkeyimpl instanceof MembershipKeyImpl.Type6)
            {
	        //如果为多播组为IP6
                MembershipKeyImpl.Type6 type6 = (MembershipKeyImpl.Type6)membershipkeyimpl;
                Net.drop6(fd, type6.groupAddress(), type6.index(), type6.source());
            } else
            {
	        //如果为多播组为IP6
                MembershipKeyImpl.Type4 type4 = (MembershipKeyImpl.Type4)membershipkeyimpl;
                Net.drop4(fd, type4.groupAddress(), type4.interfaceAddress(), type4.source());
            }
        }
        catch(IOException ioexception)
        {
            throw new AssertionError(ioexception);
        }
	//使多播成员关系key无效
        membershipkeyimpl.invalidate();
	//从报文通道注册器移除多播成员关系key
        registry.remove(membershipkeyimpl);
        obj;
        JVM INSTR monitorexit ;
          goto _L1
        exception;
        throw exception;
_L1:
    }

drop方法需要关注的为:
1.
Net.drop4(fd, type4.groupAddress(), type4.interfaceAddress(), type4.source());

//Net
static void drop4(FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException
    {
        joinOrDrop4(false, filedescriptor, i, j, k);
    }

    private static native int joinOrDrop4(boolean flag, FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException;

2.
Net.drop6(fd, type6.groupAddress(), type6.index(), type6.source());

//Net
  
 static void drop6(FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException
    {
        joinOrDrop6(false, filedescriptor, abyte0, i, abyte1);
    }

    private static native int joinOrDrop6(boolean flag, FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException;

从上面可以看出drop方法,首先判断多播成员关系key是否有效,如果有效,判断多播组为ip4还是ip6,然后委托给Net完成实际的drop工作。

总结:

       join(报文通道加入多播组)方法,首先检查加入的多播组地址是否正确,然后校验源地址,检查多播成员关系注册器中是否存在多播地址为inetaddress,网络接口为networkinterface,源地址为inetaddress1的多播成员关系key,有则直接返回,否则根据网络协议族family,网络接口,源地址构造多播成员关系MembershipKeyImpl,添加到注册器MembershipRegistry。
    阻塞源地址报文与解除源地址报文阻塞,首先检查源地址,再将实际的阻塞与解除阻塞工作委托给Net完成。
    drop方法,首先判断多播成员关系key是否有效,如果有效,判断多播组为ip4还是ip6,然后委托给Net完成实际的drop工作。

DatagramChannelImpl 解析四(地址绑定,关闭通道等):http://donald-draper.iteye.com/blog/2373519
分享到:
评论
3 楼 Donald_Draper 2019-06-18  
Donald_Draper 写道
刘落落cici 写道
能给我发一份这个类的源码吗DatagramChannelImpl
我想看一下 这个recive函数返回的这个发送方address是怎么确定的



https://github.com/Donaldhan/netty



java nio 反编译即可
2 楼 Donald_Draper 2019-05-28  
刘落落cici 写道
能给我发一份这个类的源码吗DatagramChannelImpl
我想看一下 这个recive函数返回的这个发送方address是怎么确定的



https://github.com/Donaldhan/netty
1 楼 刘落落cici 2018-05-23  
能给我发一份这个类的源码吗DatagramChannelImpl
我想看一下 这个recive函数返回的这个发送方address是怎么确定的

相关推荐

Global site tag (gtag.js) - Google Analytics