- 浏览: 25204 次
- 性别:
- 来自: 深圳
文章分类
- 全部博客 (18)
- Ajax+DWR (0)
- CallCenter+Asterisk (0)
- EJB+RMI (0)
- Hadoop并行计算 (0)
- HTML+CSS (0)
- J2EE (0)
- Java基础 (2)
- JMS (0)
- JS+jQuery (2)
- JSF (0)
- Linux+Shell+Python (0)
- NoSQL+MongoDB+Cassandra (0)
- Spider+Lucene+solr+nutch (2)
- Spring (0)
- SQL+MySQL (0)
- Web Service+Hessian (0)
- 个人生活 (0)
- 其他 (0)
- 学习心得 (1)
- 工作感悟 (5)
- 服务器+Tomcat+Jetty (0)
- 缓存+Memcached+Redis (1)
- 网络开发+Mina+Netty (3)
- 设计模式 (2)
- 重构 (0)
最新评论
-
xuzhou530:
为什么int length = buffer.readInt( ...
使用Netty实现通用二进制协议的高效数据传输 -
xuzhou530:
我使用你上面的代码启动服务端没有问题,启动客户端出现了下面问题 ...
使用Netty实现通用二进制协议的高效数据传输 -
xuzhou530:
哦哦,谢谢,了
使用Netty实现通用二进制协议的高效数据传输 -
drager:
<div class="quote_title ...
使用Netty实现通用二进制协议的高效数据传输 -
xuzhou530:
XLCharSetFactory这个类没有呀,这个类干嘛的
使用Netty实现通用二进制协议的高效数据传输
Netty是一个高性能的NIO通信框架,提供异步的、事件驱动的网络编程模型。使用Netty可以方便用户开发各种常用协议的网络程序。例如:TCP、UDP、HTTP等等。
Netty的最新版本是3.2.7,官网地址是:http://www.jboss.org/netty
本文的主要目的是基于Netty实现一个通用二进制协议的高效数据传输。协议是通用的二进制协议,高效并且扩展性很好。
一个好的协议有两个标准:
(1)生成的传输数据要少,即数据压缩比要高。这样可以减少网络开销。
(2)传输数据和业务对象之间的转换速度要快。
(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen)
一、协议的定义
无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后。
(1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、数据包长(4byte)
(2)数据:由数据包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
数据格式定义:
字段1键名长度 字段1键名 字段1值长度 字段1值
字段2键名长度 字段2键名 字段2值长度 字段2值
字段3键名长度 字段3键名 字段3值长度 字段3值
… … … …
长度为整型,占4个字节
代码中用两个Vo对象来表示:XLRequest和XLResponse。
2
3import java.util.HashMap;
4import java.util.Map;
5
6/** *//**
7 * @author hankchen
10 * 2012-2-3 下午02:46:52
11 */
12
13
14/** *//**
15 * 响应数据
16 */
17
18/** *//**
19 * 通用协议介绍
20 *
21 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
22 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
23 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
24 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
25 * 数据格式定义:
26 * 字段1键名长度 字段1键名 字段1值长度 字段1值
27 * 字段2键名长度 字段2键名 字段2值长度 字段2值
28 * 字段3键名长度 字段3键名 字段3值长度 字段3值
29 * … … … …
30 * 长度为整型,占4个字节
31 */
32public class XLResponse {
33 private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
34 private byte encrypt;// 加密类型。0表示不加密
35 private byte extend1;// 用于扩展协议。暂未定义任何值
36 private byte extend2;// 用于扩展协议。暂未定义任何值
37 private int sessionid;// 会话ID
38 private int result;// 结果码
39 private int length;// 数据包长
40
41 private Map<String,String> values=new HashMap<String, String>();
42
43 private String ip;
44
45 public void setValue(String key,String value){
46 values.put(key, value);
47 }
48
49 public String getValue(String key){
50 if (key==null) {
51 return null;
52 }
53 return values.get(key);
54 }
55
56 public byte getEncode() {
57 return encode;
58 }
59
60 public void setEncode(byte encode) {
61 this.encode = encode;
62 }
63
64 public byte getEncrypt() {
65 return encrypt;
66 }
67
68 public void setEncrypt(byte encrypt) {
69 this.encrypt = encrypt;
70 }
71
72 public byte getExtend1() {
73 return extend1;
74 }
75
76 public void setExtend1(byte extend1) {
77 this.extend1 = extend1;
78 }
79
80 public byte getExtend2() {
81 return extend2;
82 }
83
84 public void setExtend2(byte extend2) {
85 this.extend2 = extend2;
86 }
87
88 public int getSessionid() {
89 return sessionid;
90 }
91
92 public void setSessionid(int sessionid) {
93 this.sessionid = sessionid;
94 }
95
96 public int getResult() {
97 return result;
98 }
99
100 public void setResult(int result) {
101 this.result = result;
102 }
103
104 public int getLength() {
105 return length;
106 }
107
108 public void setLength(int length) {
109 this.length = length;
110 }
111
112 public Map<String, String> getValues() {
113 return values;
114 }
115
116 public String getIp() {
117 return ip;
118 }
119
120 public void setIp(String ip) {
121 this.ip = ip;
122 }
123
124 public void setValues(Map<String, String> values) {
125 this.values = values;
126 }
127
128 @Override
129 public String toString() {
130 return "XLResponse [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
131 + ", sessionid=" + sessionid + ", result=" + result + ", length=" + length + ", values=" + values + ", ip=" + ip + "]";
132 }
133}
2
3import java.util.HashMap;
4import java.util.Map;
5
6/** *//**
7 * @author hankchen
8 * 2012-2-3 下午02:46:41
9 */
10
11/** *//**
12 * 请求数据
13 */
14
15/** *//**
16 * 通用协议介绍
17 *
18 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
19 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
20 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
21 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
22 * 数据格式定义:
23 * 字段1键名长度 字段1键名 字段1值长度 字段1值
24 * 字段2键名长度 字段2键名 字段2值长度 字段2值
25 * 字段3键名长度 字段3键名 字段3值长度 字段3值
26 * … … … …
27 * 长度为整型,占4个字节
28 */
29public class XLRequest {
30 private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
31 private byte encrypt;// 加密类型。0表示不加密
32 private byte extend1;// 用于扩展协议。暂未定义任何值
33 private byte extend2;// 用于扩展协议。暂未定义任何值
34 private int sessionid;// 会话ID
35 private int command;// 命令
36 private int length;// 数据包长
37
38 private Map<String,String> params=new HashMap<String, String>(); //参数
39
40 private String ip;
41
42 public byte getEncode() {
43 return encode;
44 }
45
46 public void setEncode(byte encode) {
47 this.encode = encode;
48 }
49
50 public byte getEncrypt() {
51 return encrypt;
52 }
53
54 public void setEncrypt(byte encrypt) {
55 this.encrypt = encrypt;
56 }
57
58 public byte getExtend1() {
59 return extend1;
60 }
61
62 public void setExtend1(byte extend1) {
63 this.extend1 = extend1;
64 }
65
66 public byte getExtend2() {
67 return extend2;
68 }
69
70 public void setExtend2(byte extend2) {
71 this.extend2 = extend2;
72 }
73
74 public int getSessionid() {
75 return sessionid;
76 }
77
78 public void setSessionid(int sessionid) {
79 this.sessionid = sessionid;
80 }
81
82 public int getCommand() {
83 return command;
84 }
85
86 public void setCommand(int command) {
87 this.command = command;
88 }
89
90 public int getLength() {
91 return length;
92 }
93
94 public void setLength(int length) {
95 this.length = length;
96 }
97
98 public Map<String, String> getParams() {
99 return params;
100 }
101
102 public void setValue(String key,String value){
103 params.put(key, value);
104 }
105
106 public String getValue(String key){
107 if (key==null) {
108 return null;
109 }
110 return params.get(key);
111 }
112
113 public String getIp() {
114 return ip;
115 }
116
117 public void setIp(String ip) {
118 this.ip = ip;
119 }
120
121 public void setParams(Map<String, String> params) {
122 this.params = params;
123 }
124
125 @Override
126 public String toString() {
127 return "XLRequest [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
128 + ", sessionid=" + sessionid + ", command=" + command + ", length=" + length + ", params=" + params + ", ip=" + ip + "]";
129 }
130}
131
二、协议的编码和解码
对于自定义二进制协议,编码解码器往往是Netty开发的重点。这里直接给出相关类的代码。
2
3import java.nio.ByteBuffer;
4
5import org.jboss.netty.buffer.ChannelBuffer;
6import org.jboss.netty.buffer.ChannelBuffers;
7import org.jboss.netty.channel.ChannelHandlerContext;
8import org.jboss.netty.channel.Channels;
9import org.jboss.netty.channel.MessageEvent;
10import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
11import org.jboss.netty.example.xlsvr.util.ProtocolUtil;
12import org.jboss.netty.example.xlsvr.vo.XLResponse;
13import org.slf4j.Logger;
14import org.slf4j.LoggerFactory;
15
16/** *//**
17 * @author hankchen
18 * 2012-2-3 上午10:48:15
19 */
20
21/** *//**
22 * 服务器端编码器
23 */
24public class XLServerEncoder extends SimpleChannelDownstreamHandler {
25 Logger logger=LoggerFactory.getLogger(XLServerEncoder.class);
26
27 @Override
28 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
29 XLResponse response=(XLResponse)e.getMessage();
30 ByteBuffer headBuffer=ByteBuffer.allocate(16);
31 /** *//**
32 * 先组织报文头
33 */
34 headBuffer.put(response.getEncode());
35 headBuffer.put(response.getEncrypt());
36 headBuffer.put(response.getExtend1());
37 headBuffer.put(response.getExtend2());
38 headBuffer.putInt(response.getSessionid());
39 headBuffer.putInt(response.getResult());
40
41 /** *//**
42 * 组织报文的数据部分
43 */
44 ChannelBuffer dataBuffer=ProtocolUtil.encode(response.getEncode(),response.getValues());
45 int length=dataBuffer.readableBytes();
46 headBuffer.putInt(length);
47 /** *//**
48 * 非常重要
49 * ByteBuffer需要手动flip(),ChannelBuffer不需要
50 */
51 headBuffer.flip();
52 ChannelBuffer totalBuffer=ChannelBuffers.dynamicBuffer();
53 totalBuffer.writeBytes(headBuffer);
54 logger.info("totalBuffer size="+totalBuffer.readableBytes());
55 totalBuffer.writeBytes(dataBuffer);
56 logger.info("totalBuffer size="+totalBuffer.readableBytes());
57 Channels.write(ctx, e.getFuture(), totalBuffer);
58 }
59
60}
61
2
3import org.jboss.netty.buffer.ChannelBuffer;
4import org.jboss.netty.buffer.ChannelBuffers;
5import org.jboss.netty.channel.Channel;
6import org.jboss.netty.channel.ChannelHandlerContext;
7import org.jboss.netty.example.xlsvr.util.ProtocolUtil;
8import org.jboss.netty.example.xlsvr.vo.XLResponse;
9import org.jboss.netty.handler.codec.frame.FrameDecoder;
10
11/** *//**
12 * @author hankchen
13 * 2012-2-3 上午10:47:54
14 */
15
16/** *//**
17 * 客户端解码器
18 */
19public class XLClientDecoder extends FrameDecoder {
20
21 @Override
22 protected Object decode(ChannelHandlerContext context, Channel channel, ChannelBuffer buffer) throws Exception {
23 if (buffer.readableBytes()<16) {
24 return null;
25 }
26 buffer.markReaderIndex();
27 byte encode=buffer.readByte();
28 byte encrypt=buffer.readByte();
29 byte extend1=buffer.readByte();
30 byte extend2=buffer.readByte();
31 int sessionid=buffer.readInt();
32 int result=buffer.readInt();
33 int length=buffer.readInt(); // 数据包长
34 if (buffer.readableBytes()<length) {
35 buffer.resetReaderIndex();
36 return null;
37 }
38 ChannelBuffer dataBuffer=ChannelBuffers.buffer(length);
39 buffer.readBytes(dataBuffer, length);
40
41 XLResponse response=new XLResponse();
42 response.setEncode(encode);
43 response.setEncrypt(encrypt);
44 response.setExtend1(extend1);
45 response.setExtend2(extend2);
46 response.setSessionid(sessionid);
47 response.setResult(result);
48 response.setLength(length);
49 response.setValues(ProtocolUtil.decode(encode, dataBuffer));
50 response.setIp(ProtocolUtil.getClientIp(channel));
51 return response;
52 }
53
54}
2
3import java.net.SocketAddress;
4import java.nio.charset.Charset;
5import java.util.HashMap;
6import java.util.Map;
7import java.util.Map.Entry;
8
9import org.jboss.netty.buffer.ChannelBuffer;
10import org.jboss.netty.buffer.ChannelBuffers;
11import org.jboss.netty.channel.Channel;
12
13/** *//**
14 * @author hankchen
15 * 2012-2-4 下午01:57:33
16 */
17public class ProtocolUtil {
18
19 /** *//**
20 * 编码报文的数据部分
21 * @param encode
22 * @param values
23 * @return
24 */
25 public static ChannelBuffer encode(int encode,Map<String,String> values){
26 ChannelBuffer totalBuffer=null;
27 if (values!=null && values.size()>0) {
28 totalBuffer=ChannelBuffers.dynamicBuffer();
29 int length=0,index=0;
30 ChannelBuffer [] channelBuffers=new ChannelBuffer[values.size()];
31 Charset charset=XLCharSetFactory.getCharset(encode);
32 for(Entry<String,String> entry:values.entrySet()){
33 String key=entry.getKey();
34 String value=entry.getValue();
35 ChannelBuffer buffer=ChannelBuffers.dynamicBuffer();
36 buffer.writeInt(key.length());
37 buffer.writeBytes(key.getBytes(charset));
38 buffer.writeInt(value.length());
39 buffer.writeBytes(value.getBytes(charset));
40 channelBuffers[index++]=buffer;
41 length+=buffer.readableBytes();
42 }
43
44 for (int i = 0; i < channelBuffers.length; i++) {
45 totalBuffer.writeBytes(channelBuffers[i]);
46 }
47 }
48 return totalBuffer;
49 }
50
51 /** *//**
52 * 解码报文的数据部分
53 * @param encode
54 * @param dataBuffer
55 * @return
56 */
57 public static Map<String,String> decode(int encode,ChannelBuffer dataBuffer){
58 Map<String,String> dataMap=new HashMap<String, String>();
59 if (dataBuffer!=null && dataBuffer.readableBytes()>0) {
60 int processIndex=0,length=dataBuffer.readableBytes();
61 Charset charset=XLCharSetFactory.getCharset(encode);
62 while(processIndex<length){
63 /** *//**
64 * 获取Key
65 */
66 int size=dataBuffer.readInt();
67 byte [] contents=new byte [size];
68 dataBuffer.readBytes(contents);
69 String key=new String(contents, charset);
70 processIndex=processIndex+size+4;
71 /** *//**
72 * 获取Value
73 */
74 size=dataBuffer.readInt();
75 contents=new byte [size];
76 dataBuffer.readBytes(contents);
77 String value=new String(contents, charset);
78 dataMap.put(key, value);
79 processIndex=processIndex+size+4;
80 }
81 }
82 return dataMap;
83 }
84
85 /** *//**
86 * 获取客户端IP
87 * @param channel
88 * @return
89 */
90 public static String getClientIp(Channel channel){
91 /** *//**
92 * 获取客户端IP
93 */
94 SocketAddress address = channel.getRemoteAddress();
95 String ip = "";
96 if (address != null) {
97 ip = address.toString().trim();
98 int index = ip.lastIndexOf(':');
99 if (index < 1) {
100 index = ip.length();
101 }
102 ip = ip.substring(1, index);
103 }
104 if (ip.length() > 15) {
105 ip = ip.substring(Math.max(ip.indexOf("/") + 1, ip.length() - 15));
106 }
107 return ip;
108 }
109}
110
三、服务器端实现
服务器端提供的功能是:
1、接收客户端的请求(非关闭命令),返回XLResponse类型的数据。
2、如果客户端的请求是关闭命令:shutdown,则服务器端关闭自身进程。
为了展示多协议的运用,这里客户端的请求采用的是基于问本行(\n\r)的协议。
具体代码如下:
2
3import java.net.InetSocketAddress;
4import java.util.concurrent.Executors;
5
6import org.jboss.netty.bootstrap.ServerBootstrap;
7import org.jboss.netty.channel.Channel;
8import org.jboss.netty.channel.ChannelPipeline;
9import org.jboss.netty.channel.group.ChannelGroup;
10import org.jboss.netty.channel.group.ChannelGroupFuture;
11import org.jboss.netty.channel.group.DefaultChannelGroup;
12import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
13import org.jboss.netty.example.xlsvr.codec.XLServerEncoder;
14import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
15import org.jboss.netty.handler.codec.frame.Delimiters;
16import org.jboss.netty.handler.codec.string.StringDecoder;
17import org.jboss.netty.util.CharsetUtil;
18import org.slf4j.Logger;
19import org.slf4j.LoggerFactory;
20
21/** *//**
22 * @author hankchen
23 * 2012-1-30 下午03:21:38
24 */
25
26public class XLServer {
27 public static final int port =8080;
28 public static final Logger logger=LoggerFactory.getLogger(XLServer.class);
29 public static final ChannelGroup allChannels=new DefaultChannelGroup("XLServer");
30 private static final ServerBootstrap serverBootstrap=new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
31
32 public static void main(String [] args){
33 try {
34 XLServer.startup();
35 } catch (Exception e) {
36 e.printStackTrace();
37 }
38 }
39
40 public static boolean startup() throws Exception{
41 /** *//**
42 * 采用默认ChannelPipeline管道
43 * 这意味着同一个XLServerHandler实例将被多个Channel通道共享
44 * 这种方式对于XLServerHandler中无有状态的成员变量是可以的,并且可以提高性能!
45 */
46 ChannelPipeline pipeline=serverBootstrap.getPipeline();
47 /** *//**
48 * 解码器是基于文本行的协议,\r\n或者\n\r
49 */
50 pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
51 pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
52 pipeline.addLast("encoder", new XLServerEncoder());
53 pipeline.addLast("handler", new XLServerHandler());
54
55 serverBootstrap.setOption("child.tcpNoDelay", true); //注意child前缀
56 serverBootstrap.setOption("child.keepAlive", true); //注意child前缀
57
58 /** *//**
59 * ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象
60 */
61 Channel channel=serverBootstrap.bind(new InetSocketAddress(port));
62 allChannels.add(channel);
63 logger.info("server is started on port "+port);
64 return false;
65 }
66
67 public static void shutdown() throws Exception{
68 try {
69 /** *//**
70 * 主动关闭服务器
71 */
72 ChannelGroupFuture future=allChannels.close();
73 future.awaitUninterruptibly();//阻塞,直到服务器关闭
74 //serverBootstrap.releaseExternalResources();
75 } catch (Exception e) {
76 e.printStackTrace();
77 logger.error(e.getMessage(),e);
78 }
79 finally{
80 logger.info("server is shutdown on port "+port);
81 System.exit(1);
82 }
83 }
84}
85
2
3import java.util.Random;
4
5import org.jboss.netty.channel.Channel;
6import org.jboss.netty.channel.ChannelFuture;
7import org.jboss.netty.channel.ChannelHandlerContext;
8import org.jboss.netty.channel.ChannelHandler.Sharable;
9import org.jboss.netty.channel.ChannelStateEvent;
10import org.jboss.netty.channel.ExceptionEvent;
11import org.jboss.netty.channel.MessageEvent;
12import org.jboss.netty.channel.SimpleChannelHandler;
13import org.jboss.netty.example.xlsvr.vo.XLResponse;
14import org.slf4j.Logger;
15import org.slf4j.LoggerFactory;
16
17/** *//**
18 * @author hankchen
19 * 2012-1-30 下午03:22:24
20 */
21
22@Sharable
23public class XLServerHandler extends SimpleChannelHandler {
24 private static final Logger logger=LoggerFactory.getLogger(XLServerHandler.class);
25
26 @Override
27 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
28 logger.info("messageReceived");
29 if (e.getMessage() instanceof String) {
30 String content=(String)e.getMessage();
31 logger.info("content is "+content);
32 if ("shutdown".equalsIgnoreCase(content)) {
33 //e.getChannel().close();
34 XLServer.shutdown();
35 }else {
36 sendResponse(ctx);
37 }
38 }else {
39 logger.error("message is not a String.");
40 e.getChannel().close();
41 }
42 }
43
44 @Override
45 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
46 logger.error(e.getCause().getMessage(),e.getCause());
47 e.getCause().printStackTrace();
48 e.getChannel().close();
49 }
50
51 @Override
52 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
53 logger.info("channelConnected");
54 sendResponse(ctx);
55 }
56
57 @Override
58 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
59 logger.info("channelClosed");
60 //删除通道
61 XLServer.allChannels.remove(e.getChannel());
62 }
63
64 @Override
65 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
66 logger.info("channelDisconnected");
67 super.channelDisconnected(ctx, e);
68 }
69
70 @Override
71 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
72 logger.info("channelOpen");
73 //增加通道
74 XLServer.allChannels.add(e.getChannel());
75 }
76
77 /** *//**
78 * 发送响应内容
79 * @param ctx
80 * @param e
81 * @return
82 */
83 private ChannelFuture sendResponse(ChannelHandlerContext ctx){
84 Channel channel=ctx.getChannel();
85 Random random=new Random();
86 XLResponse response=new XLResponse();
87 response.setEncode((byte)0);
88 response.setResult(1);
89 response.setValue("name","hankchen");
90 response.setValue("time", String.valueOf(System.currentTimeMillis()));
91 response.setValue("age",String.valueOf(random.nextInt()));
92 /** *//**
93 * 发送接收信息的时间戳到客户端
94 * 注意:Netty中所有的IO操作都是异步的!
95 */
96 ChannelFuture future=channel.write(response); //发送内容
97 return future;
98 }
99}
100
四、客户端实现
客户端的功能是连接服务器,发送10次请求,然后发送关闭服务器的命令,最后主动关闭客户端。
关键代码如下:
2 * Copyright (C): 2012
3 * @author hankchen
4 * 2012-1-30 下午03:21:26
5 */
6
7/** *//**
8 * 服务器特征:
9 * 1、使用专用解码器解析服务器发过来的数据
10 * 2、客户端主动关闭连接
11 */
12public class XLClient {
13 public static final int port =XLServer.port;
14 public static final String host ="localhost";
15 private static final Logger logger=LoggerFactory.getLogger(XLClient.class);
16 private static final NioClientSocketChannelFactory clientSocketChannelFactory=new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
17 private static final ClientBootstrap clientBootstrap=new ClientBootstrap(clientSocketChannelFactory);
18
19 /** *//**
20 * @param args
21 * @throws Exception
22 */
23 public static void main(String[] args) throws Exception {
24 ChannelFuture future=XLClient.startup();
25 logger.info("future state is "+future.isSuccess());
26 }
27
28 /** *//**
29 * 启动客户端
30 * @return
31 * @throws Exception
32 */
33 public static ChannelFuture startup() throws Exception {
34 /** *//**
35 * 注意:由于XLClientHandler中有状态的成员变量,因此不能采用默认共享ChannelPipeline的方式
36 * 例如,下面的代码形式是错误的:
37 * ChannelPipeline pipeline=clientBootstrap.getPipeline();
38 * pipeline.addLast("handler", new XLClientHandler());
39 */
40 clientBootstrap.setPipelineFactory(new XLClientPipelineFactory()); //只能这样设置
41 /** *//**
42 * 请注意,这里不存在使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象
43 */
44 clientBootstrap.setOption("tcpNoDelay", true);
45 clientBootstrap.setOption("keepAlive", true);
46
47 ChannelFuture future=clientBootstrap.connect(new InetSocketAddress(host, port));
48 /** *//**
49 * 阻塞式的等待,直到ChannelFuture对象返回这个连接操作的成功或失败状态
50 */
51 future.awaitUninterruptibly();
52 /** *//**
53 * 如果连接失败,我们将打印连接失败的原因。
54 * 如果连接操作没有成功或者被取消,ChannelFuture对象的getCause()方法将返回连接失败的原因。
55 */
56 if (!future.isSuccess()) {
57 future.getCause().printStackTrace();
58 }else {
59 logger.info("client is connected to server "+host+":"+port);
60 }
61 return future;
62 }
63
64 /** *//**
65 * 关闭客户端
66 * @param future
67 * @throws Exception
68 */
69 public static void shutdown(ChannelFuture future) throws Exception{
70 try {
71 /** *//**
72 * 主动关闭客户端连接,会阻塞等待直到通道关闭
73 */
74 future.getChannel().close().awaitUninterruptibly();
75 //future.getChannel().getCloseFuture().awaitUninterruptibly();
76 /** *//**
77 * 释放ChannelFactory通道工厂使用的资源。
78 * 这一步仅需要调用 releaseExternalResources()方法即可。
79 * 包括NIO Secector和线程池在内的所有资源将被自动的关闭和终止。
80 */
81 clientBootstrap.releaseExternalResources();
82 } catch (Exception e) {
83 e.printStackTrace();
84 logger.error(e.getMessage(),e);
85 }
86 finally{
87 System.exit(1);
88 logger.info("client is shutdown to server "+host+":"+port);
89 }
90 }
91}
2
3 @Override
4 public ChannelPipeline getPipeline() throws Exception {
5 ChannelPipeline pipeline=Channels.pipeline();
6 /** *//**
7 * 使用专用的解码器,解决数据分段的问题
8 * 从业务逻辑代码中分离协议处理部分总是一个很不错的想法。
9 */
10 pipeline.addLast("decoder", new XLClientDecoder());
11 /** *//**
12 * 有专门的编码解码器,这时处理器就不需要管数据分段和数据格式问题,只需要关注业务逻辑了!
13 */
14 pipeline.addLast("handler", new XLClientHandler());
15 return pipeline;
16 }
17
18}
2 * Copyright (C): 2012
3 * @author hankchen
4 * 2012-1-30 下午03:21:52
5 */
6
7/** *//**
8 * 服务器特征:
9 * 1、使用专用的编码解码器,解决数据分段的问题
10 * 2、使用POJO替代ChannelBuffer传输
11 */
12public class XLClientHandler extends SimpleChannelHandler {
13 private static final Logger logger=LoggerFactory.getLogger(XLClientHandler.class);
14 private final AtomicInteger count=new AtomicInteger(0); //计数器
15
16 @Override
17 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
18 processMethod1(ctx, e); //处理方式一
19 }
20
21 /** *//**
22 * @param ctx
23 * @param e
24 * @throws Exception
25 */
26 public void processMethod1(ChannelHandlerContext ctx, MessageEvent e) throws Exception{
27 logger.info("processMethod1……,count="+count.addAndGet(1));
28 XLResponse serverTime=(XLResponse)e.getMessage();
29 logger.info("messageReceived,content:"+serverTime.toString());
30 Thread.sleep(1000);
31
32 if (count.get()<10) {
33 //从新发送请求获取最新的服务器时间
34 ctx.getChannel().write(ChannelBuffers.wrappedBuffer("again\r\n".getBytes()));
35 }else{
36 //从新发送请求关闭服务器
37 ctx.getChannel().write(ChannelBuffers.wrappedBuffer("shutdown\r\n".getBytes()));
38 }
39 }
40
41 @Override
42 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
43 logger.info("exceptionCaught");
44 e.getCause().printStackTrace();
45 ctx.getChannel().close();
46 }
47
48 @Override
49 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
50 logger.info("channelClosed");
51 super.channelClosed(ctx, e);
52 }
53
54
55}
全文代码较多,写了很多注释,希望对读者有用,谢谢!
(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen)
评论
int length = buffer.readInt(); // 数据包长
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return null;
}
一直都会进入这里?
INFO ( XLServer) server is started on port 8080
INFO (XLServerHandler) channelOpen
INFO (XLServerHandler) channelConnected
接受到信息writeRequested:XLResponse [encode=0, encrypt=0, extend=0, extend =0, sessionid=0, result=1, length=0, values={time=1374653003832, age=-182129588, name=hankchen}, ip=null]
ERROR(XLServerHandler) charsetName
java.lang.NullPointerException: charsetName
at com.factory.XLCharSetFactory.getCharset(XLCharSetFactory.java:25)
at com.util.ProtocolUtil.encode(ProtocolUtil.java:33)
at com.codec.XLServerEncoder.writeRequested(XLServerEncoder.java:46)
at org.jboss.netty.channel.Channels.write(Channels.java:611)
at org.jboss.netty.channel.Channels.write(Channels.java:578)
at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251)
at com.handle.XLServerHandler.sendResponse(XLServerHandler.java:104)
at com.handle.XLServerHandler.channelConnected(XLServerHandler.java:58)
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66)
at org.jboss.netty.channel.Channels.fireChannelConnected(Channels.java:233)
at org.jboss.netty.channel.socket.nio.NioAcceptedSocketChannel.<init>(NioAcceptedSocketChannel.java:53)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink$Boss.registerAcceptedChannel(NioServerSocketPipelineSink.java:285)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink$Boss.run(NioServerSocketPipelineSink.java:249)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
我给你补充一下这个类吧
package org.jboss.netty.example.xlsvr.factory;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
public class XLCharSetFactory {
static String[] charsetArray={"UTF-8","GBK","GB2312","ISO8859-1"};
/**
* :0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
* @param encode
* @return
* @throws InstantiationException
* @throws IllegalAccessException
* @throws ClassNotFoundException
*/
public static Charset getCharset(int encode) {
Charset charset=null;
if (encode == 0)
throw new NullPointerException("charsetName");
try {
switch(encode)
{
case 0:
charset = Charset.forName(charsetArray[0]);
break;
case 1:
charset = Charset.forName(charsetArray[1]);
break;
case 2:
charset = Charset.forName(charsetArray[2]);
break;
case 3:
charset = Charset.forName(charsetArray[3]);
break;
}
} catch (IllegalCharsetNameException e) {
e.printStackTrace();
} catch (UnsupportedCharsetException e) {
e.printStackTrace();
}
return charset;
}
}
相关推荐
在面对需要处理大量并发连接和高效数据传输的场景时,传统的通用协议和实现往往无法满足需求。例如,HTTP 服务器可能不适应大规模文件传输或实时消息传递的需求。这时,Netty 提供了一个解决方案,它允许开发者定制...
RTU模式是MODBUS协议的一种变体,它使用二进制格式传输数据,不包含每个字符的起始和结束标识,因此具有更高的传输效率。在RTU模式下,数据以连续的8位字节流形式发送,每两个字节之间用1.5个停止位隔开,以确保数据...
Netty的出现解决了通用协议库在大规模、高并发场景下的局限性,使得开发者能够快速构建针对特定需求的高度优化的协议实现。 Netty的核心特性在于它的异步事件驱动模型,这一模型基于Java的非阻塞I/O(NIO)API。...
在Dubbo中,传输层扮演着数据传输媒介的角色,确保请求和响应能够在分布式环境中准确无误地传递。在这个实现中,Dubbo选择了Netty作为基础网络库,因为Netty是一个高性能、异步事件驱动的网络应用框架,适用于快速...
- **JSON**:通用的数据交换格式,易于阅读和编写,但相对于二进制序列化协议,效率较低。 - **Hessian**:轻量级的二进制RPC协议,适合内部服务间通信。 - **Msgpack**:另一种高效的二进制序列化协议。 在选择...
Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...
Java 中将对象转化为 byte 数组的方法是指将 Java 对象序列化为二进制数据流,以便在网络上传输或存储。这种方法可以应用于各种领域,如分布式计算、网络通信、数据存储等。 在 Java 中,对象转化为 byte 数组通常...
1. **网络通信**:Hadoop Common包含了一套基于Socket的网络通信组件,如Netty,用于节点间的高效数据传输。这些组件支持TCP/IP协议,确保了Hadoop集群内的节点能够快速、稳定地进行数据交互。 2. **I/O处理**:...
Dubbo默认使用Hessian作为序列化框架,这是一种二进制协议,具有轻量级、快速的特点。此外,还可以选择Duddo、FastJson和Java自带的序列化机制。Hessian通过HTTP POST方式传输数据,将辅助信息封装在HTTP头中,核心...
- **dubbo-common**:这是一个通用模块,包含了Dubbo框架共用的工具类,如IO处理、日志、配置、类处理等,以及线程池、二进制编码、JSON处理、数据存储接口等。这个模块为其他模块提供了基础支持。 - **dubbo-rpc-...
protobuf是Google的一种数据序列化协议,可以将结构化数据转化为二进制格式,便于网络传输。Java开发者可以通过protobuf编译器(protoc)将.proto文件转换为Java代码,其中包含了用于GRPC通信的Stub类。 2. **...
- **网络通信**: Hadoop Common提供了网络通信库,如Socket和Netty,用于节点间的通信,如数据传输、心跳检查和任务调度。 - **安全机制**: 支持Kerberos等认证和授权机制,以确保Hadoop集群的安全性。 - **日志...
Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...
Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...
Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...
Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...