This post has been marked as a featured post.
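Posted: 2004-12-02
I have been trying out Netty2 (http://gleamynode.net/dev/projects/netty2/) over the past few days. It works well, and I would like to share the experience.

When building SMS applications we often have to talk over sockets to the China Mobile, China Unicom, and PHS (Xiaolingtong) gateways. The vendor-supplied APIs are inefficient, while writing our own takes a lot of time and effort, and the result is often not reliably stable. With Netty2 the structure of the code becomes clearer and development time drops considerably. Below is a simple example: an API for communicating with the PHS SMS gateway.

The PHS SMS protocol is similar to SMPP and CMPP:
There are login, submit, deliver, exit, and other packets.
Every packet consists of a header and a body.
The header has three fields, totalLength, commandId, and sequenceId. Each field takes 4 bytes, 12 bytes in total.
The body is defined differently for each packet type; see the PHS protocol specification for details.

The following posts show how to implement this protocol with Netty: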
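The header layout described above can be sketched with plain java.nio. This is only an illustration: the class name `PhsHeader` and its methods are hypothetical and not part of the original code, and it assumes the fields travel in big-endian (network) byte order, which is the default for `ByteBuffer`.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper (not from the original post) for the 12-byte packet header. */
final class PhsHeader {
    static final int LENGTH = 12; // three 4-byte fields

    final int totalLength;
    final int commandId;
    final int sequenceId;

    PhsHeader(int totalLength, int commandId, int sequenceId) {
        this.totalLength = totalLength;
        this.commandId = commandId;
        this.sequenceId = sequenceId;
    }

    /** Writes the three fields at the buffer's current position. */
    void writeTo(ByteBuffer buf) {
        buf.putInt(totalLength).putInt(commandId).putInt(sequenceId);
    }

    /** Reads a header, or returns null if fewer than 12 bytes are available. */
    static PhsHeader readFrom(ByteBuffer buf) {
        if (buf.remaining() < LENGTH) {
            return null;
        }
        // Java evaluates arguments left to right, so the field order is preserved.
        return new PhsHeader(buf.getInt(), buf.getInt(), buf.getInt());
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(LENGTH);
        new PhsHeader(42, 1, 7).writeTo(buf);
        buf.flip();
        PhsHeader h = readFrom(buf);
        System.out.println(h.totalLength + " " + h.commandId + " " + h.sequenceId);
    }
}
```

Returning null for a short buffer mirrors the "not enough data yet" convention used by the recognizer in the next post.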
Posted: 2004-12-02
1. Implement Netty's MessageRecognizer interface

```java
package net.smsfan.smg.api.phs20;

import java.nio.ByteBuffer;

import net.gleamynode.netty2.Message;
import net.gleamynode.netty2.MessageParseException;
import net.gleamynode.netty2.MessageRecognizer;

public class PHSMessageRecognizer implements MessageRecognizer {

    public Message recognize(ByteBuffer buf) throws MessageParseException {
        // Return null until the whole 12-byte header has arrived.
        if (buf.remaining() < PHSMessage.lenMessageHeader) {
            return null;
        }

        // Note: PHSMessage.read() reads these three header fields again itself.
        int totalLength = buf.getInt();
        int commandId = buf.getInt();
        int sequenceId = buf.getInt();

        switch (commandId) {
        case PHSLoginRep.COMMAND_ID:
            return new PHSLoginRep(totalLength, commandId, sequenceId);
        default:
            throw new MessageParseException("unknown type: " + commandId);
        }
    }
}
```

For lack of time, only the Login and LoginRep packets are implemented here.
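The recognizer reads the header with relative `getInt()` calls, which advance the buffer position. If you want to inspect the command ID without moving the position, absolute reads are one option. The sketch below uses only java.nio; `HeaderPeek` and `peekCommandId` are hypothetical names, and nothing here is claimed about how Netty2 itself treats the buffer after `recognize()` returns.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper (not from the original post): look at the header without consuming it. */
final class HeaderPeek {
    static final int HEADER_LENGTH = 12;

    /**
     * Returns the commandId (the second 4-byte field), or null if the header
     * has not fully arrived. The buffer position is left unchanged.
     */
    static Integer peekCommandId(ByteBuffer buf) {
        if (buf.remaining() < HEADER_LENGTH) {
            return null;
        }
        // Absolute get: reads at an explicit index and does not move the position.
        return Integer.valueOf(buf.getInt(buf.position() + 4));
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH);
        buf.putInt(42).putInt(0x80000001).putInt(7);
        buf.flip();
        System.out.println(Integer.toHexString(peekCommandId(buf).intValue()));
        System.out.println("position after peek: " + buf.position());
    }
}
```

A boxed `Integer` is used for the return value because response command IDs such as 0x80000001 are negative as Java ints, so no int value is free to serve as a "not ready" sentinel.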
Posted: 2004-12-02
2. Implement the protocol packets (PHSLogin, PHSLoginRep). Each packet class must implement Netty's Message interface. Because much of the packet handling can be abstracted, I also defined an abstract class, PHSMessage. (The commented-out code is left over from the earlier version that I wrote with the JDK I/O classes before switching to Netty.)

```java
package net.smsfan.smg.api.phs20;

import java.nio.ByteBuffer;

import net.gleamynode.netty2.Message;
import net.gleamynode.netty2.MessageParseException;

public abstract class PHSMessage implements Message {
    // Maximum content length
    public static final int LEN_MAX_CONTENT_ASCII = 160;
    public static final int LEN_MAX_CONTENT = 140;
    // Maximum DestUsr_Tl
    public static final int MAX_DESTUSR_TL = 99;
    public static final int MAX_MSG_LEVEL = 9;
    public static final int MIN_MSG_LEVEL = 0;
    public static final int MSG_TYPE_ASCII = 0;
    public static final int MSG_TYPE_WRITECARD = 3;
    public static final int MSG_TYPE_BINARY = 4;
    public static final int MSG_TYPE_UCS2 = 8;
    public static final int MSG_TYPE_CHINESE = 15;
    public static final int DELIVER_TYPE_COMMAN_DELIVER = 0;
    public static final int DELIVER_TYPE_REPORT_DELIVER = 1;

    /* Message header: three 4-byte fields */
    public static final int lenMessageHeader = 12;

    private int totalLength;
    private int commandId;
    private int sequenceId;

    /** Used for incoming packets, whose header has already been recognized. */
    public PHSMessage(int totalLength, int commandId, int sequenceId) {
        this.totalLength = totalLength;
        this.commandId = commandId;
        this.sequenceId = sequenceId;
    }

    /** Used for outgoing packets; a new sequence ID is allocated. */
    public PHSMessage(int commandId) {
        this.totalLength = 0;
        this.commandId = commandId;
        this.sequenceId = new PHSSeq().getSeq();
    }

    // public PHSMessage(PHSIO cmppio) throws PHSException {
    //     super.getPacket(cmppio, lenMessageHeader);
    //
    //     totalLength = getInteger();
    //     commandId = getInteger();
    //     sequenceId = getInteger();
    //     int i = totalLength - lenMessageHeader;
    //     if (i > 0)
    //         super.getPacket(cmppio, i);
    // }
    //
    // protected void encodePacket(PHSIO cmppio) throws PHSException {
    //     totalLength = getLength() + lenMessageHeader;
    //     insertInteger(sequenceId);
    //     insertInteger(commandId);
    //     insertInteger(totalLength);
    //     super.encodePacket(cmppio);
    // }

    public int getCommandId() {
        return commandId;
    }

    public void setCommandId(int commandId) {
        this.commandId = commandId;
    }

    public int getTotalLength() {
        return totalLength;
    }

    public void setTotalLength(int totalLength) {
        this.totalLength = totalLength;
    }

    public int getSequenceId() {
        return sequenceId;
    }

    public void setSequenceId(int sequenceId) {
        this.sequenceId = sequenceId;
    }

    protected abstract void readBody(ByteBuffer buf) throws MessageParseException;

    protected abstract void writeBody(ByteBuffer buf);

    /** Subclasses compute and store the total packet length here. */
    protected abstract void setTotalLength();

    private void readHeader(ByteBuffer buf) throws MessageParseException {
        totalLength = buf.getInt();
        commandId = buf.getInt();
        sequenceId = buf.getInt();
    }

    private void writeHeader(ByteBuffer buf) {
        buf.putInt(totalLength);
        buf.putInt(commandId);
        buf.putInt(sequenceId);
    }

    public boolean read(ByteBuffer buf) throws MessageParseException {
        // totalLength was supplied by the recognizer; wait for the whole packet.
        if (buf.remaining() < totalLength) {
            return false;
        }
        readHeader(buf);
        if (totalLength > lenMessageHeader) {
            readBody(buf);
        }
        return true;
    }

    public boolean write(ByteBuffer buf) {
        // Compute the length before checking for space. The original checked
        // first, when totalLength was still 0 for a new request.
        setTotalLength();
        if (buf.remaining() < totalLength) {
            return false;
        }
        writeHeader(buf);
        writeBody(buf);
        return true;
    }

    /** Encodes str into exactly len bytes, truncating or zero-padding as needed. */
    protected byte[] getBytes(String str, int len) {
        if (str == null) {
            throw new IllegalArgumentException("getBytes : null String !");
        }
        if (len < 0) {
            throw new IllegalArgumentException("getBytes : len can't be < 0 !");
        }
        byte[] result = new byte[len]; // zero-filled by default
        byte[] raw = str.getBytes();
        System.arraycopy(raw, 0, result, 0, Math.min(raw.length, len));
        return result;
    }
}
```

```java
package net.smsfan.smg.api.phs20;

import java.nio.ByteBuffer;

import net.gleamynode.netty2.MessageParseException;

public class PHSLogin extends PHSMessage {
    static final int COMMAND_ID = 1;

    static final int lenSourceAddr = 8;
    static final int lenAuthenticatorSP = 16;
    static final int lenLoginMode = 1;
    static final int lenVersion = 1;
    static final int lenTimestamp = 4;
    static final int lenMessageBody = lenSourceAddr + lenAuthenticatorSP
            + lenLoginMode + lenTimestamp + lenVersion;

    String sourceAddr;
    String authenticatorSP;
    byte loginMode = 0;
    int timestamp = 0;
    byte version = 0;

    public PHSLogin(String sourceAddr, String authenticatorSP,
                    byte loginMode, byte version) throws PHSException {
        super(COMMAND_ID);
        setClientId(sourceAddr);
        setAuthenticatorSP(authenticatorSP);
        setVersion(version);
        setLoginMode(loginMode);
        setTimestamp();
    }

    public void setClientId(String sourceAddr) throws PHSException {
        if (sourceAddr == null) {
            throw new PHSException("setSource_Addr : Source_Addr is null !");
        }
        this.sourceAddr = sourceAddr;
    }

    public void setAuthenticatorSP(String authenticatorSP) throws PHSException {
        if (authenticatorSP == null) {
            throw new PHSException("setAuthenticatorSP : AuthenticatorSP is null !");
        }
        this.authenticatorSP = authenticatorSP;
    }

    public void setLoginMode(byte loginMode) throws PHSException {
        this.loginMode = loginMode;
    }

    public void setVersion(byte version) throws PHSException {
        this.version = version;
    }

    private void setTimestamp() throws PHSException {
        timestamp = PHSUtil.getTimestamp();
    }

    public String getSourceAddr() throws PHSException {
        return sourceAddr;
    }

    public String getAuthenticatorSP() throws PHSException {
        return authenticatorSP;
    }

    public byte getVersion() throws PHSException {
        return version;
    }

    public int getTimestamp() throws PHSException {
        return timestamp;
    }

    /** Login is only ever sent by this client, so it is never parsed. */
    protected void readBody(ByteBuffer buf) throws MessageParseException {
        throw new UnsupportedOperationException();
    }

    protected void writeBody(ByteBuffer buf) {
        // if (sourceAddr == null)
        //     throw new IllegalArgumentException("encodePacket : Source_Addr is null !");
        // if (authenticatorSP == null)
        //     throw new IllegalArgumentException("encodePacket : AuthenticatorSP is null !");

        // Authenticator input: sourceAddr + 7 zero bytes + password + 10-digit timestamp
        byte[] spaceString = {0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
        byte[] md5authenticatorSP = PHSUtil.hash(
                sourceAddr + new String(spaceString) + authenticatorSP
                + PHSUtil.zeroPadString(String.valueOf(timestamp), 10));

        buf.put(getBytes(sourceAddr, lenSourceAddr));
        buf.put(md5authenticatorSP);
        buf.put(loginMode);
        buf.putInt(timestamp);
        buf.put(version);
    }

    protected void setTotalLength() {
        setTotalLength(lenMessageBody + lenMessageHeader);
    }
}
```

```java
package net.smsfan.smg.api.phs20;

import java.nio.ByteBuffer;

import net.gleamynode.netty2.MessageParseException;

public class PHSLoginRep extends PHSMessage {
    public static final int COMMAND_ID = 0x80000001;

    private static final int lenStatus = 4;
    private static final int lenAuthenticatorISMG = 16;
    private static final int lenVersion = 1;
    private static final int lenMessageBody = lenStatus + lenAuthenticatorISMG + lenVersion;

    private int status;
    private String authenticatorISMG;
    private byte version;

    public PHSLoginRep(int totalLength, int commandId, int sequenceId) {
        super(totalLength, commandId, sequenceId);
    }

    public int getStatus() {
        return status;
    }

    public String getAuthenticatorISMG() {
        return authenticatorISMG;
    }

    public byte getVersion() {
        return version;
    }

    protected void readBody(ByteBuffer buf) throws MessageParseException {
        status = buf.getInt();
        byte[] auth = new byte[lenAuthenticatorISMG];
        buf.get(auth);
        authenticatorISMG = new String(auth);
        version = buf.get();
    }

    /** LoginRep is only ever received by this client, so it is never written. */
    protected void writeBody(ByteBuffer buf) {
    }

    protected void setTotalLength() {
        throw new UnsupportedOperationException();
    }
}
```
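The `getBytes` helper in PHSMessage encodes a string into a fixed-width, zero-padded field. The same idea can be sketched on its own with the standard library. `FixedField` is a hypothetical name, and the sketch assumes the field holds ASCII text; the original code uses the platform default charset instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper (not from the original post) for fixed-width text fields. */
final class FixedField {

    /** Encodes text into exactly width bytes: truncated if longer, zero-padded if shorter. */
    static byte[] encode(String text, int width) {
        byte[] raw = text.getBytes(StandardCharsets.US_ASCII); // assumption: ASCII content
        return Arrays.copyOf(raw, width); // copyOf truncates or pads with zeros
    }

    /** Decodes a fixed-width field, dropping the trailing zero padding. */
    static String decode(byte[] field) {
        int end = field.length;
        while (end > 0 && field[end - 1] == 0) {
            end--;
        }
        return new String(field, 0, end, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        byte[] field = encode("sp01", 8);
        System.out.println(field.length + " bytes, decoded: " + decode(field));
    }
}
```

With the field widths declared in PHSLogin (8 + 16 + 1 + 4 + 1 = 30 bytes of body), a login packet comes to 42 bytes including the 12-byte header.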
Posted: 2004-12-02
OK. With these classes done, the conversion between socket stream data and Message objects is complete. From here on, supporting more of the protocol only means adding the corresponding Message classes.
Now let's get it running. By implementing Netty's SessionListener interface, the client is notified when the connection is established, when a message is received, and when the connection is closed:

```java
package net.smsfan.smg.api.phs20;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Hashtable;
import java.util.LinkedList;

import net.gleamynode.netty2.*;

public class PHSClient implements SessionListener {
    private static final int CONNECT_TIMEOUT = 30; // seconds
    private static final int DISPATCHER_THREAD_POOL_SIZE = 10;

    private final Hashtable lockMap = new Hashtable();     // sequenceId --> lock object
    private final Hashtable waitPackets = new Hashtable(); // sequenceId --> response packet
    private final LinkedList listeners = new LinkedList(); // packet receivers
    private boolean debug;
    private int status = 0; // 0 = not logged in; 1 = logged in

    private IoProcessor ioProcessor;
    private ThreadPooledEventDispatcher eventDispatcher;
    private Session session;

    public PHSClient(String host, int port) throws IOException {
        debug = true;
        PHSClientInit(host, port);
    }

    public void PHSClientInit(String host, int port) throws IOException {
        // initialize I/O processor and event dispatcher
        ioProcessor = new IoProcessor();
        eventDispatcher = new OrderedEventDispatcher();

        // start with the default number of I/O worker threads
        ioProcessor.start();

        // start with a few event dispatcher threads
        eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE);
        eventDispatcher.start();

        // prepare message recognizer
        MessageRecognizer recognizer = new PHSMessageRecognizer();

        // create a client session
        session = new Session(ioProcessor, new InetSocketAddress(host, port),
                              recognizer, eventDispatcher);

        // set configuration
        session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);

        // subscribe and start communication
        session.addSessionListener(this);
        log("Connecting to " + session.getSocketAddress(), debug);
        session.start();
    }

    public PHSLoginRep login(PHSLogin login, long waitTime) throws PHSException {
        PHSLoginRep rep = (PHSLoginRep) putRequestForRep(login, waitTime);
        if (rep == null) {
            return null;
        }
        if (rep.getStatus() == 0) {
            this.status = 1;
        }
        return rep;
    }

    public void close() {
        lockMap.clear();
        waitPackets.clear();
        listeners.clear();

        // stop I/O processor and event dispatcher
        eventDispatcher.stop();
        ioProcessor.stop();
    }

    /** Sends a request and waits up to waitTime milliseconds for its response. */
    private PHSMessage putRequestForRep(PHSMessage msg, long waitTime) throws PHSException {
        if (msg == null || waitTime < 0) {
            return null;
        }
        Integer seq = new Integer(msg.getSequenceId());
        Object lock = new Object();
        lockMap.put(seq, lock);
        try {
            putRequest(msg);
            // Check and wait under the same lock that messageReceived() holds
            // while storing the response, so an early reply cannot be missed.
            synchronized (lock) {
                PHSMessage resp = removeWaitPacket(seq);
                if (resp == null) {
                    try {
                        lock.wait(waitTime);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                    resp = removeWaitPacket(seq);
                }
                return resp;
            }
        } finally {
            lockMap.remove(seq);
        }
    }

    public void addWaitPacket(PHSMessage packet) {
        waitPackets.put(new Integer(packet.getSequenceId()), packet);
    }

    public PHSMessage removeWaitPacket(Integer seq) {
        return (PHSMessage) waitPackets.remove(seq);
    }

    private void putRequest(PHSMessage msg) throws PHSException {
        session.write(msg);
    }

    protected void finalize() {
        close();
    }

    public boolean addListener(PHSClientListener listener) {
        return listeners.add(listener);
    }

    public boolean removeListener(PHSClientListener listener) {
        return listeners.remove(listener);
    }

    private void log(String str, boolean debug) {
        if (debug) {
            System.out.println(str);
        }
    }

    private void fireConnected() {
        final Object[] alisteners = listeners.toArray();
        for (int i = 0; i < alisteners.length; i++) {
            try {
                ((PHSClientListener) alisteners[i]).onClientConnected();
            } catch (Exception ex1) {
                // one failing listener must not stop the others
            }
        }
    }

    private void fireDisconnected() {
        final Object[] alisteners = listeners.toArray();
        for (int i = 0; i < alisteners.length; i++) {
            try {
                ((PHSClientListener) alisteners[i]).onClientDisconnected();
            } catch (Exception ex1) {
                // one failing listener must not stop the others
            }
        }
    }

    public boolean isConnected() {
        return session.isConnected();
    }

    // SessionListener implementation

    public void connectionEstablished(Session session) {
        fireConnected();
    }

    public void connectionClosed(Session session) {
        fireDisconnected();
    }

    public void messageReceived(Session session, Message message) {
        PHSMessage msg = (PHSMessage) message;
        // Response packets have the high bit of the command ID set.
        if ((msg.getCommandId() & 0x80000000) != 0) {
            // remove() returns null if the caller has already timed out.
            Object lock = lockMap.remove(new Integer(msg.getSequenceId()));
            if (lock != null) {
                synchronized (lock) {
                    addWaitPacket(msg);
                    lock.notify();
                }
                return;
            }
        }
        log("unknown message=" + msg.getCommandId(), debug);
    }

    public void messageSent(Session session, Message message) {
    }

    public void sessionIdle(Session session) {
    }

    public void exceptionCaught(Session session, Throwable throwable) {
        throwable.printStackTrace();
    }
}
```
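The client above pairs each response with its request through the sequence ID and blocks the caller with wait/notify. On a modern JDK the same pattern could be written with `CompletableFuture`. The sketch below is an alternative under that assumption, not the author's code, and it is independent of Netty2; `PendingReplies` and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch (not from the original post): match replies to requests by sequence ID. */
final class PendingReplies<T> {
    private final ConcurrentMap<Integer, CompletableFuture<T>> pending =
            new ConcurrentHashMap<>();

    /** Registers interest in a reply. Call this before sending the request. */
    CompletableFuture<T> expect(int sequenceId) {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.put(sequenceId, future);
        return future;
    }

    /** Called from the receive path. Returns false if nobody is waiting for this ID. */
    boolean complete(int sequenceId, T reply) {
        CompletableFuture<T> future = pending.remove(sequenceId);
        return future != null && future.complete(reply);
    }

    /** Waits up to timeoutMillis. Returns null on timeout or interruption and forgets the request. */
    T await(int sequenceId, CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } finally {
            pending.remove(sequenceId, future); // no-op if complete() already removed it
        }
    }
}
```

A caller would use `expect(seq)` before writing the request, then `await(seq, future, 10000)`, while the message handler calls `complete(seq, reply)`. Because the future stores the reply itself, a response that arrives before the caller starts waiting is not lost.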
Posted: 2004-12-02
Finally, write an example class to run it:

```java
package net.smsfan.smg.api.phs20.example;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;

import net.smsfan.smg.api.phs20.*;

public class TestClient implements PHSClientListener {
    private String serverHost = "218.66.104.104";
    private int serverPort = 8890;
    private String name = "";
    private String password = "";
    private PHSClient client;

    public TestClient() throws IOException, PHSException {
        loadProperties();
        // makeConnection() also registers this object as a listener; the
        // original registered it a second time here, so events fired twice.
        makeConnection();
    }

    /** Loads the connection settings from smg.properties in the working directory. */
    private void loadProperties() {
        Properties props = new Properties();
        try {
            FileInputStream in = new FileInputStream("smg.properties");
            try {
                props.load(in);
            } finally {
                in.close();
            }
        } catch (Exception e) {
            System.out.println(
                    "Unable to read configuration file. Please put smg.properties in the working directory!");
            System.exit(1);
        }
        serverHost = props.getProperty("serverHost");
        serverPort = Integer.parseInt(props.getProperty("serverPort"));
        name = props.getProperty("name");
        password = props.getProperty("password");
    }

    /** Sends the login packet and prints the response. */
    public void connect() throws Exception {
        byte version = 0;
        byte loginMode = 2;
        PHSLogin login = new PHSLogin(name, password, loginMode, version);
        PHSLoginRep rep = client.login(login, 10000);
        if (rep == null) {
            System.out.println("No response packet !");
            return;
        }
        if (rep.getCommandId() != PHSLoginRep.COMMAND_ID) {
            System.out.println("Invalid command id !");
        }
        if (rep.getSequenceId() != login.getSequenceId()) {
            System.out.println("Invalid sequence id !");
        }
        System.out.println("Status = " + rep.getStatus());
        System.out.println("AuthenticatorISMG = " + rep.getAuthenticatorISMG());
        System.out.println("Version = " + rep.getVersion());
    }

    public static void main(String[] args) {
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        try {
            TestClient testClient = new TestClient();
            String line;
            while ((line = console.readLine()) != null) {
                String command = line.trim();
                if (command.equals("connect")) {
                    System.out.println("Attempting to send PHS_Login command to server ......");
                    testClient.connect();
                    continue;
                }
                if (command.equals("quit")) {
                    System.out.println("PHS client quit.");
                    System.exit(0);
                }
                System.out.println("unrecognised command !");
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    private synchronized void makeConnection() {
        if (client != null) {
            if (client.isConnected()) {
                return;
            }
            client.close();
            client = null;
        }
        try {
            // Keep trying until a client can be created.
            while (client == null) {
                try {
                    client = new PHSClient(serverHost, serverPort);
                } catch (Exception ex1) {
                    ex1.printStackTrace();
                    Thread.sleep(1000); // pause before retrying
                }
            }
            client.addListener(this);
            // Wait for the connection, sleeping between checks instead of spinning.
            while (!client.isConnected()) {
                Thread.sleep(100);
            }
            connect();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public void onClientDisconnected() {
        System.out.println("Client disconnected! Reconnecting ...");
        makeConnection();
    }

    public void onClientConnected() {
    }
}
```
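The example client reconnects by retrying in a loop. A common refinement is to cap the number of attempts and lengthen the pause after each failure. The following is a minimal sketch of that idea, independent of Netty2 and of the classes above; `Retry.withBackoff` and its parameters are hypothetical names chosen for illustration.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not from the original post): retry with exponential backoff. */
final class Retry {

    /**
     * Runs attempt until it returns true or maxAttempts have been made.
     * The pause doubles after each failure, capped at maxDelayMillis.
     * Returns true on success, false if all attempts failed or the thread was interrupted.
     */
    static boolean withBackoff(BooleanSupplier attempt, int maxAttempts,
                               long initialDelayMillis, long maxDelayMillis) {
        long delay = initialDelayMillis;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (i == maxAttempts) {
                break; // no point sleeping after the final failure
            }
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            delay = Math.min(delay * 2, maxDelayMillis);
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated connection attempt that succeeds on the third call.
        boolean ok = withBackoff(() -> ++calls[0] >= 3, 5, 10, 100);
        System.out.println("connected=" + ok + " after " + calls[0] + " attempts");
    }
}
```

In a client like the one above, the supplier would wrap the connection attempt and return whether it succeeded.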
Posted: 2004-12-02
The next step is to think about how to integrate this with Spring and turn it into a Spring component. I expect that to be straightforward.
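Posted: 2004-12-03
Netty2 is a good NIO framework. If you are interested, you could also try http://sourceforge.net/projects/cindy/. It supports SocketChannel/ServerSocketChannel/DatagramChannel/Pipe and also emulates MulticastChannel, so an application can access TCP and UDP through the same model.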
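Posted: 2004-12-04
Thanks, I'll give it a try. My main goal is to find a network communication framework for building the server side of an IM application.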
Posted: 2004-12-04
Why does Cindy look so much like Netty?
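Posted: 2004-12-04
The main idea behind Cindy came from an example in the book Java NIO. After reading it I looked at the open source NIO implementations I could find at the time, and Netty2 was one of them, so the MessageRecognizer class in Cindy was in fact learned from Netty2.
However, my understanding of NIO differs somewhat from Netty2's, otherwise I would have joined its development team :) As the Cindy 1.0 release shows, that version supported several NIO-based models. After testing them in a few real projects, I found that some of the models offered no real advantage in practice, so I trimmed them down to the most commonly used ones and added UDP and Pipe support. Cindy was extracted from http://sourceforge.net/projects/java-jml. Originally it served only as the foundation for that MSN library, but because my projects often used the NIO framework on its own, I moved it into the Cindy project.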