`
菜菜土人
  • 浏览: 12619 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

使用Netty实现通用二进制协议的高效数据传输

 
阅读更多

http://www.blogjava.net/hankchen/archive/2012/02/04/369378.html

Netty是一个高性能的NIO通信框架,提供异步的、事件驱动的网络编程模型。使用Netty可以方便用户开发各种常用协议的网络程序。例如:TCP、UDP、HTTP等等。

Netty的最新版本是3.2.7,官网地址是:http://www.jboss.org/netty

本文的主要目的是基于Netty实现一个通用二进制协议的高效数据传输。协议是通用的二进制协议,高效并且扩展性很好。

一个好的协议有两个标准:

(1)生成的传输数据要少,即数据压缩比要高。这样可以减少网络开销。

(2)传输数据和业务对象之间的转换速度要快。

(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen

一、协议的定义

无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后。

(1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、数据包长(4byte)
(2)数据:由数据包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
数据格式定义:
字段1键名长度 字段1键名 字段1值长度 字段1值
字段2键名长度 字段2键名 字段2值长度 字段2值
字段3键名长度 字段3键名 字段3值长度 字段3值
… … … …
长度为整型,占4个字节

代码中用两个Vo对象来表示:XLRequest和XLResponse。

1packageorg.jboss.netty.example.xlsvr.vo;
2
3importjava.util.HashMap;
4importjava.util.Map;
5
6/**
7*@authorhankchen
10* 2012-2-3 下午02:46:52
11*/

12
13
14/**
15* 响应数据
16*/

17
18/**
19* 通用协议介绍
20*
21* 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
22* (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
23* 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
24* (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
25* 数据格式定义:
26* 字段1键名长度 字段1键名 字段1值长度 字段1值
27* 字段2键名长度 字段2键名 字段2值长度 字段2值
28* 字段3键名长度 字段3键名 字段3值长度 字段3值
29* … … … …
30* 长度为整型,占4个字节
31*/

32publicclassXLResponse{
33privatebyteencode;//数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
34privatebyteencrypt;//加密类型。0表示不加密
35privatebyteextend1;//用于扩展协议。暂未定义任何值
36privatebyteextend2;//用于扩展协议。暂未定义任何值
37privateintsessionid;//会话ID
38privateintresult;//结果码
39privateintlength;//数据包长
40
41privateMap<String,String>values=newHashMap<String, String>();
42
43privateString ip;
44
45publicvoidsetValue(String key,String value){
46 values.put(key, value);
47 }

48
49publicString getValue(String key){
50if(key==null){
51returnnull;
52 }

53returnvalues.get(key);
54 }

55
56publicbytegetEncode(){
57returnencode;
58 }

59
60publicvoidsetEncode(byteencode){
61this.encode=encode;
62 }

63
64publicbytegetEncrypt(){
65returnencrypt;
66 }

67
68publicvoidsetEncrypt(byteencrypt){
69this.encrypt=encrypt;
70 }

71
72publicbytegetExtend1(){
73returnextend1;
74 }

75
76publicvoidsetExtend1(byteextend1){
77this.extend1=extend1;
78 }

79
80publicbytegetExtend2(){
81returnextend2;
82 }

83
84publicvoidsetExtend2(byteextend2){
85this.extend2=extend2;
86 }

87
88publicintgetSessionid(){
89returnsessionid;
90 }

91
92publicvoidsetSessionid(intsessionid){
93this.sessionid=sessionid;
94 }

95
96publicintgetResult(){
97returnresult;
98 }

99
100publicvoidsetResult(intresult){
101this.result=result;
102 }

103
104publicintgetLength(){
105returnlength;
106 }

107
108publicvoidsetLength(intlength){
109this.length=length;
110 }

111
112publicMap<String, String>getValues(){
113returnvalues;
114 }

115
116publicString getIp(){
117returnip;
118 }

119
120publicvoidsetIp(String ip){
121this.ip=ip;
122 }

123
124publicvoidsetValues(Map<String, String>values){
125this.values=values;
126 }

127
128 @Override
129publicString toString(){
130return"XLResponse [encode="+encode+", encrypt="+encrypt+", extend1="+extend1+", extend2="+extend2
131+", sessionid="+sessionid+", result="+result+", length="+length+", values="+values+", ip="+ip+"]";
132 }

133}

1packageorg.jboss.netty.example.xlsvr.vo;
2
3importjava.util.HashMap;
4importjava.util.Map;
5
6/**
7*@authorhankchen
8* 2012-2-3 下午02:46:41
9*/

10
11/**
12* 请求数据
13*/

14
15/**
16* 通用协议介绍
17*
18* 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
19* (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
20* 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
21* (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
22* 数据格式定义:
23* 字段1键名长度 字段1键名 字段1值长度 字段1值
24* 字段2键名长度 字段2键名 字段2值长度 字段2值
25* 字段3键名长度 字段3键名 字段3值长度 字段3值
26* … … … …
27* 长度为整型,占4个字节
28*/

29publicclassXLRequest{
30privatebyteencode;//数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
31privatebyteencrypt;//加密类型。0表示不加密
32privatebyteextend1;//用于扩展协议。暂未定义任何值
33privatebyteextend2;//用于扩展协议。暂未定义任何值
34privateintsessionid;//会话ID
35privateintcommand;//命令
36privateintlength;//数据包长
37
38privateMap<String,String>params=newHashMap<String, String>();//参数
39
40privateString ip;
41
42publicbytegetEncode(){
43returnencode;
44 }

45
46publicvoidsetEncode(byteencode){
47this.encode=encode;
48 }

49
50publicbytegetEncrypt(){
51returnencrypt;
52 }

53
54publicvoidsetEncrypt(byteencrypt){
55this.encrypt=encrypt;
56 }

57
58publicbytegetExtend1(){
59returnextend1;
60 }

61
62publicvoidsetExtend1(byteextend1){
63this.extend1=extend1;
64 }

65
66publicbytegetExtend2(){
67returnextend2;
68 }

69
70publicvoidsetExtend2(byteextend2){
71this.extend2=extend2;
72 }

73
74publicintgetSessionid(){
75returnsessionid;
76 }

77
78publicvoidsetSessionid(intsessionid){
79this.sessionid=sessionid;
80 }

81
82publicintgetCommand(){
83returncommand;
84 }

85
86publicvoidsetCommand(intcommand){
87this.command=command;
88 }

89
90publicintgetLength(){
91returnlength;
92 }

93
94publicvoidsetLength(intlength){
95this.length=length;
96 }

97
98publicMap<String, String>getParams(){
99returnparams;
100 }

101
102publicvoidsetValue(String key,String value){
103 params.put(key, value);
104 }

105
106publicString getValue(String key){
107if(key==null){
108returnnull;
109 }

110returnparams.get(key);
111 }

112
113publicString getIp(){
114returnip;
115 }

116
117publicvoidsetIp(String ip){
118this.ip=ip;
119 }

120
121publicvoidsetParams(Map<String, String>params){
122this.params=params;
123 }

124
125 @Override
126publicString toString(){
127return"XLRequest [encode="+encode+", encrypt="+encrypt+", extend1="+extend1+", extend2="+extend2
128+", sessionid="+sessionid+", command="+command+", length="+length+", params="+params+", ip="+ip+"]";
129 }

130}

131

二、协议的编码和解码

对于自定义二进制协议,编码解码器往往是Netty开发的重点。这里直接给出相关类的代码。

1packageorg.jboss.netty.example.xlsvr.codec;
2
3importjava.nio.ByteBuffer;
4
5importorg.jboss.netty.buffer.ChannelBuffer;
6importorg.jboss.netty.buffer.ChannelBuffers;
7importorg.jboss.netty.channel.ChannelHandlerContext;
8importorg.jboss.netty.channel.Channels;
9importorg.jboss.netty.channel.MessageEvent;
10importorg.jboss.netty.channel.SimpleChannelDownstreamHandler;
11importorg.jboss.netty.example.xlsvr.util.ProtocolUtil;
12importorg.jboss.netty.example.xlsvr.vo.XLResponse;
13importorg.slf4j.Logger;
14importorg.slf4j.LoggerFactory;
15
16/**
17*@authorhankchen
18* 2012-2-3 上午10:48:15
19*/

20
21/**
22* 服务器端编码器
23*/

24publicclassXLServerEncoderextendsSimpleChannelDownstreamHandler{
25 Logger logger=LoggerFactory.getLogger(XLServerEncoder.class);
26
27 @Override
28publicvoidwriteRequested(ChannelHandlerContext ctx, MessageEvent e)throwsException{
29 XLResponse response=(XLResponse)e.getMessage();
30 ByteBuffer headBuffer=ByteBuffer.allocate(16);
31/**
32 * 先组织报文头
33*/

34 headBuffer.put(response.getEncode());
35 headBuffer.put(response.getEncrypt());
36 headBuffer.put(response.getExtend1());
37 headBuffer.put(response.getExtend2());
38 headBuffer.putInt(response.getSessionid());
39 headBuffer.putInt(response.getResult());
40
41/**
42 * 组织报文的数据部分
43*/

44 ChannelBuffer dataBuffer=ProtocolUtil.encode(response.getEncode(),response.getValues());
45intlength=dataBuffer.readableBytes();
46 headBuffer.putInt(length);
47/**
48 * 非常重要
49 * ByteBuffer需要手动flip(),ChannelBuffer不需要
50*/

51 headBuffer.flip();
52 ChannelBuffer totalBuffer=ChannelBuffers.dynamicBuffer();
53 totalBuffer.writeBytes(headBuffer);
54 logger.info("totalBuffer size="+totalBuffer.readableBytes());
55 totalBuffer.writeBytes(dataBuffer);
56 logger.info("totalBuffer size="+totalBuffer.readableBytes());
57 Channels.write(ctx, e.getFuture(), totalBuffer);
58 }

59
60}

61

1packageorg.jboss.netty.example.xlsvr.codec;
2
3importorg.jboss.netty.buffer.ChannelBuffer;
4importorg.jboss.netty.buffer.ChannelBuffers;
5importorg.jboss.netty.channel.Channel;
6importorg.jboss.netty.channel.ChannelHandlerContext;
7importorg.jboss.netty.example.xlsvr.util.ProtocolUtil;
8importorg.jboss.netty.example.xlsvr.vo.XLResponse;
9importorg.jboss.netty.handler.codec.frame.FrameDecoder;
10
11/**
12*@authorhankchen
13* 2012-2-3 上午10:47:54
14*/

15
16/**
17* 客户端解码器
18*/

19publicclassXLClientDecoderextendsFrameDecoder{
20
21 @Override
22protectedObject decode(ChannelHandlerContext context, Channel channel, ChannelBuffer buffer)throwsException{
23if(buffer.readableBytes()<16){
24returnnull;
25 }

26 buffer.markReaderIndex();
27byteencode=buffer.readByte();
28byteencrypt=buffer.readByte();
29byteextend1=buffer.readByte();
30byteextend2=buffer.readByte();
31intsessionid=buffer.readInt();
32intresult=buffer.readInt();
33intlength=buffer.readInt();//数据包长
34if(buffer.readableBytes()<length){
35 buffer.resetReaderIndex();
36returnnull;
37 }

38 ChannelBuffer dataBuffer=ChannelBuffers.buffer(length);
39 buffer.readBytes(dataBuffer, length);
40
41 XLResponse response=newXLResponse();
42 response.setEncode(encode);
43 response.setEncrypt(encrypt);
44 response.setExtend1(extend1);
45 response.setExtend2(extend2);
46 response.setSessionid(sessionid);
47 response.setResult(result);
48 response.setLength(length);
49 response.setValues(ProtocolUtil.decode(encode, dataBuffer));
50 response.setIp(ProtocolUtil.getClientIp(channel));
51returnresponse;
52 }

53
54}

1packageorg.jboss.netty.example.xlsvr.util;
2
3importjava.net.SocketAddress;
4importjava.nio.charset.Charset;
5importjava.util.HashMap;
6importjava.util.Map;
7importjava.util.Map.Entry;
8
9importorg.jboss.netty.buffer.ChannelBuffer;
10importorg.jboss.netty.buffer.ChannelBuffers;
11importorg.jboss.netty.channel.Channel;
12
13/**
14*@authorhankchen
15* 2012-2-4 下午01:57:33
16*/

17publicclassProtocolUtil{
18
19/**
20 * 编码报文的数据部分
21 *@paramencode
22 *@paramvalues
23 *@return
24*/

25publicstaticChannelBuffer encode(intencode,Map<String,String>values){
26 ChannelBuffer totalBuffer=null;
27if(values!=null&&values.size()>0){
28 totalBuffer=ChannelBuffers.dynamicBuffer();
29intlength=0,index=0;
30 ChannelBuffer [] channelBuffers=newChannelBuffer[values.size()];
31 Charset charset=XLCharSetFactory.getCharset(encode);
32for(Entry<String,String>entry:values.entrySet()){
33 String key=entry.getKey();
34 String value=entry.getValue();
35 ChannelBuffer buffer=ChannelBuffers.dynamicBuffer();
36 buffer.writeInt(key.length());
37 buffer.writeBytes(key.getBytes(charset));
38 buffer.writeInt(value.length());
39 buffer.writeBytes(value.getBytes(charset));
40 channelBuffers[index++]=buffer;
41 length+=buffer.readableBytes();
42 }

43
44for(inti=0; i<channelBuffers.length; i++){
45 totalBuffer.writeBytes(channelBuffers[i]);
46 }

47 }

48returntotalBuffer;
49 }

50
51/**
52 * 解码报文的数据部分
53 *@paramencode
54 *@paramdataBuffer
55 *@return
56*/

57publicstaticMap<String,String>decode(intencode,ChannelBuffer dataBuffer){
58 Map<String,String>dataMap=newHashMap<String, String>();
59if(dataBuffer!=null&&dataBuffer.readableBytes()>0){
60intprocessIndex=0,length=dataBuffer.readableBytes();
61 Charset charset=XLCharSetFactory.getCharset(encode);
62while(processIndex<length){
63/**
64 * 获取Key
65*/

66intsize=dataBuffer.readInt();
67byte[] contents=newbyte[size];
68 dataBuffer.readBytes(contents);
69 String key=newString(contents, charset);
70 processIndex=processIndex+size+4;
71/**
72 * 获取Value
73*/

74 size=dataBuffer.readInt();
75 contents=newbyte[size];
76 dataBuffer.readBytes(contents);
77 String value=newString(contents, charset);
78 dataMap.put(key, value);
79 processIndex=processIndex+size+4;
80 }

81 }

82returndataMap;
83 }

84
85/**
86 * 获取客户端IP
87 *@paramchannel
88 *@return
89*/

90publicstaticString getClientIp(Channel channel){
91/**
92 * 获取客户端IP
93*/

94 SocketAddress address=channel.getRemoteAddress();
95 String ip="";
96if(address!=null){
97 ip=address.toString().trim();
98intindex=ip.lastIndexOf(':');
99if(index<1){
100 index=ip.length();
101 }

102 ip=ip.substring(1, index);
103 }

104if(ip.length()>15){
105 ip=ip.substring(Math.max(ip.indexOf("/")+1, ip.length()-15));
106 }

107returnip;
108 }

109}

110

三、服务器端实现

服务器端提供的功能是:

1、接收客户端的请求(非关闭命令),返回XLResponse类型的数据。

2、如果客户端的请求是关闭命令:shutdown,则服务器端关闭自身进程。

为了展示多协议的运用,这里客户端的请求采用的是基于问本行(\n\r)的协议。

具体代码如下:

1packageorg.jboss.netty.example.xlsvr;
2
3importjava.net.InetSocketAddress;
4importjava.util.concurrent.Executors;
5
6importorg.jboss.netty.bootstrap.ServerBootstrap;
7importorg.jboss.netty.channel.Channel;
8importorg.jboss.netty.channel.ChannelPipeline;
9importorg.jboss.netty.channel.group.ChannelGroup;
10importorg.jboss.netty.channel.group.ChannelGroupFuture;
11importorg.jboss.netty.channel.group.DefaultChannelGroup;
12importorg.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
13importorg.jboss.netty.example.xlsvr.codec.XLServerEncoder;
14importorg.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
15importorg.jboss.netty.handler.codec.frame.Delimiters;
16importorg.jboss.netty.handler.codec.string.StringDecoder;
17importorg.jboss.netty.util.CharsetUtil;
18importorg.slf4j.Logger;
19importorg.slf4j.LoggerFactory;
20
21/**
22*@authorhankchen
23* 2012-1-30 下午03:21:38
24*/

25
26publicclassXLServer{
27publicstaticfinalintport=8080;
28publicstaticfinalLogger logger=LoggerFactory.getLogger(XLServer.class);
29publicstaticfinalChannelGroup allChannels=newDefaultChannelGroup("XLServer");
30privatestaticfinalServerBootstrap serverBootstrap=newServerBootstrap(newNioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
31
32publicstaticvoidmain(String [] args){
33try{
34 XLServer.startup();
35 }
catch(Exception e){
36 e.printStackTrace();
37 }

38 }

39
40publicstaticbooleanstartup()throwsException{
41/**
42 * 采用默认ChannelPipeline管道
43 * 这意味着同一个XLServerHandler实例将被多个Channel通道共享
44 * 这种方式对于XLServerHandler中无有状态的成员变量是可以的,并且可以提高性能!
45*/

46 ChannelPipeline pipeline=serverBootstrap.getPipeline();
47/**
48 * 解码器是基于文本行的协议,\r\n或者\n\r
49*/

50 pipeline.addLast("frameDecoder",newDelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
51 pipeline.addLast("stringDecoder",newStringDecoder(CharsetUtil.UTF_8));
52 pipeline.addLast("encoder",newXLServerEncoder());
53 pipeline.addLast("handler",newXLServerHandler());
54
55 serverBootstrap.setOption("child.tcpNoDelay",true);//注意child前缀
56 serverBootstrap.setOption("child.keepAlive",true);//注意child前缀
57
58/**
59 * ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象
60*/

61 Channel channel=serverBootstrap.bind(newInetSocketAddress(port));
62 allChannels.add(channel);
63 logger.info("server is started on port"+port);
64returnfalse;
65 }

66
67publicstaticvoidshutdown()throwsException{
68try{
69/**
70 * 主动关闭服务器
71*/

72 ChannelGroupFuture future=allChannels.close();
73 future.awaitUninterruptibly();//阻塞,直到服务器关闭
74//serverBootstrap.releaseExternalResources();
75 }
catch(Exception e){
76 e.printStackTrace();
77 logger.error(e.getMessage(),e);
78 }

79finally{
80 logger.info("server is shutdown on port"+port);
81 System.exit(1);
82 }

83 }

84}

85

1packageorg.jboss.netty.example.xlsvr;
2
3importjava.util.Random;
4
5importorg.jboss.netty.channel.Channel;
6importorg.jboss.netty.channel.ChannelFuture;
7importorg.jboss.netty.channel.ChannelHandlerContext;
8importorg.jboss.netty.channel.ChannelHandler.Sharable;
9importorg.jboss.netty.channel.ChannelStateEvent;
10importorg.jboss.netty.channel.ExceptionEvent;
11importorg.jboss.netty.channel.MessageEvent;
12importorg.jboss.netty.channel.SimpleChannelHandler;
13importorg.jboss.netty.example.xlsvr.vo.XLResponse;
14importorg.slf4j.Logger;
15importorg.slf4j.LoggerFactory;
16
17/**
18*@authorhankchen
19* 2012-1-30 下午03:22:24
20*/

21
22@Sharable
23publicclassXLServerHandlerextendsSimpleChannelHandler{
24privatestaticfinalLogger logger=LoggerFactory.getLogger(XLServerHandler.class);
25
26 @Override
27publicvoidmessageReceived(ChannelHandlerContext ctx, MessageEvent e)throwsException{
28 logger.info("messageReceived");
29if(e.getMessage()instanceofString){
30 String content=(String)e.getMessage();
31 logger.info("content is"+content);
32if("shutdown".equalsIgnoreCase(content)){
33//e.getChannel().close();
34 XLServer.shutdown();
35 }
else{
36 sendResponse(ctx);
37 }

38 }
else{
39 logger.error("message is not a String.");
40 e.getChannel().close();
41 }

42 }

43
44 @Override
45publicvoidexceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)throwsException{
46 logger.error(e.getCause().getMessage(),e.getCause());
47 e.getCause().printStackTrace();
48 e.getChannel().close();
49 }

50
51 @Override
52publicvoidchannelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)throwsException{
53 logger.info("channelConnected");
54 sendResponse(ctx);
55 }

56
57 @Override
58publicvoidchannelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)throwsException{
59 logger.info("channelClosed");
60//删除通道
61 XLServer.allChannels.remove(e.getChannel());
62 }

63
64 @Override
65publicvoidchannelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)throwsException{
66 logger.info("channelDisconnected");
67super.channelDisconnected(ctx, e);
68 }

69
70 @Override
71publicvoidchannelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)throwsException{
72 logger.info("channelOpen");
73//增加通道
74 XLServer.allChannels.add(e.getChannel());
75 }

76
77/**
78 * 发送响应内容
79 *@paramctx
80 *@parame
81 *@return
82*/

83privateChannelFuture sendResponse(ChannelHandlerContext ctx){
84 Channel channel=ctx.getChannel();
85 Random random=newRandom();
86 XLResponse response=newXLResponse();
87 response.setEncode((byte)0);
88 response.setResult(1);
89 response.setValue("name","hankchen");
90 response.setValue("time", String.valueOf(System.currentTimeMillis()));
91 response.setValue("age",String.valueOf(random.nextInt()));
92/**
93 * 发送接收信息的时间戳到客户端
94 * 注意:Netty中所有的IO操作都是异步的!
95*/

96 ChannelFuture future=channel.write(response);//发送内容
97returnfuture;
98 }

99}

100

四、客户端实现

客户端的功能是连接服务器,发送10次请求,然后发送关闭服务器的命令,最后主动关闭客户端。

关键代码如下:

1/**
2* Copyright (C): 2012
3*@authorhankchen
4* 2012-1-30 下午03:21:26
5*/

6
7/**
8* 服务器特征:
9* 1、使用专用解码器解析服务器发过来的数据
10* 2、客户端主动关闭连接
11*/

12publicclassXLClient{
13publicstaticfinalintport=XLServer.port;
14publicstaticfinalString host="localhost";
15privatestaticfinalLogger logger=LoggerFactory.getLogger(XLClient.class);
16privatestaticfinalNioClientSocketChannelFactory clientSocketChannelFactory=newNioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
17privatestaticfinalClientBootstrap clientBootstrap=newClientBootstrap(clientSocketChannelFactory);
18
19/**
20 *@paramargs
21 *@throwsException
22*/

23publicstaticvoidmain(String[] args)throwsException{
24 ChannelFuture future=XLClient.startup();
25 logger.info("future state is"+future.isSuccess());
26 }

27
28/**
29 * 启动客户端
30 *@return
31 *@throwsException
32*/

33publicstaticChannelFuture startup()throwsException{
34/**
35 * 注意:由于XLClientHandler中有状态的成员变量,因此不能采用默认共享ChannelPipeline的方式
36 * 例如,下面的代码形式是错误的:
37 * ChannelPipeline pipeline=clientBootstrap.getPipeline();
38 * pipeline.addLast("handler", new XLClientHandler());
39*/

40 clientBootstrap.setPipelineFactory(newXLClientPipelineFactory());//只能这样设置
41/**
42 * 请注意,这里不存在使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象
43*/

44 clientBootstrap.setOption("tcpNoDelay",true);
45 clientBootstrap.setOption("keepAlive",true);
46
47 ChannelFuture future=clientBootstrap.connect(newInetSocketAddress(host, port));
48/**
49 * 阻塞式的等待,直到ChannelFuture对象返回这个连接操作的成功或失败状态
50*/

51 future.awaitUninterruptibly();
52/**
53 * 如果连接失败,我们将打印连接失败的原因。
54 * 如果连接操作没有成功或者被取消,ChannelFuture对象的getCause()方法将返回连接失败的原因。
55*/

56if(!future.isSuccess()){
57 future.getCause().printStackTrace();
58 }
else{
59 logger.info("client is connected to server"+host+":"+port);
60 }

61returnfuture;
62 }

63
64/**
65 * 关闭客户端
66 *@paramfuture
67 *@throwsException
68*/

69publicstaticvoidshutdown(ChannelFuture future)throwsException{
70try{
71/**
72 * 主动关闭客户端连接,会阻塞等待直到通道关闭
73*/

74 future.getChannel().close().awaitUninterruptibly();
75//future.getChannel().getCloseFuture().awaitUninterruptibly();
76/**
77 * 释放ChannelFactory通道工厂使用的资源。
78 * 这一步仅需要调用 releaseExternalResources()方法即可。
79 * 包括NIO Secector和线程池在内的所有资源将被自动的关闭和终止。
80*/

81 clientBootstrap.releaseExternalResources();
82 }
catch(Exception e){
83 e.printStackTrace();
84 logger.error(e.getMessage(),e);
85 }

86finally{
87 System.exit(1);
88 logger.info("client is shutdown to server"+host+":"+port);
89 }

90 }

91}

1publicclassXLClientPipelineFactoryimplementsChannelPipelineFactory{
2
3 @Override
4publicChannelPipeline getPipeline()throwsException{
5 ChannelPipeline pipeline=Channels.pipeline();
6/**
7 * 使用专用的解码器,解决数据分段的问题
8 * 从业务逻辑代码中分离协议处理部分总是一个很不错的想法。
9*/

10 pipeline.addLast("decoder",newXLClientDecoder());
11/**
12 * 有专门的编码解码器,这时处理器就不需要管数据分段和数据格式问题,只需要关注业务逻辑了!
13*/

14 pipeline.addLast("handler",newXLClientHandler());
15returnpipeline;
16 }

17
18}

1/**
2* Copyright (C): 2012
3*@authorhankchen
4* 2012-1-30 下午03:21:52
5*/

6
7/**
8* 服务器特征:
9* 1、使用专用的编码解码器,解决数据分段的问题
10* 2、使用POJO替代ChannelBuffer传输
11*/

12publicclassXLClientHandlerextendsSimpleChannelHandler{
13privatestaticfinalLogger logger=LoggerFactory.getLogger(XLClientHandler.class);
14privatefinalAtomicInteger count=newAtomicInteger(0);//计数器
15
16 @Override
17publicvoidmessageReceived(ChannelHandlerContext ctx, MessageEvent e)throwsException{
18 processMethod1(ctx, e);//处理方式一
19 }

20
21/**
22 *@paramctx
23 *@parame
24 *@throwsException
25*/

26publicvoidprocessMethod1(ChannelHandlerContext ctx, MessageEvent e)throwsException{
27 logger.info("processMethod1……,count="+count.addAndGet(1));
28 XLResponse serverTime=(XLResponse)e.getMessage();
29 logger.info("messageReceived,content:"+serverTime.toString());
30 Thread.sleep(1000);
31
32if(count.get()<10){
33//从新发送请求获取最新的服务器时间
34 ctx.getChannel().write(ChannelBuffers.wrappedBuffer("again\r\n".getBytes()));
35 }
else{
36//从新发送请求关闭服务器
37 ctx.getChannel().write(ChannelBuffers.wrappedBuffer("shutdown\r\n".getBytes()));
38 }

39 }

40
41 @Override
42publicvoidexceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)throwsException{
43 logger.info("exceptionCaught");
44 e.getCause().printStackTrace();
45 ctx.getChannel().close();
46 }

47
48 @Override
49publicvoidchannelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)throwsException{
50 logger.info("channelClosed");
51super.channelClosed(ctx, e);
52 }

53
54
55}

全文代码较多,写了很多注释,希望对读者有用,谢谢!

(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen


分享到:
评论

相关推荐

    Netty_中文技术文档

    它借鉴了多种协议(如FTP、SMTP、HTTP等)的实现经验,融合了二进制和文本协议的处理技巧,形成了一套既高效又易于使用的网络编程框架。 与其他类似框架相比,Netty的独特之处在于其设计哲学的贯彻——始终以用户...

    Netty中文指南教程

    - **广泛的协议支持**: 凭借其灵活的架构,Netty可以轻松支持各种网络协议,包括但不限于HTTP、FTP、SMTP等常见协议,以及各种自定义的二进制或文本协议。 #### 四、Netty实践入门 **1. 开发环境准备** - **Netty...

    netty中文指南.docx

    在面对需要处理大量并发连接和高效数据传输的场景时,传统的通用协议和实现往往无法满足需求。例如,HTTP 服务器可能不适应大规模文件传输或实时消息传递的需求。这时,Netty 提供了一个解决方案,它允许开发者定制...

    netty3.1中文用户手册.pdf

    - **Google Protocol Buffers整合**:讨论了Netty如何与Google Protocol Buffers结合使用,以实现高效的序列化和反序列化。 - **总述**:总结了本章中介绍的主要内容,为读者提供了一个关于Netty架构的全面理解。 ...

    MODBUS-RTU协议,modbusrtu协议解析

    RTU模式是MODBUS协议的一种变体,它使用二进制格式传输数据,不包含每个字符的起始和结束标识,因此具有更高的传输效率。在RTU模式下,数据以连续的8位字节流形式发送,每两个字节之间用1.5个停止位隔开,以确保数据...

    Netty-中文技术文档.docx

    Netty的出现解决了通用协议库在大规模、高并发场景下的局限性,使得开发者能够快速构建针对特定需求的高度优化的协议实现。 Netty的核心特性在于它的异步事件驱动模型,这一模型基于Java的非阻塞I/O(NIO)API。...

    Dubbo传输层实现

    在Dubbo中,传输层扮演着数据传输媒介的角色,确保请求和响应能够在分布式环境中准确无误地传递。在这个实现中,Dubbo选择了Netty作为基础网络库,因为Netty是一个高性能、异步事件驱动的网络应用框架,适用于快速...

    Netty面试专题1

    - **JSON**:通用的数据交换格式,易于阅读和编写,但相对于二进制序列化协议,效率较低。 - **Hessian**:轻量级的二进制RPC协议,适合内部服务间通信。 - **Msgpack**:另一种高效的二进制序列化协议。 在选择...

    java开源包3

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java中把对象转化为byte数组的方法.doc

    Java 中将对象转化为 byte 数组的方法是指将 Java 对象序列化为二进制数据流,以便在网络上传输或存储。这种方法可以应用于各种领域,如分布式计算、网络通信、数据存储等。 在 Java 中,对象转化为 byte 数组通常...

    bin.rar

    1. **网络通信**:Hadoop Common包含了一套基于Socket的网络通信组件,如Netty,用于节点间的高效数据传输。这些组件支持TCP/IP协议,确保了Hadoop集群内的节点能够快速、稳定地进行数据交互。 2. **I/O处理**:...

    Dubbo 27道面试题及答案.docx

    Dubbo默认使用Hessian作为序列化框架,这是一种二进制协议,具有轻量级、快速的特点。此外,还可以选择Duddo、FastJson和Java自带的序列化机制。Hessian通过HTTP POST方式传输数据,将辅助信息封装在HTTP头中,核心...

    dubbo源码分析系列1

    - **dubbo-common**:这是一个通用模块,包含了Dubbo框架共用的工具类,如IO处理、日志、配置、类处理等,以及线程池、二进制编码、JSON处理、数据存储接口等。这个模块为其他模块提供了基础支持。 - **dubbo-rpc-...

    grpcjar.rar

    protobuf是Google的一种数据序列化协议,可以将结构化数据转化为二进制格式,便于网络传输。Java开发者可以通过protobuf编译器(protoc)将.proto文件转换为Java代码,其中包含了用于GRPC通信的Stub类。 2. **...

    hadoop-common-2.6.0-bin-master

    - **网络通信**: Hadoop Common提供了网络通信库,如Socket和Netty,用于节点间的通信,如数据传输、心跳检查和任务调度。 - **安全机制**: 支持Kerberos等认证和授权机制,以确保Hadoop集群的安全性。 - **日志...

    java开源包4

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包8

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包1

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

    java开源包11

    Blister是一个用于操作苹果二进制PList文件格式的Java开源类库(可用于发送数据给iOS应用程序)。 重复文件检查工具 FindDup.tar FindDup 是一个简单易用的工具,用来检查计算机上重复的文件。 OpenID的Java客户端...

Global site tag (gtag.js) - Google Analytics