论坛首页 Java企业应用论坛

介绍一个很好用的NIO Socket开发框架--Netty2

浏览 30062 次
该帖已经被评为精华帖
作者 正文
   发表时间:2004-12-02  
开发一套稳定高效客户端-服务端Socket通讯服务不是一件很简单的事。

这几天试用了一下Netty2(http://gleamynode.net/dev/projects/netty2/)
感觉很不错,希望能与大家分享。

我们在做短信应用开发中,经常会碰到与移动、联通、小灵通网关进行Socket通讯,厂商提供的API效率不高,而自己开发的API费时、费力,花费了不少时间,稳定性还经常无法保证。

使用了Netty2,一方面能使开发框架更加清晰,也减少了不少开发时间。下面简单举个开发与小灵通短信网关通讯API的例子。

小灵通短信协议于SMPP、CMPP类似:
有login,submit,deliver,exit等数据包
每个数据包都由包头和包体组成。
其中包头有3个字段totalLength,commandId,sequenceId,每个字段占4个byte,共12个byte。
而包体根据协议都有不同的定义,详细请参考小灵通通讯协议。

下面举例如何使用Netty实现此通讯协议:
   发表时间:2004-12-02  
1、实现Netty的MessageRecognizer 接口
package net.smsfan.smg.api.phs20;

import java.nio.*;
import net.gleamynode.netty2.*;

public class PHSMessageRecognizer
    implements MessageRecognizer {
  public PHSMessageRecognizer(); {
  }

  public Message recognize(ByteBuffer buf); throws MessageParseException {

    // return null if message type is not arrived yet.
    if (buf.remaining(); < PHSMessage.lenMessageHeader);
      return null;

    int totalLength = buf.getInt();;
    int commandId = buf.getInt();;
    int sequenceId = buf.getInt();;

    switch (commandId); {
      case PHSLoginRep.COMMAND_ID:
        return new PHSLoginRep(totalLength,commandId,sequenceId);;
      default:
        throw new MessageParseException("unknown type: " + commandId);;
    }
  }
}


由于精力原因,只在这实现Login,LoginRep数据包
0 请登录后投票
   发表时间:2004-12-02  
2、实现通讯的协议包(PHSLogin,PHSLoginRep),需要继承Netty的Message接口,在这里由于协议包中有很多地方可以被抽象,所以还定义了一个PHSMessage的抽象类(在被注释的代码中,还残留了一些未用Netty之前自己用jdk io类时残留的一些代码 
package net.smsfan.smg.api.phs20;

import net.gleamynode.netty2.Message;
import java.nio.ByteBuffer;
import net.gleamynode.netty2.MessageParseException;

public abstract class PHSMessage  implements Message{

  // Max Content Length
  public static final int LEN_MAX_CONTENT_ASCII = 160;
  public static final int LEN_MAX_CONTENT = 140;

  //Max DestUsr_Tl
  public static final int MAX_DESTUSR_TL = 99;

  public static final int MAX_MSG_LEVEL = 9;
  public static final int MIN_MSG_LEVEL = 0;

  public static final int MSG_TYPE_ASCII = 0;
  public static final int MSG_TYPE_WRITECARD = 3;
  public static final int MSG_TYPE_BINARY = 4;
  public static final int MSG_TYPE_UCS2 = 8;
  public static final int MSG_TYPE_CHINESE = 15;

  public static final int DELIVER_TYPE_COMMAN_DELIVER = 0;
  public static final int DELIVER_TYPE_REPORT_DELIVER = 1;

  /* 消息头定义 */
  public static final int lenMessageHeader = 12;

  private int totalLength;
  private int commandId;
  private int sequenceId;

  public PHSMessage(int h, int i, int j); {
    totalLength = h;
    commandId = i;
    sequenceId = j;
  }

  public PHSMessage(int i); {
    totalLength = 0;
    commandId = i;
    PHSSeq seq = new PHSSeq();;
    sequenceId = seq.getSeq();;
  }

//  public PHSMessage(PHSIO cmppio); throws PHSException {
//    super.getPacket(cmppio, lenMessageHeader);;
//
//    totalLength = getInteger();;
//    commandId = getInteger();;
//    sequenceId = getInteger();;
//    int i = totalLength - lenMessageHeader;
//    if (i > 0);
//      super.getPacket(cmppio, i);;
//  }
//
//  protected void encodePacket(PHSIO cmppio); throws PHSException {
//    totalLength = getLength(); + lenMessageHeader;
//    insertInteger(sequenceId);;
//    insertInteger(commandId);;
//    insertInteger(totalLength);;
//    super.encodePacket(cmppio);;
//  }

  public int getCommandId(); {
    return commandId;
  }

  public void setCommandId(int commandId); {
    this.commandId = commandId;
  }

  public int getTotalLength(); {
    return totalLength;
  }

  public int getSequenceId(); {
    return sequenceId;
  }

  public void setSequenceId(int i); {
    sequenceId = i;
  }

  public void setTotalLength(int totalLength); {
    this.totalLength = totalLength;
  }

  protected abstract void readBody(ByteBuffer buf); throws MessageParseException;
  protected abstract void writeBody(ByteBuffer buf);;
  protected abstract void setTotalLength();;

  private void readHeader(ByteBuffer buf); throws MessageParseException {
    totalLength = buf.getInt();;
    commandId = buf.getInt();;
    sequenceId = buf.getInt();;
  }


  public boolean read(ByteBuffer buf); throws MessageParseException {
    if (buf.remaining(); < totalLength);
      return false;
    readHeader(buf);;
    int i = totalLength - lenMessageHeader;
    if (i > 0);{
      readBody(buf);;
    }
    return true;
  }

  public boolean write(ByteBuffer buf); {

    if (buf.remaining(); < totalLength);
      return false;
    setTotalLength();;
    writeHeader(buf);;
    writeBody(buf);;
    return true;
  }

  private void writeHeader(ByteBuffer buf); {
    buf.putInt(totalLength);;
    buf.putInt(commandId);;
    buf.putInt(sequenceId);;
  }

  protected byte[] getBytes(String str0, int LenStr0);{
    if (str0 == null);
      throw new IllegalArgumentException("insertString : null String !");;
    if (LenStr0 < 0);
      throw new IllegalArgumentException("insertStrings : LenStr0 can't < 0 !");;

    byte abyte1[] = new byte[LenStr0];
    for (int i = 0; i < abyte1.length; i++); {
      abyte1[i] = 0;
    }

    byte strbyte[] = str0.getBytes();;

    int m = str0.length(); > LenStr0 ? LenStr0 : str0.length();;

    for (int j = 0; j < m; j++); {
      abyte1[j] = strbyte[j];
    }
    return abyte1;
  }


}


package net.smsfan.smg.api.phs20;

import java.nio.ByteBuffer;
import net.gleamynode.netty2.MessageParseException;

public class PHSLogin
    extends PHSMessage {

  static final int COMMAND_ID = 1;

  static final int lenSourceAddr = 8;
  static final int lenAuthenticatorSP = 16;
  static final int lenLoginMode = 1;
  static final int lenVersion = 1;
  static final int lenTimestamp = 4;
  static final int lenMessageBody = lenSourceAddr + lenAuthenticatorSP +
      lenLoginMode + lenTimestamp +
      lenVersion;

  String sourceAddr;
  String authenticatorSP;
  byte loginMode = 0;
  int timestamp = 0;
  byte version = 0;

  public PHSLogin(String SourceAddr, String AuthenticatorSP, byte loginMode, byte Version); throws
      PHSException {
    super(COMMAND_ID);;
    setClientId(SourceAddr);;
    setAuthenticatorSP(AuthenticatorSP);;
    setVersion(Version);;
    setLoginMode(loginMode);;
    setTimestamp();;
  }

  public void setClientId(String SourceAddr); throws PHSException {
    if (SourceAddr == null);
      throw new PHSException("setSource_Addr : Source_Addr is null !");;
    else {
      this.sourceAddr = SourceAddr;
      return;
    }
  }

  public void setAuthenticatorSP(String AuthenticatorSP); throws
      PHSException {
    if (AuthenticatorSP == null);
      throw new PHSException(
          "setAuthenticatorSP : AuthenticatorSP is null !");;
    else {
      this.authenticatorSP = AuthenticatorSP;
      return;
    }
  }

  public void setLoginMode(byte loginMode); throws PHSException {
    this.loginMode = loginMode;
    return;
  }

  public void setVersion(byte Version); throws PHSException {

    this.version = Version;
    return;
  }

  private void setTimestamp(); throws PHSException {
    timestamp = PHSUtil.getTimestamp();;
    return;
  }

  public String getSourceAddr(); throws PHSException {
    return sourceAddr;
  }

  public String getAuthenticatorSP(); throws PHSException {
    return authenticatorSP;
  }

  public byte getVersion(); throws PHSException {
    return version;
  }

  public int getTimestamp(); throws PHSException {
    return timestamp;
  }

  protected void readBody(ByteBuffer buf); throws MessageParseException {
    throw new UnsupportedOperationException();;
  }

  protected void writeBody(ByteBuffer buf); {
//    if (sourceAddr == null);
//      throw new IllegalArgumentException("encodePacket : Source_Addr is null !");;
//    if (authenticatorSP == null);
//      throw new IllegalArgumentException("encodePacket : AuthenticatorSP is null !");;

    byte[] spaceString = {0x00,0x00,0x00,0x00,0x00,0x00,0x00};
    byte[] md5authenticatorSP = PHSUtil.hash(sourceAddr+new String(spaceString);+ authenticatorSP +  PHSUtil.zeroPadString(String.valueOf(timestamp);,10););;

    buf.put(getBytes(sourceAddr,this.lenSourceAddr););;
    buf.put(md5authenticatorSP);;
    buf.put(loginMode);;
    buf.putInt(timestamp);;
    buf.put(version);;
  }

  protected void setTotalLength(); {
    setTotalLength(lenMessageBody+lenMessageHeader);;
  }
}



package net.smsfan.smg.api.phs20;

import java.nio.ByteBuffer;
import net.gleamynode.netty2.MessageParseException;

public class PHSLoginRep
    extends PHSMessage {

  public static final int COMMAND_ID = 0x80000001;

  private static final int lenStatus = 4;
  private static final int lenAuthenticatorISMG = 16;
  private static final int lenVersion = 1;
  private static final int lenMessageBody = lenStatus + lenAuthenticatorISMG +
      lenVersion;

  private int status;
  private String authenticatorISMG;
  private byte version;

  public int getStatus(); {
    return status;
  }

  public String getAuthenticatorISMG(); {
    return authenticatorISMG;
  }

  public byte getVersion(); {
    return version;
  }

  public PHSLoginRep(int h, int i, int j); {
    super(h, i, j);;
  }

  protected void readBody(ByteBuffer buf); throws MessageParseException {
    status = buf.getInt();;
    byte[] auth = new byte[lenAuthenticatorISMG];
    buf.get(auth);;
    authenticatorISMG = new String(auth);;
    version = buf.get();;
  }

  protected void writeBody(ByteBuffer buf); {
  }

  protected void setTotalLength(); {
    throw new UnsupportedOperationException();;
  }
}

0 请登录后投票
   发表时间:2004-12-02  
OK,完成这三个类,对于Socket流数据与Message对象的转换都已完成。以后就是根据协议增加相应的Message类即可。

下面看看如何运行让它工作起来,实现Netty的SessionListener 接口,网络连接成功、信息接收、网络断开事件都可知晓:
package net.smsfan.smg.api.phs20;

import java.io.*;
import java.net.*;
import java.util.*;

import net.gleamynode.netty2.*;

public class PHSClient
    implements SessionListener {

  private static final int CONNECT_TIMEOUT = 30; // seconds
  private static final int DISPATCHER_THREAD_POOL_SIZE = 10;

  private Hashtable lockMap = new Hashtable();; //  Packet.seq --> Object
  private Hashtable waitPackets = new Hashtable();;
  private LinkedList listeners = new LinkedList();; //数据包接收者
  private boolean debug;
  private int status = 0; // 0 -- unlogin ; 1 -- login

  private IoProcessor ioProcessor;
  private ThreadPooledEventDispatcher eventDispatcher;
  private Session session;
  

  public PHSClient(String host, int port); throws IOException {
    debug = true;
    PHSClientInit(host, port);;
  }

  public void PHSClientInit(String host, int port); throws IOException {

    // initialize I/O processor and event dispatcher
    ioProcessor = new IoProcessor();;
    eventDispatcher = new OrderedEventDispatcher();;

    // start with the default number of I/O worker threads
    ioProcessor.start();;

    // start with a few event dispatcher threads
    eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE);;
    eventDispatcher.start();;

    // prepare message recognizer
    MessageRecognizer recognizer = new PHSMessageRecognizer();;

    // create a client session
    session = new Session(ioProcessor, new InetSocketAddress(
        host, port);, recognizer, eventDispatcher);;

    // set configuration
    session.getConfig();.setConnectTimeout(CONNECT_TIMEOUT);;

    // suscribe and start communication
    session.addSessionListener(this);;

    log("Connecting to " + session.getSocketAddress();, debug);;
    session.start();;
  }

  public PHSLoginRep login(PHSLogin login, long waitTime); throws
      PHSException {
    PHSLoginRep rep = (PHSLoginRep); putRequestForRep(login, waitTime);;
    if (rep == null);return null;
    if (rep.getStatus(); == 0); {
      this.status = 1;
    }
    return rep;
  }

  public void close(); {
    try {
      lockMap.clear();;
      waitPackets.clear();;
      listeners.clear();;
    }
    catch (Exception ex1); {
    }
    // stop I/O processor and event dispatcher
    eventDispatcher.stop();;
    ioProcessor.stop();;
  }

  private PHSMessage putRequestForRep(PHSMessage msg, long waitTime); throws
      PHSException {
    if (msg == null || waitTime < 0);return null;

    Object lock = new Object();;
    lockMap.put(new Integer(msg.getSequenceId(););, lock);;
    putRequest(msg);;

    PHSMessage resp = removeWaitPacket(new Integer(msg.getSequenceId();););;
    if (resp == null); {
      synchronized (lock); {
        try {
          lock.wait(waitTime);;
        }
        catch (InterruptedException ex); {
        }
      }
      resp = removeWaitPacket(new Integer(msg.getSequenceId();););;
    }
    lockMap.remove(new Integer(msg.getSequenceId();););;
    return resp;
  }

  public void addWaitPacket(PHSMessage packet); {
    waitPackets.put(new Integer(packet.getSequenceId(););, packet);;
  }

  public PHSMessage removeWaitPacket(Integer seq); {
    return (PHSMessage); waitPackets.remove(seq);;
  }

  private void putRequest(PHSMessage msg); throws PHSException {
    session.write(msg);;
  }

  protected void finalize(); {
    close();;
  }

  public boolean addListener(PHSClientListener listener); {
    return listeners.add(listener);;
  }

  public boolean removeListener(PHSClientListener listener); {
    return listeners.remove(listener);;
  }

  private void log(String str, boolean debug); {
    if (debug);
      System.out.println(str);;
  }

  private void fireConnected(); {
    final Object[] alisteners = listeners.toArray();;
    final int size = alisteners.length;
    for (int i = 0; i < size; i++); {
      try {
        ( (PHSClientListener); alisteners[i]);.onClientConnected();;
      }
      catch (Exception ex1); {
      }
    }
  }

  private void fireDisconnected(); {
    final Object[] alisteners = listeners.toArray();;
    final int size = alisteners.length;
    for (int i = 0; i < size; i++); {
      try {
        ( (PHSClientListener); alisteners[i]);.onClientDisconnected();;
      }
      catch (Exception ex1); {
      }
    }
  }

  public boolean isConnected(); {
    return session.isConnected();;
  }

  // 实现 SessionListener 接口
  public void connectionEstablished(Session session); {
     fireConnected();;
   }
 
   public void connectionClosed(Session session); {
     fireDisconnected();;
   }
 
   public void messageReceived(Session session, Message message); {
     PHSMessage msg = (PHSMessage); message;
     if ( (msg.getCommandId(); & 0x80000000); != 0); {
       if (msg != null &&
           lockMap.containsKey(new Integer(msg.getSequenceId();););); {
         addWaitPacket(msg);;
         Object lock = lockMap.remove(new Integer(msg.getSequenceId();););;
         synchronized (lock); {
           lock.notify();;
         }
         return;
       }
     }
 
     switch (msg.getCommandId();); {
       default: {
         log("unknow Message=" + msg.getCommandId();, debug);;
       }
     }
   }
 
   public void messageSent(Session session, Message message); {
   }
 
   public void sessionIdle(Session session); {
   }
 
   public void exceptionCaught(Session session, Throwable throwable); {
     throwable.printStackTrace();;
   }  
  
}

0 请登录后投票
   发表时间:2004-12-02  
最后。写个Example类去运行:
package net.smsfan.smg.api.phs20.example;

import java.io.*;

import net.smsfan.smg.api.phs20.*;
import java.util.Properties;

public class TestClient
    implements PHSClientListener {

  private String serverHost = "218.66.104.104";
  private int serverPort = 8890;
  private String name = "";
  private String password = "";

  private PHSClient client;

  public TestClient(); throws IOException, PHSException {
    loadProperties();;
    makeConnection();;
    client.addListener(this);;
  }

  /**
   * loadProperties
   */
  private void loadProperties(); {
    Properties props = new Properties();;
    try {
      FileInputStream fileinputstream = new FileInputStream("smg.properties");;
      props = new Properties();;
      props.load(fileinputstream);;
      fileinputstream.close();;
    }
    catch (Exception e); {
      System.out.println(
          "Unable to read configuration file,Please set smg.properties in work dir!");;
      System.exit(0);;
    }
    serverHost = props.getProperty("serverHost");;
    serverPort = Integer.parseInt(props.getProperty("serverPort"););;
    name = props.getProperty("name");;
    password = props.getProperty("password");;
  }

  public void connect(); throws Exception {
    String strSource_Addr = name;
    String strAuthenticatorSP = password;
    byte version = 0;
    byte loginMode = 2;
    PHSLogin login = new PHSLogin(strSource_Addr, strAuthenticatorSP, loginMode,
                                  version);;
    PHSLoginRep rep = client.login(login, 10000);;
    if (rep == null); {
      System.out.println("No response packet !");;
      return;
    }
    if (rep.getCommandId(); != 0x80000001);
      System.out.println("Invalid command id !");;
    if (rep.getSequenceId(); != login.getSequenceId(););
      System.out.println("invalid sequence id !");;
    System.out.println("Status = " + rep.getStatus(););;
    System.out.println("AuthenticatorISMG = " +
                       rep.getAuthenticatorISMG(););;
    System.out.println("Version = " + rep.getVersion(););;
  }

  public static void main(String args[]); {
    InputStreamReader CMPPCommand = new InputStreamReader(System.in);;
    BufferedReader buffCMPPCommand = new BufferedReader(CMPPCommand);;
    try {
      TestClient cmppclient = new TestClient();;
      String s;
      while ( (s = buffCMPPCommand.readLine();); != null); {
        if (s.trim();.equals("connect");); {
          System.out.println(
              "Attempting to Send PHS_Login Command to Server ......");;
          cmppclient.connect();;
          continue;
        }

        if (s.trim();.equals("quit");); {
          System.out.println("CMPP Client quit .");;
          System.exit(1);;
        }

        System.out.println("unrecognised command !");;

      }
    }
    catch (Exception exception); {
      exception.printStackTrace();;
    }
  }

  private synchronized void makeConnection(); {
    if (client != null); {
      if (client.isConnected(););return;
      client.close();;
      client = null;
    }

    boolean isConnected = false;
    try {
      while (!isConnected); {
        try {
          client = new PHSClient(serverHost, serverPort);;
          isConnected = true;
        }
        catch (Exception ex1); {
          ex1.printStackTrace();;
        }
      }
      client.addListener(this);;

      boolean isSentConnected = false;
      while (!isSentConnected); {
        if (client.isConnected();); {
          connect();;
          isSentConnected = true;
        }
      }
    }
    catch (Exception ex); {
      ex.printStackTrace();;
    }
  }

  public void onClientDisconnected(); {
    System.out.println("Client Disconnected!!!Reconnect ... ");;
    makeConnection();;
  }

  public void onClientConnected(); {
  }

}


0 请登录后投票
   发表时间:2004-12-02  
下一步,考虑如何将它 与 Spring 结合起来,成为Spring的一个组件。相信这做起来一定很简单
0 请登录后投票
   发表时间:2004-12-03  
Netty2是一个不错的NIO框架。另外如果有兴趣的话,你可以试一下http://sourceforge.net/projects/cindy/,支持SocketChannel/ServerSocketChannel/DatagramChannel/Pipe,还模拟了MulticastChannel,使得应用可以用同一个模型访问TCP/UDP。
0 请登录后投票
   发表时间:2004-12-04  
THX,我去试试看看,主要目的是想找到一个开发IM软件服务端的网络通讯框架。
0 请登录后投票
   发表时间:2004-12-04  
Cindy 怎么这么象 Netty?
0 请登录后投票
   发表时间:2004-12-04  
Cindy最主要的构想是读完Java NIO这本书中的一个例子产生的,随后查找了当时所能够找到的Opensource的NIO实现,Netty2也是其中之一,所以Cindy中MessageRecognizer这个类其实是从Netty2中学习到的。

但是我对NIO的理解和Netty2有所不一样,要不就加入它的开发组了:)从Cindy 1.0版本可以看出,其实Cindy 1.0支持多种基于NIO的模型。但经过几个实际项目的考验,我发觉一些模型在实际使用中并没有什么优势,所以经过精简后保留了最常用的,并加入了UDP和Pipe的支持。

Cindy是从http://sourceforge.net/projects/java-jml中抽取出来的,最早它只是用来做这个MSN类库的基础平台,因为我的项目中单独使用这个NIO框架比较多,就把它抽取到Cindy这个项目中来了。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics