The project structure is as follows: [figure: project structure diagram]
Contents of NettyConstant.java:
package com.shihuan.netty.protocol;

public final class NettyConstant {
    public static final String REMOTEIP = "127.0.0.1";
    public static final int PORT = 8080;
    public static final int LOCAL_PORT = 12088;
    public static final String LOCALIP = "127.0.0.1";
}
Contents of MessageType.java:
package com.shihuan.netty.protocol;

public enum MessageType {
    SERVICE_REQ((byte) 0),
    SERVICE_RESP((byte) 1),
    ONE_WAY((byte) 2),
    LOGIN_REQ((byte) 3),
    LOGIN_RESP((byte) 4),
    HEARTBEAT_REQ((byte) 5),
    HEARTBEAT_RESP((byte) 6);

    // Wire value written into Header.type
    private final byte value;

    private MessageType(byte value) {
        this.value = value;
    }

    public byte value() {
        return this.value;
    }
}
Contents of Header.java:
package com.shihuan.netty.protocol.struct;

import java.util.HashMap;
import java.util.Map;

public final class Header {
    private int crcCode = 0xabef0101;
    private int length;      // message length
    private long sessionID;  // session ID
    private byte type;       // message type
    private byte priority;   // message priority
    private Map<String, Object> attachment = new HashMap<String, Object>(); // attachments

    public final int getCrcCode() {
        return crcCode;
    }

    public final void setCrcCode(int crcCode) {
        this.crcCode = crcCode;
    }

    public final int getLength() {
        return length;
    }

    public final void setLength(int length) {
        this.length = length;
    }

    public final long getSessionID() {
        return sessionID;
    }

    public final void setSessionID(long sessionID) {
        this.sessionID = sessionID;
    }

    public final byte getType() {
        return type;
    }

    public final void setType(byte type) {
        this.type = type;
    }

    public final byte getPriority() {
        return priority;
    }

    public final void setPriority(byte priority) {
        this.priority = priority;
    }

    public final Map<String, Object> getAttachment() {
        return attachment;
    }

    public final void setAttachment(Map<String, Object> attachment) {
        this.attachment = attachment;
    }

    @Override
    public String toString() {
        return "Header [crcCode=" + crcCode + ", length=" + length
                + ", sessionID=" + sessionID + ", type=" + type
                + ", priority=" + priority + ", attachment=" + attachment + "]";
    }
}
Contents of NettyMessage.java:
package com.shihuan.netty.protocol.struct;

public final class NettyMessage {
    private Header header; // message header
    private Object body;   // message body

    public final Header getHeader() {
        return header;
    }

    public final void setHeader(Header header) {
        this.header = header;
    }

    public final Object getBody() {
        return body;
    }

    public final void setBody(Object body) {
        this.body = body;
    }

    // Note: only the header is printed; the body is omitted.
    @Override
    public String toString() {
        return "NettyMessage [header=" + header + "]";
    }
}
Contents of ChannelBufferByteInput.java:
package com.shihuan.netty.protocol.codec;

import io.netty.buffer.ByteBuf;

import java.io.IOException;

import org.jboss.marshalling.ByteInput;

// Adapts a Netty ByteBuf to the JBoss Marshalling ByteInput interface.
class ChannelBufferByteInput implements ByteInput {
    private final ByteBuf buffer;

    public ChannelBufferByteInput(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        // nothing to release
    }

    @Override
    public int available() throws IOException {
        return buffer.readableBytes();
    }

    @Override
    public int read() throws IOException {
        if (buffer.isReadable()) {
            return buffer.readByte() & 0xff;
        }
        return -1;
    }

    @Override
    public int read(byte[] array) throws IOException {
        return read(array, 0, array.length);
    }

    @Override
    public int read(byte[] dst, int dstIndex, int length) throws IOException {
        int available = available();
        if (available == 0) {
            return -1;
        }
        length = Math.min(available, length);
        buffer.readBytes(dst, dstIndex, length);
        return length;
    }

    @Override
    public long skip(long bytes) throws IOException {
        int readable = buffer.readableBytes();
        if (readable < bytes) {
            bytes = readable;
        }
        buffer.readerIndex((int) (buffer.readerIndex() + bytes));
        return bytes;
    }
}
Contents of ChannelBufferByteOutput.java:
package com.shihuan.netty.protocol.codec;

import io.netty.buffer.ByteBuf;

import java.io.IOException;

import org.jboss.marshalling.ByteOutput;

// Adapts a Netty ByteBuf to the JBoss Marshalling ByteOutput interface.
class ChannelBufferByteOutput implements ByteOutput {
    private final ByteBuf buffer;

    /**
     * Creates a new instance that writes into the given {@link ByteBuf}.
     */
    public ChannelBufferByteOutput(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        // nothing to release
    }

    @Override
    public void flush() throws IOException {
        // nothing to flush
    }

    @Override
    public void write(int b) throws IOException {
        buffer.writeByte(b);
    }

    @Override
    public void write(byte[] bytes) throws IOException {
        buffer.writeBytes(bytes);
    }

    @Override
    public void write(byte[] bytes, int srcIndex, int length) throws IOException {
        buffer.writeBytes(bytes, srcIndex, length);
    }

    /**
     * Returns the {@link ByteBuf} that contains the written content.
     */
    ByteBuf getBuffer() {
        return buffer;
    }
}
Contents of MarshallingCodecFactory.java:
package com.shihuan.netty.protocol.codec;

import java.io.IOException;

import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.marshalling.Unmarshaller;

public final class MarshallingCodecFactory {

    /**
     * Creates a JBoss Marshaller.
     *
     * @throws IOException
     */
    protected static Marshaller buildMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        return marshallerFactory.createMarshaller(configuration);
    }

    /**
     * Creates a JBoss Unmarshaller.
     *
     * @throws IOException
     */
    protected static Unmarshaller buildUnMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        return marshallerFactory.createUnmarshaller(configuration);
    }
}
Contents of MarshallingDecoder.java:
package com.shihuan.netty.protocol.codec;

import io.netty.buffer.ByteBuf;

import java.io.IOException;

import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;

public class MarshallingDecoder {
    private final Unmarshaller unmarshaller;

    /**
     * Creates a new decoder backed by a JBoss Unmarshaller.
     * No maximum object size is enforced here; the frame size limit
     * comes from the enclosing NettyMessageDecoder.
     *
     * @throws IOException
     */
    public MarshallingDecoder() throws IOException {
        unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
    }

    protected Object decode(ByteBuf in) throws Exception {
        // Each object is prefixed with a 4-byte length
        int objectSize = in.readInt();
        ByteBuf buf = in.slice(in.readerIndex(), objectSize);
        ByteInput input = new ChannelBufferByteInput(buf);
        try {
            unmarshaller.start(input);
            Object obj = unmarshaller.readObject();
            unmarshaller.finish();
            // Skip past the bytes consumed through the slice
            in.readerIndex(in.readerIndex() + objectSize);
            return obj;
        } finally {
            unmarshaller.close();
        }
    }
}
Contents of MarshallingEncoder.java:
package com.shihuan.netty.protocol.codec;

import java.io.IOException;

import org.jboss.marshalling.Marshaller;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;

@Sharable
public class MarshallingEncoder {
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    Marshaller marshaller;

    public MarshallingEncoder() throws IOException {
        marshaller = MarshallingCodecFactory.buildMarshalling();
    }

    protected void encode(Object msg, ByteBuf out) throws Exception {
        try {
            // Reserve 4 bytes for the object length, then backfill it after writing
            int lengthPos = out.writerIndex();
            out.writeBytes(LENGTH_PLACEHOLDER);
            ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
            marshaller.start(output);
            marshaller.writeObject(msg);
            marshaller.finish();
            out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
        } finally {
            marshaller.close();
        }
    }
}
Contents of NettyMessageDecoder.java:
package com.shihuan.netty.protocol.codec;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.shihuan.netty.protocol.struct.Header;
import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
    MarshallingDecoder marshallingDecoder;

    public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) throws IOException {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        marshallingDecoder = new MarshallingDecoder();
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // The parent class handles framing; null means the frame is not complete yet
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }

        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setCrcCode(frame.readInt());
        header.setLength(frame.readInt());
        header.setSessionID(frame.readLong());
        header.setType(frame.readByte());
        header.setPriority(frame.readByte());

        // Attachments: a count, then (key length, UTF-8 key, marshalled value) per entry
        int size = frame.readInt();
        if (size > 0) {
            Map<String, Object> attch = new HashMap<String, Object>(size);
            for (int i = 0; i < size; i++) {
                int keySize = frame.readInt();
                byte[] keyArray = new byte[keySize];
                frame.readBytes(keyArray);
                String key = new String(keyArray, "UTF-8");
                attch.put(key, marshallingDecoder.decode(frame));
            }
            header.setAttachment(attch);
        }

        // An empty body is encoded as a single 4-byte zero, so more than 4 bytes means a body is present
        if (frame.readableBytes() > 4) {
            message.setBody(marshallingDecoder.decode(frame));
        }
        message.setHeader(header);
        return message;
    }
}
Contents of NettyMessageEncoder.java:
package com.shihuan.netty.protocol.codec;

import java.io.IOException;
import java.util.Map;

import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public final class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {
    MarshallingEncoder marshallingEncoder;

    public NettyMessageEncoder() throws IOException {
        this.marshallingEncoder = new MarshallingEncoder();
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception {
        if (msg == null || msg.getHeader() == null) {
            throw new Exception("The encode message is null");
        }

        sendBuf.writeInt(msg.getHeader().getCrcCode());
        sendBuf.writeInt(msg.getHeader().getLength());
        sendBuf.writeLong(msg.getHeader().getSessionID());
        sendBuf.writeByte(msg.getHeader().getType());
        sendBuf.writeByte(msg.getHeader().getPriority());
        sendBuf.writeInt(msg.getHeader().getAttachment().size());

        // Each attachment: key length, UTF-8 key bytes, then the marshalled value
        for (Map.Entry<String, Object> param : msg.getHeader().getAttachment().entrySet()) {
            byte[] keyArray = param.getKey().getBytes("UTF-8");
            sendBuf.writeInt(keyArray.length);
            sendBuf.writeBytes(keyArray);
            marshallingEncoder.encode(param.getValue(), sendBuf);
        }

        if (msg.getBody() != null) {
            marshallingEncoder.encode(msg.getBody(), sendBuf);
        } else {
            sendBuf.writeInt(0);
        }

        // Backfill the length field at offset 4: total bytes minus crcCode (4) and the length field itself (4)
        sendBuf.setInt(4, sendBuf.readableBytes() - 8);
    }
}
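The encoder backfills the length field at offset 4 with `readableBytes() - 8`, which is what allows `NettyMessageDecoder(1024 * 1024, 4, 4)` to find frame boundaries. The following is a minimal JDK-only sketch of that byte layout for a message with no attachments and no body. It is an illustration, not part of the original project: `FrameLayoutSketch`, `encodeEmpty`, and `frameLength` are hypothetical names, and `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`.

```java
import java.nio.ByteBuffer;

// Illustrative only: mirrors the bytes NettyMessageEncoder writes for a
// message whose attachment map is empty and whose body is null.
final class FrameLayoutSketch {
    static final int CRC_CODE = 0xabef0101;   // same default as Header.crcCode
    static final int LENGTH_FIELD_OFFSET = 4; // the length field follows crcCode
    static final int LENGTH_FIELD_END = 8;    // offset 4 plus the 4-byte length field

    static byte[] encodeEmpty(long sessionId, byte type, byte priority) {
        ByteBuffer buf = ByteBuffer.allocate(26); // big-endian by default
        buf.putInt(CRC_CODE);   // 4 bytes: crcCode
        buf.putInt(0);          // 4 bytes: length placeholder, backfilled below
        buf.putLong(sessionId); // 8 bytes: sessionID
        buf.put(type);          // 1 byte: message type
        buf.put(priority);      // 1 byte: priority
        buf.putInt(0);          // 4 bytes: attachment count (none)
        buf.putInt(0);          // 4 bytes: marker written when the body is null
        // Same rule as the encoder: everything after the length field
        buf.putInt(LENGTH_FIELD_OFFSET, buf.position() - LENGTH_FIELD_END);
        return buf.array();
    }

    // With lengthFieldOffset = 4, lengthFieldLength = 4 and no adjustment,
    // a frame spans the first 8 bytes plus the value of the length field.
    static int frameLength(byte[] frame) {
        return LENGTH_FIELD_END + ByteBuffer.wrap(frame).getInt(LENGTH_FIELD_OFFSET);
    }

    public static void main(String[] args) {
        byte[] frame = encodeEmpty(1L, (byte) 5, (byte) 0);
        System.out.println("bytes=" + frame.length + " frameLength=" + frameLength(frame));
    }
}
```

For this empty message the frame is 26 bytes long and the length field holds 18, so the computed frame length matches the number of bytes written.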
Contents of HeartBeatRespHandler.java:
package com.shihuan.netty.protocol.server;

import com.shihuan.netty.protocol.MessageType;
import com.shihuan.netty.protocol.struct.Header;
import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

// Written against the Netty 5.0.0.Alpha API, where ChannelHandlerAdapter exposes channelRead;
// on Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class HeartBeatRespHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        // Reply to heartbeat requests; pass everything else down the pipeline
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {
            System.out.println("Receive client heart beat message : ---> " + message);
            NettyMessage heartBeat = buildHeartBeat();
            System.out.println("Send heart beat response message to client : ---> " + heartBeat);
            ctx.writeAndFlush(heartBeat);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private NettyMessage buildHeartBeat() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.HEARTBEAT_RESP.value());
        message.setHeader(header);
        return message;
    }
}
Contents of LoginAuthRespHandler.java:
package com.shihuan.netty.protocol.server;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.shihuan.netty.protocol.MessageType;
import com.shihuan.netty.protocol.struct.Header;
import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class LoginAuthRespHandler extends ChannelHandlerAdapter {
    // Nodes that have already logged in, keyed by remote address
    private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
    private String[] whiteList = { "127.0.0.1", "192.168.1.104" };

    /**
     * Handles login (handshake) requests and forwards all other messages
     * to the next handler in the pipeline.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        // Handle handshake requests; pass other messages through
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_REQ.value()) {
            String nodeIndex = ctx.channel().remoteAddress().toString();
            NettyMessage loginResp = null;
            if (nodeCheck.containsKey(nodeIndex)) {
                // Duplicate login: reject
                loginResp = buildResponse((byte) -1);
            } else {
                InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = address.getAddress().getHostAddress();
                boolean isOK = false;
                for (String allowedIp : whiteList) {
                    if (allowedIp.equals(ip)) {
                        isOK = true;
                        break;
                    }
                }
                loginResp = isOK ? buildResponse((byte) 0) : buildResponse((byte) -1);
                if (isOK) {
                    nodeCheck.put(nodeIndex, true);
                }
            }
            System.out.println("The login response is : " + loginResp + " body [" + loginResp.getBody() + "]");
            ctx.writeAndFlush(loginResp);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private NettyMessage buildResponse(byte result) {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_RESP.value());
        message.setHeader(header);
        message.setBody(result);
        return message;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        nodeCheck.remove(ctx.channel().remoteAddress().toString()); // remove the cached login
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
}
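The login decision in the handler above (reject duplicates, then accept only whitelisted IPs) can be isolated from Netty and exercised on its own. The following is a minimal sketch under that assumption; `LoginCheckSketch`, `login`, and `logout` are hypothetical names that do not appear in the original project, and `putIfAbsent` is used here as one possible way to make the check-and-register step atomic.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: the whitelist and duplicate-login rules without any Netty types.
final class LoginCheckSketch {
    static final byte OK = 0;        // same result codes as buildResponse(...)
    static final byte REJECTED = -1;

    private final Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
    private final Set<String> whiteList;

    LoginCheckSketch(String... allowedIps) {
        this.whiteList = new HashSet<String>(Arrays.asList(allowedIps));
    }

    // nodeIndex plays the role of remoteAddress().toString(); ip is the host address.
    byte login(String nodeIndex, String ip) {
        if (!whiteList.contains(ip)) {
            return REJECTED;
        }
        // putIfAbsent returns null only for the first registration of this node
        return nodeCheck.putIfAbsent(nodeIndex, Boolean.TRUE) == null ? OK : REJECTED;
    }

    // Mirrors the cache removal done in exceptionCaught
    void logout(String nodeIndex) {
        nodeCheck.remove(nodeIndex);
    }

    public static void main(String[] args) {
        LoginCheckSketch check = new LoginCheckSketch("127.0.0.1", "192.168.1.104");
        System.out.println(check.login("/127.0.0.1:12088", "127.0.0.1")); // first login
        System.out.println(check.login("/127.0.0.1:12088", "127.0.0.1")); // duplicate login
    }
}
```

The outcomes are the same as in the handler: a non-whitelisted IP or a repeated login from the same remote address yields -1, and a node can log in again once its cached entry has been removed.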
Contents of NettyServer.java:
package com.shihuan.netty.protocol.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.io.IOException;

import com.shihuan.netty.protocol.NettyConstant;
import com.shihuan.netty.protocol.codec.NettyMessageDecoder;
import com.shihuan.netty.protocol.codec.NettyMessageEncoder;

public class NettyServer {

    public void bind() throws Exception {
        // Configure the server-side NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws IOException {
                        ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                        ch.pipeline().addLast(new NettyMessageEncoder());
                        ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
                        ch.pipeline().addLast(new LoginAuthRespHandler());
                        ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler());
                    }
                });

            // Bind the port and wait synchronously until the bind succeeds
            ChannelFuture f = b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
            System.out.println("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));

            // Wait until the server's listening channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the thread pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyServer().bind();
    }
}
Contents of HeartBeatReqHandler.java:
package com.shihuan.netty.protocol.client;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.shihuan.netty.protocol.MessageType;
import com.shihuan.netty.protocol.struct.Header;
import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class HeartBeatReqHandler extends ChannelHandlerAdapter {
    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
            // Handshake succeeded: start sending a heartbeat every 5000 ms
            heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
        } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {
            System.out.println("Client receive server heart beat message : ---> " + message);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;

        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            NettyMessage heartBeat = buildHeartBeat();
            System.out.println("Client send heart beat message to server : ---> " + heartBeat);
            ctx.writeAndFlush(heartBeat);
        }

        private NettyMessage buildHeartBeat() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_REQ.value());
            message.setHeader(header);
            return message;
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // Stop the heartbeat task before propagating the error
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }
}
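The heartbeat handler relies on a start-then-cancel lifecycle: a fixed-rate task is scheduled once login succeeds and is cancelled when an exception is caught. The following is a minimal JDK-only sketch of that lifecycle. It assumes a plain `ScheduledExecutorService` in place of `ctx.executor()`, and `HeartBeatScheduleSketch`, `start`, and `stop` are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative only: schedule a periodic task, then cancel it on shutdown or error.
final class HeartBeatScheduleSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> heartBeat;

    // Corresponds to the scheduleAtFixedRate call made after a LOGIN_RESP arrives
    void start(Runnable sendHeartBeat, long periodMillis) {
        heartBeat = executor.scheduleAtFixedRate(sendHeartBeat, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Corresponds to the cleanup in exceptionCaught; returns true if a running task was cancelled
    boolean stop() {
        ScheduledFuture<?> task = heartBeat;
        heartBeat = null;
        boolean cancelled = task != null && task.cancel(true);
        executor.shutdownNow();
        return cancelled;
    }

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch ticks = new CountDownLatch(3);
        HeartBeatScheduleSketch sketch = new HeartBeatScheduleSketch();
        sketch.start(new Runnable() {
            @Override
            public void run() {
                System.out.println("heartbeat");
                ticks.countDown();
            }
        }, 10);
        ticks.await(2, TimeUnit.SECONDS); // wait for a few heartbeats
        System.out.println("stopped=" + sketch.stop());
    }
}
```

Clearing the field before cancelling keeps a second call to `stop` harmless, which is the same reason the handler sets `heartBeat` to null after cancelling.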
Contents of LoginAuthReqHandler.java:
package com.shihuan.netty.protocol.client;

import com.shihuan.netty.protocol.MessageType;
import com.shihuan.netty.protocol.struct.Header;
import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class LoginAuthReqHandler extends ChannelHandlerAdapter {

    /**
     * Sends the login (handshake) request as soon as the channel becomes active.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(buildLoginReq());
    }

    /**
     * Checks the login response and forwards all other messages
     * to the next handler in the pipeline.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage message = (NettyMessage) msg;
        // For a handshake response, check whether authentication succeeded
        if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
            byte loginResult = (byte) message.getBody();
            if (loginResult != (byte) 0) {
                // Handshake failed: close the connection
                ctx.close();
            } else {
                System.out.println("Login is ok : " + message);
                ctx.fireChannelRead(msg);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private NettyMessage buildLoginReq() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_REQ.value());
        message.setHeader(header);
        return message;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}
Contents of NettyClient.java:
package com.shihuan.netty.protocol.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.shihuan.netty.protocol.NettyConstant;
import com.shihuan.netty.protocol.codec.NettyMessageDecoder;
import com.shihuan.netty.protocol.codec.NettyMessageEncoder;

public class NettyClient {
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    // Client-side NIO thread group, shared across reconnect attempts
    EventLoopGroup group = new NioEventLoopGroup();

    public void connect(int port, String host) throws Exception {
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                        ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder());
                        ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
                        ch.pipeline().addLast("LoginAuthHandler", new LoginAuthReqHandler());
                        ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler());
                    }
                });

            // Connect from the fixed local address and wait for the connection to complete
            ChannelFuture future = b.connect(
                    new InetSocketAddress(host, port),
                    new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync();
            // Block until the channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // Once the connection has ended, wait one second and try to reconnect
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        connect(NettyConstant.PORT, NettyConstant.REMOTEIP); // reconnect
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
    }
}
[Note]: The attached protocolstack.rar file contains the runnable project source code.