论坛首页 Java企业应用论坛

关键应用服务的集群技术模拟

浏览 2269 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2010-01-08  
集群技术,也就是俗称的Cluster,是为了保证某种应用不间断的服务而采取的一种技术手段。

主旨

服务运行在A,B两台机器上,其中一台为主用机,一台为备用机。备用机不断检测主用机的心跳信息。当发现主用机宕机或不能提供服务的时候,会自动转变为主用机,继续提供服务。

实现细节包括主备用机之间的心跳协议,物理线缆连接方式,以及虚拟出来的服务接口。

其他都还好说,不过虚拟服务接口的确不是很容易实现的。本文对虚拟服务接口技术做了模拟。

虚拟接口,实际是指虚拟的IP地址以及在这个IP地址接口侦听的应用服务。

由于客户机并不会自动改变请求服务的IP地址和端口,因此集群必须提供一个唯一的对外服务接口,并实现主备之间的无缝转换。

方案

我认为实现虚拟接口有两种办法:
第一种,在计算机上虚拟一块网卡出来,然后把服务绑定到这块网卡上;

第二种,把指向虚拟IP的IP协议包以及ARP协议包转到主机实际存在的网卡上。

我这里是使用了JPCAP工具,实现了第二种方案。

具体要做两件事:
  • 实现ARP协议的ARP_REPLY;
  • 转发IP包。

实现的时候发现,如果使用JPCAP直接转发ICMP包的话会出现问题。

ECHO_REPLY不能被识别,造成Ping不通的现象。

之后研究了ICMP的头结构,发现JPCAP在转发ICMP包(包括ECHO_REPLY)时不会自动填充id和seq字段;改为从原始包头中取出这两个字段并手动填入之后,运转正常。

/**
 * Network Utilities
 */
package cn.sh.huang;

import jpcap.NetworkInterface;
import jpcap.NetworkInterfaceAddress;

/**
 * Static helpers for converting MAC addresses between their textual and
 * binary forms and for locating a JPCAP network interface by IP address.
 *
 * <p>The textual form used by both conversion methods is six two-digit
 * hexadecimal octets separated by {@code '-'}, e.g. {@code 00-1A-A0-FF-0B-7C},
 * so {@link #getMacBytes(String)} and {@link #getMacString(byte[])} round-trip.
 */
public final class NetworkUtilities
{
    /** Number of octets in an Ethernet MAC address. */
    private static final int    MAC_LENGTH = 6;

    /** Upper-case hexadecimal digits, indexed by nibble value. */
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    private NetworkUtilities()
    {
        // Utility class: not meant to be instantiated.
    }

    /**
     * Parses a MAC address written as six hexadecimal octets separated by
     * {@code '-'}.
     *
     * @param mac textual MAC address, e.g. {@code 00-1A-A0-FF-0B-7C}; each
     *            octet may be written with one or two hex digits
     * @return the six address bytes, in the order they appear in the text
     * @throws IllegalArgumentException if {@code mac} is {@code null}, does
     *             not consist of exactly six parts, or any part is not a
     *             hexadecimal value in the range 0x00-0xFF
     */
    public static byte[] getMacBytes(String mac)
    {
        if (mac == null)
        {
            throw new IllegalArgumentException("Invalid MAC address.");
        }
        // Limit -1 keeps trailing empty tokens so "..-55-" is rejected
        // instead of being silently accepted.
        String[] hex = mac.split("-", -1);
        if (hex.length != MAC_LENGTH)
        {
            throw new IllegalArgumentException("Invalid MAC address.");
        }
        byte[] bytes = new byte[MAC_LENGTH];
        try
        {
            for (int i = 0; i < MAC_LENGTH; i++)
            {
                int octet = Integer.parseInt(hex[i], 16);
                // parseInt accepts values such as "1FF" or "+5"; anything
                // outside one unsigned byte would be truncated by the cast.
                if (octet < 0 || octet > 0xFF)
                {
                    throw new IllegalArgumentException("Invalid hex digit in MAC address.");
                }
                bytes[i] = (byte) octet;
            }
        }
        catch (NumberFormatException ex)
        {
            throw new IllegalArgumentException("Invalid hex digit in MAC address.", ex);
        }
        return bytes;
    }

    /**
     * Formats address bytes as upper-case two-digit hexadecimal octets
     * separated by {@code '-'}; the inverse of {@link #getMacBytes(String)}
     * for six-byte input.
     *
     * @param mac the address bytes
     * @return the textual form, or an empty string for an empty array
     * @throws IllegalArgumentException if {@code mac} is {@code null}
     */
    public static String getMacString(byte[] mac)
    {
        if (mac == null)
        {
            throw new IllegalArgumentException("MAC address must not be null.");
        }
        StringBuilder sb = new StringBuilder(mac.length * 3);
        for (int i = 0; i < mac.length; i++)
        {
            if (i > 0)
            {
                sb.append('-');
            }
            int octet = mac[i] & 0xFF; // treat the signed byte as unsigned
            sb.append(HEX_DIGITS[octet >>> 4]).append(HEX_DIGITS[octet & 0x0F]);
        }
        return sb.toString();
    }

    /**
     * Finds the first device that has an address whose textual host address
     * equals {@code ip}.
     *
     * @param devices candidate devices; may be {@code null}
     * @param ip      textual IP address to look for; may be {@code null}
     * @return the matching device, or {@code null} if none matches or either
     *         argument is {@code null}
     */
    public static NetworkInterface fetchDevice(NetworkInterface[] devices, String ip)
    {
        if (devices == null || ip == null)
        {
            return null;
        }
        for (NetworkInterface device : devices)
        {
            if (device == null || device.addresses == null)
            {
                continue;
            }
            for (NetworkInterfaceAddress address : device.addresses)
            {
                // Defensive: skip entries that carry no usable address.
                if (address != null && address.address != null
                        && ip.equals(address.address.getHostAddress()))
                {
                    return device;
                }
            }
        }
        return null;
    }

}

/**
 * ClusterDaemon
 */
package cn.sh.huang;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import jpcap.JpcapCaptor;
import jpcap.JpcapSender;
import jpcap.NetworkInterface;
import jpcap.packet.ARPPacket;
import jpcap.packet.EthernetPacket;
import jpcap.packet.ICMPPacket;
import jpcap.packet.IPPacket;
import jpcap.packet.Packet;
import jpcap.packet.TCPPacket;

/**
 * Simulates a virtual service address by answering ARP for it and relaying
 * TCP/ICMP packets between two network interfaces with JPCAP.
 *
 * <p>Two listener threads are started by the constructor:
 * <ul>
 * <li>The public listener captures on the interface that owns {@code PUB_IP}.
 * It answers ARP requests for {@code NAT_IP} with that interface's MAC, and
 * forwards TCP/ICMP packets addressed to {@code NAT_IP} to {@code PRV_IP}
 * after rewriting the source to {@code MAP_IP_PREFIX} plus the last octet of
 * the client's address.</li>
 * <li>The private listener captures on the interface that owns
 * {@code PRV_IP}. It answers ARP requests for mapped addresses with the
 * recorded client MAC, and forwards TCP/ICMP packets addressed to a mapped
 * address back out of the public interface with source {@code NAT_IP} and the
 * client's real IP as destination.</li>
 * </ul>
 *
 * <p>Because the mapped address only keeps the client's last octet, two
 * clients whose addresses end in the same octet share one mapping.
 */
public class ClusterDaemon
{
    private static final String      PRV_IP        = "192.168.56.1";
    private static final String      PUB_IP        = "10.61.97.207";
    private static final String      NAT_IP        = "10.61.96.7";
    private static final String      MAP_IP_PREFIX = "192.168.111.";
    private static final InetAddress PRV_IP_ADDR;
    private static final InetAddress NAT_IP_ADDR;

    /**
     * Offset of the ICMP identifier inside the captured header bytes. The
     * value matches a 14-byte Ethernet header, a 20-byte IPv4 header without
     * options and the 4 bytes of ICMP type/code/checksum.
     * NOTE(review): packets carrying IP options would shift this offset.
     */
    private static final int         ICMP_ID_OFFSET  = 38;

    /** Offset of the ICMP sequence number, directly after the identifier. */
    private static final int         ICMP_SEQ_OFFSET = 40;

    static
    {
        PRV_IP_ADDR = getIpAddr(PRV_IP);
        NAT_IP_ADDR = getIpAddr(NAT_IP);
    }

    /**
     * Resolves one of the address constants.
     *
     * @param ip textual IP address
     * @return the resolved address, never {@code null}
     * @throws IllegalStateException if the text cannot be resolved, so a bad
     *             constant fails at class initialisation instead of causing
     *             a NullPointerException inside a listener thread later
     */
    private static InetAddress getIpAddr(String ip)
    {
        try
        {
            return InetAddress.getByName(ip);
        }
        catch (UnknownHostException ex)
        {
            throw new IllegalStateException("Invalid IP address constant: " + ip, ex);
        }
    }

    private final NetworkInterface    pubInterf, prvInterf;
    private final JpcapSender         pubSender, prvSender;
    private final JpcapCaptor         pubCaptor, prvCaptor;

    /**
     * Shared by both listener threads, hence a concurrent map. Holds three
     * kinds of entries: client IP text -> client MAC, mapped IP text ->
     * client MAC, and client MAC text -> client IP bytes.
     */
    private final Map<String, byte[]> macMap;

    /**
     * Opens both interfaces and starts the two listener threads.
     *
     * @throws IOException if an interface cannot be found or opened
     */
    private ClusterDaemon() throws IOException
    {
        NetworkInterface[] devices = JpcapCaptor.getDeviceList();
        pubInterf = requireDevice(devices, PUB_IP);
        pubCaptor = JpcapCaptor.openDevice(pubInterf, 65535, false, 20);
        pubSender = pubCaptor.getJpcapSenderInstance();

        prvInterf = requireDevice(devices, PRV_IP);
        prvCaptor = JpcapCaptor.openDevice(prvInterf, 65535, false, 20);
        prvSender = prvCaptor.getJpcapSenderInstance();
        macMap = new ConcurrentHashMap<String, byte[]>();

        Thread pubIfListener = new Thread(new Runnable() {

            @Override
            public void run()
            {
                while (true)
                {
                    handlePublicPacket(pubCaptor.getPacket());
                }
            }
        });
        pubIfListener.start();

        Thread prvIfListener = new Thread(new Runnable() {

            @Override
            public void run()
            {
                while (true)
                {
                    handlePrivatePacket(prvCaptor.getPacket());
                }
            }
        });
        prvIfListener.start();
    }

    /**
     * Looks up the device that owns {@code ip}.
     *
     * @throws IOException if no device carries that address
     */
    private static NetworkInterface requireDevice(NetworkInterface[] devices, String ip) throws IOException
    {
        NetworkInterface device = NetworkUtilities.fetchDevice(devices, ip);
        if (device == null)
        {
            throw new IOException("No network interface found with address " + ip);
        }
        return device;
    }

    /**
     * Handles one packet captured on the public interface. Packets that are
     * neither a matching ARP request nor TCP/ICMP addressed to
     * {@code NAT_IP} are ignored, which includes a {@code null} capture
     * result.
     */
    private void handlePublicPacket(Packet pack)
    {
        if (pack instanceof ARPPacket)
        {
            ARPPacket arpPack = (ARPPacket) pack;
            // Reply only when somebody asks for the MAC address of NAT_IP.
            if (arpPack.operation == ARPPacket.ARP_REQUEST
                    && Arrays.equals(arpPack.target_protoaddr, NAT_IP_ADDR.getAddress()))
            {
                pubSender.sendPacket(createArpPack(pubInterf.mac_address, NAT_IP_ADDR.getAddress(),
                        arpPack.sender_hardaddr, arpPack.sender_protoaddr, pubInterf.mac_address,
                        arpPack.sender_hardaddr, ARPPacket.ARP_REPLY));
            }
            return;
        }
        if (!(pack instanceof IPPacket) || !(pack instanceof TCPPacket || pack instanceof ICMPPacket))
        {
            return;
        }
        IPPacket ipPack = (IPPacket) pack;
        if (!NAT_IP_ADDR.equals(ipPack.dst_ip) || !(ipPack.datalink instanceof EthernetPacket))
        {
            return;
        }

        // Forward to the private interface.
        EthernetPacket eth = (EthernetPacket) ipPack.datalink;
        String srcIpAddr = ipPack.src_ip.getHostAddress();
        String mapIp = MAP_IP_PREFIX + srcIpAddr.substring(srcIpAddr.lastIndexOf('.') + 1);
        InetAddress mapIpAddr;
        try
        {
            mapIpAddr = InetAddress.getByName(mapIp);
        }
        catch (UnknownHostException ex)
        {
            // Without a mapped source the reply could not be routed back
            // through this daemon, so drop the packet.
            ex.printStackTrace();
            return;
        }
        macMap.put(srcIpAddr, eth.src_mac);
        macMap.put(NetworkUtilities.getMacString(eth.src_mac), ipPack.src_ip.getAddress());
        macMap.put(mapIp, eth.src_mac);

        ipPack.src_ip = mapIpAddr;
        eth.dst_mac = prvInterf.mac_address;
        ipPack.dst_ip = PRV_IP_ADDR;
        restoreIcmpIdAndSeq(ipPack);
        prvSender.sendPacket(ipPack);
    }

    /**
     * Handles one packet captured on the private interface. Packets that are
     * neither an ARP request for a known mapped address nor TCP/ICMP
     * addressed to a known mapped address are ignored.
     */
    private void handlePrivatePacket(Packet pack)
    {
        if (pack instanceof ARPPacket)
        {
            ARPPacket arpPack = (ARPPacket) pack;
            if (arpPack.operation != ARPPacket.ARP_REQUEST)
            {
                return;
            }
            String fakeAddr;
            try
            {
                fakeAddr = InetAddress.getByAddress(arpPack.target_protoaddr).getHostAddress();
            }
            catch (UnknownHostException ex)
            {
                ex.printStackTrace();
                return;
            }
            // Reply only when somebody asks for the MAC address of a mapped IP.
            if (!fakeAddr.startsWith(MAP_IP_PREFIX))
            {
                return;
            }
            byte[] fakeMAC = macMap.get(fakeAddr);
            if (fakeMAC == null)
            {
                // No client has been seen for this mapped address yet.
                return;
            }
            prvSender.sendPacket(createArpPack(fakeMAC, arpPack.target_protoaddr,
                    arpPack.sender_hardaddr, arpPack.sender_protoaddr, fakeMAC,
                    arpPack.sender_hardaddr, ARPPacket.ARP_REPLY));
            return;
        }
        if (!(pack instanceof IPPacket) || !(pack instanceof TCPPacket || pack instanceof ICMPPacket))
        {
            return;
        }
        IPPacket ipPack = (IPPacket) pack;
        if (ipPack.dst_ip == null || !ipPack.dst_ip.getHostAddress().startsWith(MAP_IP_PREFIX)
                || !(ipPack.datalink instanceof EthernetPacket))
        {
            return;
        }

        // Forward to the public interface. The destination MAC is already
        // the client's MAC because that is what the ARP reply above handed
        // out for the mapped address; it doubles as the key for the
        // client's real IP.
        EthernetPacket eth = (EthernetPacket) ipPack.datalink;
        byte[] realIp = macMap.get(NetworkUtilities.getMacString(eth.dst_mac));
        if (realIp == null)
        {
            // Unknown client: forwarding would leak the mapped address.
            return;
        }
        InetAddress realIpAddr;
        try
        {
            realIpAddr = InetAddress.getByAddress(realIp);
        }
        catch (UnknownHostException ex)
        {
            ex.printStackTrace();
            return;
        }
        eth.src_mac = pubInterf.mac_address;
        ipPack.src_ip = NAT_IP_ADDR;
        ipPack.dst_ip = realIpAddr;
        restoreIcmpIdAndSeq(ipPack);
        pubSender.sendPacket(ipPack);
    }

    /**
     * For an ICMP packet whose {@code id} and {@code seq} fields are both
     * zero, refills them from the raw captured header so that the forwarded
     * packet keeps the original identifier and sequence number. Does nothing
     * for other packets or when the header is too short.
     */
    private static void restoreIcmpIdAndSeq(IPPacket ipPack)
    {
        if (!(ipPack instanceof ICMPPacket))
        {
            return;
        }
        ICMPPacket icmp = (ICMPPacket) ipPack;
        if (icmp.id != 0 || icmp.seq != 0)
        {
            return;
        }
        byte[] header = icmp.header;
        if (header == null || header.length < ICMP_SEQ_OFFSET + 2)
        {
            return;
        }
        icmp.id = readBigEndianShort(header, ICMP_ID_OFFSET);
        icmp.seq = readBigEndianShort(header, ICMP_SEQ_OFFSET);
    }

    /**
     * Reads two bytes in network (big-endian) order. Each byte is masked to
     * its unsigned value first; multiplying and adding signed bytes would
     * corrupt the result whenever a byte is 0x80 or above.
     */
    private static short readBigEndianShort(byte[] buf, int offset)
    {
        return (short) (((buf[offset] & 0xFF) << 8) | (buf[offset + 1] & 0xFF));
    }

    /**
     * Creates the daemon; the listener threads it starts keep the JVM alive.
     *
     * @param args unused
     * @throws IOException if an interface cannot be found or opened
     */
    public static void main(String[] args) throws IOException
    {
        new ClusterDaemon();
    }

    /**
     * Builds an Ethernet-framed IPv4 ARP packet.
     *
     * @param sender_hardaddr  sender MAC address placed in the ARP body
     * @param sender_protoaddr sender protocol (IP) address
     * @param target_hardaddr  target MAC address placed in the ARP body
     * @param target_protoaddr target protocol (IP) address
     * @param eth_src_mac      source MAC of the Ethernet frame
     * @param eth_dst_mac      destination MAC of the Ethernet frame
     * @param operation        ARP operation, e.g. {@code ARPPacket.ARP_REPLY}
     * @return the assembled packet with its Ethernet header attached
     */
    private static ARPPacket createArpPack(byte[] sender_hardaddr, byte[] sender_protoaddr, byte[] target_hardaddr,
            byte[] target_protoaddr, byte[] eth_src_mac, byte[] eth_dst_mac, short operation)
    {
        ARPPacket arp = new ARPPacket();
        arp.hardtype = ARPPacket.HARDTYPE_ETHER;
        arp.prototype = ARPPacket.PROTOTYPE_IP;
        arp.operation = operation;
        arp.hlen = 6;
        arp.plen = 4;
        arp.sender_hardaddr = sender_hardaddr; // sender MAC address
        arp.sender_protoaddr = sender_protoaddr; // sender protocol address
        arp.target_hardaddr = target_hardaddr; // target MAC address
        arp.target_protoaddr = target_protoaddr; // target protocol address
        EthernetPacket eth = new EthernetPacket(); // build the Ethernet header
        eth.frametype = EthernetPacket.ETHERTYPE_ARP; // Ethernet frame type
        eth.src_mac = eth_src_mac; // Ethernet source MAC
        eth.dst_mac = eth_dst_mac; // Ethernet destination MAC
        arp.datalink = eth; // put the Ethernet header in front of the ARP packet
        return arp;
    }
}
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics