`
shihuan830619
  • 浏览: 580136 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Netty中Marshalling编解码自定义栈应用

阅读更多
工程结构图如下:


NettyConstant.java文件内容如下:
package com.shihuan.netty.protocol;

public final class NettyConstant {

	public static final String REMOTEIP = "127.0.0.1";
    public static final int PORT = 8080;
    public static final int LOCAL_PORT = 12088;
    public static final String LOCALIP = "127.0.0.1";
	
}


MessageType.java文件内容如下:
package com.shihuan.netty.protocol;

public enum MessageType {

	SERVICE_REQ((byte) 0), SERVICE_RESP((byte) 1), ONE_WAY((byte) 2), LOGIN_REQ((byte) 3), LOGIN_RESP((byte) 4), HEARTBEAT_REQ((byte) 5), HEARTBEAT_RESP((byte) 6);

	private byte value;

	private MessageType(byte value) {
		this.value = value;
	}

	public byte value() {
		return this.value;
	}
	
}


Header.java文件内容如下:
package com.shihuan.netty.protocol.struct;

import java.util.HashMap;
import java.util.Map;

public final class Header {

	private int crcCode = 0xabef0101;

    private int length;   // 消息长度

    private long sessionID;   // 会话ID

    private byte type;   // 消息类型

    private byte priority;   // 消息优先级

    private Map<String, Object> attachment = new HashMap<String, Object>();   // 附件

	public final int getCrcCode() {
		return crcCode;
	}

	public final void setCrcCode(int crcCode) {
		this.crcCode = crcCode;
	}

	public final int getLength() {
		return length;
	}

	public final void setLength(int length) {
		this.length = length;
	}

	public final long getSessionID() {
		return sessionID;
	}

	public final void setSessionID(long sessionID) {
		this.sessionID = sessionID;
	}

	public final byte getType() {
		return type;
	}

	public final void setType(byte type) {
		this.type = type;
	}

	public final byte getPriority() {
		return priority;
	}

	public final void setPriority(byte priority) {
		this.priority = priority;
	}

	public final Map<String, Object> getAttachment() {
		return attachment;
	}

	public final void setAttachment(Map<String, Object> attachment) {
		this.attachment = attachment;
	}
	
	/*
     * (non-Javadoc)
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
	return "Header [crcCode=" + crcCode + ", length=" + length
		+ ", sessionID=" + sessionID + ", type=" + type + ", priority="
		+ priority + ", attachment=" + attachment + "]";
    }
    
}


NettyMessage.java文件内容如下:
package com.shihuan.netty.protocol.struct;

public final class NettyMessage {

	private Header header;  //消息头

    private Object body;   //消息体

	public final Header getHeader() {
		return header;
	}

	public final void setHeader(Header header) {
		this.header = header;
	}

	public final Object getBody() {
		return body;
	}

	public final void setBody(Object body) {
		this.body = body;
	}
	
	/*
     * (non-Javadoc)
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
    	return "NettyMessage [header=" + header + "]";
    }
    
}


ChannelBufferByteInput.java文件内容如下:
package com.shihuan.netty.protocol.codec;

import io.netty.buffer.ByteBuf;

import java.io.IOException;

import org.jboss.marshalling.ByteInput;

class ChannelBufferByteInput implements ByteInput {

	private final ByteBuf buffer;

    public ChannelBufferByteInput(ByteBuf buffer) {
        this.buffer = buffer;
    }
	
	@Override
	public void close() throws IOException {
		
	}

	@Override
	public int available() throws IOException {
		return buffer.readableBytes();
	}

	@Override
	public int read() throws IOException {
		if (buffer.isReadable()) {
            return buffer.readByte() & 0xff;
        }
        return -1;
	}

	@Override
	public int read(byte[] array) throws IOException {
		return read(array, 0, array.length);
	}

	@Override
	public int read(byte[] dst, int dstIndex, int length) throws IOException {
		int available = available();
        if (available == 0) {
            return -1;
        }

        length = Math.min(available, length);
        buffer.readBytes(dst, dstIndex, length);
        return length;
	}

	@Override
	public long skip(long bytes) throws IOException {
		int readable = buffer.readableBytes();
        if (readable < bytes) {
            bytes = readable;
        }
        buffer.readerIndex((int) (buffer.readerIndex() + bytes));
        return bytes;
	}

}


ChannelBufferByteOutput.java文件内容如下:
package com.shihuan.netty.protocol.codec;

import io.netty.buffer.ByteBuf;

import java.io.IOException;

import org.jboss.marshalling.ByteOutput;

class ChannelBufferByteOutput implements ByteOutput {

	private final ByteBuf buffer;
	
	/**
     * Create a new instance which use the given {@link ByteBuf}
     */
    public ChannelBufferByteOutput(ByteBuf buffer) {
        this.buffer = buffer;
    }
	
	@Override
	public void close() throws IOException {
		
	}

	@Override
	public void flush() throws IOException {
		
	}

	@Override
	public void write(int b) throws IOException {
		buffer.writeByte(b);
	}

	@Override
	public void write(byte[] bytes) throws IOException {
		buffer.writeBytes(bytes);
	}

	@Override
	public void write(byte[] bytes, int srcIndex, int length) throws IOException {
		buffer.writeBytes(bytes, srcIndex, length);
	}
	
	/**
     * Return the {@link ByteBuf} which contains the written content
     */
    ByteBuf getBuffer() {
        return buffer;
    }

}


MarshallingCodecFactory.java文件内容如下:
package com.shihuan.netty.protocol.codec;

import java.io.IOException;

import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.marshalling.Unmarshaller;

public final class MarshallingCodecFactory {

	/**
	 * 创建Jboss Marshaller
	 * @throws IOException
	 */
	protected static Marshaller buildMarshalling() throws IOException {
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(5);
		Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
		return marshaller;
	}

	/**
	 * 创建Jboss Unmarshaller
	 * @throws IOException
	 */
	protected static Unmarshaller buildUnMarshalling() throws IOException {
		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
		final MarshallingConfiguration configuration = new MarshallingConfiguration();
		configuration.setVersion(5);
		final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
		return unmarshaller;
	}
	
}


MarshallingDecoder.java文件内容如下:
package com.shihuan.netty.protocol.codec;

import io.netty.buffer.ByteBuf;

import java.io.IOException;

import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;

public class MarshallingDecoder {

	private final Unmarshaller unmarshaller;
	
	/**
	 * Creates a new decoder whose maximum object size is {@code 1048576} bytes.
	 * If the size of the received object is greater than {@code 1048576} bytes,
	 * a {@link StreamCorruptedException} will be raised.
	 * 
	 * @throws IOException
	 * 
	 */
	public MarshallingDecoder() throws IOException {
		unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
	}

	protected Object decode(ByteBuf in) throws Exception {
		int objectSize = in.readInt();
		ByteBuf buf = in.slice(in.readerIndex(), objectSize);
		ByteInput input = new ChannelBufferByteInput(buf);
		try {
			unmarshaller.start(input);
			Object obj = unmarshaller.readObject();
			unmarshaller.finish();
			in.readerIndex(in.readerIndex() + objectSize);
			return obj;
		} finally {
			unmarshaller.close();
		}
	}
    
}


MarshallingEncoder.java文件内容如下:
package com.shihuan.netty.protocol.codec;

import java.io.IOException;

import org.jboss.marshalling.Marshaller;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;

@Sharable
public class MarshallingEncoder {

	private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
	Marshaller marshaller;

	public MarshallingEncoder() throws IOException {
		marshaller = MarshallingCodecFactory.buildMarshalling();
	}

	protected void encode(Object msg, ByteBuf out) throws Exception {
		try {
			int lengthPos = out.writerIndex();
			out.writeBytes(LENGTH_PLACEHOLDER);
			ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
			marshaller.start(output);
			marshaller.writeObject(msg);
			marshaller.finish();
			out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
		} finally {
			marshaller.close();
		}
	}
	
}


NettyMessageDecoder.java文件内容如下:
package com.shihuan.netty.protocol.codec;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.shihuan.netty.protocol.struct.Header;
import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {

	MarshallingDecoder marshallingDecoder;
	
	public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) throws IOException {
		super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
		marshallingDecoder = new MarshallingDecoder();
	}

	@Override
	protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
		ByteBuf frame = (ByteBuf) super.decode(ctx, in);
		if (frame == null) {
			return null;
		}

		NettyMessage message = new NettyMessage();
		Header header = new Header();
		header.setCrcCode(frame.readInt());
		header.setLength(frame.readInt());
		header.setSessionID(frame.readLong());
		header.setType(frame.readByte());
		header.setPriority(frame.readByte());

		int size = frame.readInt();
		if (size > 0) {
			Map<String, Object> attch = new HashMap<String, Object>(size);
			int keySize = 0;
			byte[] keyArray = null;
			String key = null;
			for (int i = 0; i < size; i++) {
				keySize = frame.readInt();
				keyArray = new byte[keySize];
				frame.readBytes(keyArray);
				key = new String(keyArray, "UTF-8");
				attch.put(key, marshallingDecoder.decode(frame));
			}
			keyArray = null;
			key = null;
			header.setAttachment(attch);
		}
		if (frame.readableBytes() > 4) {
			message.setBody(marshallingDecoder.decode(frame));
		}
		message.setHeader(header);
		return message;
	}
	
}


NettyMessageEncoder.java文件内容如下:
package com.shihuan.netty.protocol.codec;

import java.io.IOException;
import java.util.Map;

import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public final class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {

	MarshallingEncoder marshallingEncoder;

    public NettyMessageEncoder() throws IOException {
    	this.marshallingEncoder = new MarshallingEncoder();
    }
	
	@Override
	protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception {
		if (msg == null || msg.getHeader() == null) {
			throw new Exception("The encode message is null");
		}
		sendBuf.writeInt((msg.getHeader().getCrcCode()));
		sendBuf.writeInt((msg.getHeader().getLength()));
		sendBuf.writeLong((msg.getHeader().getSessionID()));
		sendBuf.writeByte((msg.getHeader().getType()));
		sendBuf.writeByte((msg.getHeader().getPriority()));
		sendBuf.writeInt((msg.getHeader().getAttachment().size()));
		String key = null;
		byte[] keyArray = null;
		Object value = null;
		for (Map.Entry<String, Object> param : msg.getHeader().getAttachment().entrySet()) {
			key = param.getKey();
			keyArray = key.getBytes("UTF-8");
			sendBuf.writeInt(keyArray.length);
			sendBuf.writeBytes(keyArray);
			value = param.getValue();
			marshallingEncoder.encode(value, sendBuf);
		}
		key = null;
		keyArray = null;
		value = null;
		if (msg.getBody() != null) {
			marshallingEncoder.encode(msg.getBody(), sendBuf);
		} else {
			sendBuf.writeInt(0);
		}
		sendBuf.setInt(4, sendBuf.readableBytes()-8);
	}

}


HeartBeatRespHandler.java文件内容如下:
package com.shihuan.netty.protocol.server;

import com.shihuan.netty.protocol.MessageType;
import com.shihuan.netty.protocol.struct.Header;
import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class HeartBeatRespHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		NettyMessage message = (NettyMessage) msg;
		// 返回心跳应答消息
		if (message.getHeader()!=null && message.getHeader().getType()==MessageType.HEARTBEAT_REQ.value()) {
			System.out.println("Receive client heart beat message : ---> " + message);
			NettyMessage heartBeat = buildHeatBeat();
			System.out.println("Send heart beat response message to client : ---> " + heartBeat);
			ctx.writeAndFlush(heartBeat);
		} else
			ctx.fireChannelRead(msg);
	}

	private NettyMessage buildHeatBeat() {
		NettyMessage message = new NettyMessage();
		Header header = new Header();
		header.setType(MessageType.HEARTBEAT_RESP.value());
		message.setHeader(header);
		return message;
	}
	
}


LoginAuthRespHandler.java文件内容如下:
package com.shihuan.netty.protocol.server;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.shihuan.netty.protocol.MessageType;
import com.shihuan.netty.protocol.struct.Header;
import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class LoginAuthRespHandler extends ChannelHandlerAdapter {

	private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
	private String[] whitekList = { "127.0.0.1", "192.168.1.104" };

	/**
	 * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward to
	 * the next {@link ChannelHandler} in the {@link ChannelPipeline}.
	 * 
	 * Sub-classes may override this method to change behavior.
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		NettyMessage message = (NettyMessage) msg;

		// 如果是握手请求消息,处理,其它消息透传
		if (message.getHeader()!=null && message.getHeader().getType()==MessageType.LOGIN_REQ.value()) {
			String nodeIndex = ctx.channel().remoteAddress().toString();
			NettyMessage loginResp = null;
			// 重复登陆,拒绝
			if (nodeCheck.containsKey(nodeIndex)) {
				loginResp = buildResponse((byte) -1);
			} else {
				InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
				String ip = address.getAddress().getHostAddress();
				boolean isOK = false;
				for (String WIP : whitekList) {
					if (WIP.equals(ip)) {
						isOK = true;
						break;
					}
				}
				loginResp = isOK ? buildResponse((byte) 0) : buildResponse((byte) -1);
				if (isOK) {
					nodeCheck.put(nodeIndex, true);
				}
			}
			System.out.println("The login response is : " + loginResp + " body [" + loginResp.getBody() + "]");
			ctx.writeAndFlush(loginResp);
		} else {
			ctx.fireChannelRead(msg);
		}
	}

	private NettyMessage buildResponse(byte result) {
		NettyMessage message = new NettyMessage();
		Header header = new Header();
		header.setType(MessageType.LOGIN_RESP.value());
		message.setHeader(header);
		message.setBody(result);
		return message;
	}

	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		nodeCheck.remove(ctx.channel().remoteAddress().toString());  // 删除缓存
		ctx.close();
		ctx.fireExceptionCaught(cause);
	}
	
}


NettyServer.java文件内容如下:
package com.shihuan.netty.protocol.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.io.IOException;

import com.shihuan.netty.protocol.NettyConstant;
import com.shihuan.netty.protocol.codec.NettyMessageDecoder;
import com.shihuan.netty.protocol.codec.NettyMessageEncoder;

public class NettyServer {

	public void bind() throws Exception {
		// 配置服务端的NIO线程组
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws IOException {
					ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
					ch.pipeline().addLast(new NettyMessageEncoder());
					ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
					ch.pipeline().addLast(new LoginAuthRespHandler());
					ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler());
				}
			});
	
			// 绑定端口,同步等待成功
			ChannelFuture f = b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
			System.out.println("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
			// 等待服务端监听端口关闭
			f.channel().closeFuture().sync();
			
			
			// 绑定端口,同步等待成功
			//b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
			//System.out.println("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
		} finally {
			// 优雅退出,释放线程池资源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
	
	public static void main(String[] args) throws Exception {
		new NettyServer().bind();
	}
	
}


HeartBeatReqHandler.java文件内容如下:
package com.shihuan.netty.protocol.client;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.shihuan.netty.protocol.MessageType;
import com.shihuan.netty.protocol.struct.Header;
import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class HeartBeatReqHandler extends ChannelHandlerAdapter {

	private volatile ScheduledFuture<?> heartBeat;

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		NettyMessage message = (NettyMessage) msg;
		// 握手成功,主动发送心跳消息
		if (message.getHeader()!=null && message.getHeader().getType()==MessageType.LOGIN_RESP.value()) {
			heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
		} else if (message.getHeader()!=null && message.getHeader().getType()==MessageType.HEARTBEAT_RESP.value()) {
			System.out.println("Client receive server heart beat message : ---> " + message);
		} else {
			ctx.fireChannelRead(msg);
		}
	}

	private class HeartBeatTask implements Runnable {
		private final ChannelHandlerContext ctx;

		public HeartBeatTask(final ChannelHandlerContext ctx) {
			this.ctx = ctx;
		}

		@Override
		public void run() {
			NettyMessage heatBeat = buildHeatBeat();
			System.out.println("Client send heart beat messsage to server : ---> " + heatBeat);
			ctx.writeAndFlush(heatBeat);
		}

		private NettyMessage buildHeatBeat() {
			NettyMessage message = new NettyMessage();
			Header header = new Header();
			header.setType(MessageType.HEARTBEAT_REQ.value());
			message.setHeader(header);
			return message;
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		if (heartBeat != null) {
			heartBeat.cancel(true);
			heartBeat = null;
		}
		ctx.fireExceptionCaught(cause);
	}
	
}


LoginAuthReqHandler.java文件内容如下:
package com.shihuan.netty.protocol.client;

import com.shihuan.netty.protocol.MessageType;
import com.shihuan.netty.protocol.struct.Header;
import com.shihuan.netty.protocol.struct.NettyMessage;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class LoginAuthReqHandler extends ChannelHandlerAdapter {

	/**
	 * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward to the
	 * next {@link ChannelHandler} in the {@link ChannelPipeline}.
	 * 
	 * Sub-classes may override this method to change behavior.
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ctx.writeAndFlush(buildLoginReq());
	}

	/**
	 * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward to
	 * the next {@link ChannelHandler} in the {@link ChannelPipeline}.
	 * 
	 * Sub-classes may override this method to change behavior.
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		NettyMessage message = (NettyMessage) msg;

		// 如果是握手应答消息,需要判断是否认证成功
		if (message.getHeader()!=null && message.getHeader().getType()==MessageType.LOGIN_RESP.value()) {
			byte loginResult = (byte) message.getBody();
			if (loginResult != (byte) 0) {
				// 握手失败,关闭连接
				ctx.close();
			} else {
				System.out.println("Login is ok : " + message);
				ctx.fireChannelRead(msg);
			}
		} else {
			ctx.fireChannelRead(msg);
		}
	}

	private NettyMessage buildLoginReq() {
		NettyMessage message = new NettyMessage();
		Header header = new Header();
		header.setType(MessageType.LOGIN_REQ.value());
		message.setHeader(header);
		return message;
	}

	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.fireExceptionCaught(cause);
	}
	
}


NettyClient.java文件内容如下:
package com.shihuan.netty.protocol.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.shihuan.netty.protocol.NettyConstant;
import com.shihuan.netty.protocol.codec.NettyMessageDecoder;
import com.shihuan.netty.protocol.codec.NettyMessageEncoder;

public class NettyClient {

	private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
	EventLoopGroup group = new NioEventLoopGroup();

	public void connect(int port, String host) throws Exception {
		// 配置客户端NIO线程组
		try {
			Bootstrap b = new Bootstrap();
			b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
					ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder());
					ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
					ch.pipeline().addLast("LoginAuthHandler", new LoginAuthReqHandler());
					ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler());
				}
			});
			// 发起异步连接操作
			ChannelFuture future = b.connect(new InetSocketAddress(host, port), new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync();
			future.channel().closeFuture().sync();
		} finally {
			// 所有资源释放完成之后,清空资源,再次发起重连操作
			executor.execute(new Runnable() {
				@Override
				public void run() {
					try {
						TimeUnit.SECONDS.sleep(1);
						try {
							connect(NettyConstant.PORT, NettyConstant.REMOTEIP);   // 发起重连操作
						} catch (Exception e) {
							e.printStackTrace();
						}
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			});
		}
	}
	
	public static void main(String[] args) throws Exception {
		new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
	}
	
}



【注】: 附件中的protocolstack.rar文件是可运行的工程源代码。
  • 大小: 35.6 KB
分享到:
评论

相关推荐

    Netty中Marshalling编解码应用

    总的来说,Netty的Marshalling编解码应用是网络编程中的一种高级技巧,能够帮助开发者更高效地处理Java对象在网络中的传输。通过深入学习和实践,你可以掌握这一技术,并将其应用于分布式系统、微服务架构等多种场景...

    Netty4编写服务器客户端,自定义编解码,发送自定义消息

    在这个项目中,我们将深入理解如何利用 Netty 4 来编写服务器和客户端,实现自定义的消息编解码,并进行通信。 首先,我们要创建一个自定义的消息类。这个消息类通常会包含必要的字段,比如消息头、消息体等,以...

    Netty+自定义Protobuf编解码器

    在分布式系统和网络通信中,Netty是一个非常流行的高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。而Protocol Buffers(简称Protobuf)是Google开发的一种数据序列化协议...

    Netty之自定义编解码器.zip

    本压缩包文件"Netty之自定义编解码器.zip"着重讨论的是如何在Netty中自定义编解码器,这是Netty框架中的一个重要组成部分,用于将应用程序数据转换为网络传输的数据格式,以及将接收到的网络数据转换回应用程序可以...

    Netty的技术的总结(Marshalling编解码,tcp的拆包粘包,webservice),包括新手入门源码,注释清楚,绝对物超所值

    **Marshalling编解码**:Marshalling是将对象序列化成二进制的过程,便于在网络中传输。在Netty中,MarshallingDecoder和MarshallingEncoder是用于处理这种序列化和反序列化的组件。它们允许我们将Java对象转换为...

    Netty中Protobuf编解码应用

    Protobuf编解码器在Netty中的应用,使得基于Protobuf的数据在网络传输中变得更加便捷和高效。 首先,理解Protobuf的基本概念。Protobuf定义了一种语言中立、平台中立的数据表示格式,通过.proto文件定义消息结构。...

    Netty 框架学习 —— 编解码器框架(csdn)————程序.pdf

    在Netty中,编解码器是处理数据转换的关键组件,它们将原始的字节流转换为应用程序可理解的消息格式,反之亦然。本文将深入探讨Netty中的编解码器框架。 首先,我们需要理解编码器和解码器的基本概念。编码器负责将...

    Netty学习笔记_Springboot实现自定义协议.docx

    本文主要介绍了使用Netty框架在Springboot项目中实现自定义协议的方法。自定义协议是指在网络通信中,使用特定的数据格式来传输数据,以满足特定的业务需求。在本文中,我们将使用Springboot框架来实现一个简单的...

    netty自定义数据包协议

    Netty 是一个高性能、异步...总结来说,通过在Netty中自定义解码器和编码器,我们可以有效地处理网络通信中的拆包和粘包问题,确保数据的正确传输和解析。同时,理解并掌握这个过程对于开发高性能的网络应用至关重要。

    netty编码器,解码器实例

    // 自定义解码器 public class CustomMessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List&lt;Object&gt; out) throws Exception { if (in....

    基于java序列化的Netty编解码技术

    4. **Netty的自定义编解码器**:除了使用内置的`SerializationCodec`,Netty还支持自定义编解码器,可以根据业务需求实现`MessageToByteEncoder`和`ByteToMessageDecoder`接口,以更灵活的方式处理数据编码和解码。...

    03-04-10-Netty编解码的艺术1

    【Netty 编解码的艺术】是Netty学习中的一个重要章节,主要探讨了在处理网络通信时遇到的数据拆包和粘包问题,以及Netty如何通过编码和解码技术来解决这些问题。 首先,TCP协议是一种面向流的协议,它不保证数据包...

    netty分隔符和定长解码器的应用

    它们可能包含了示例代码或教程,帮助开发者理解如何在实际项目中应用这两种解码器。通过学习这些例子,你可以了解如何在Netty管道(ChannelPipeline)中添加解码器,以及如何处理解码后的事件。通常,解码器会被添加...

    分布式锁,基于Netty长连接实现,自定义协议,高性能锁

    基于Netty实现的分布式锁,利用了Netty作为高效的异步事件驱动网络应用框架,可以提供长连接服务,降低网络开销,提高系统的吞吐量和响应速度。 Netty是Java领域的一款优秀网络通信框架,它提供了高度可定制化的...

    基于netty5的自定义协议Demo

    在Netty中,自定义协议的实现主要涉及到两个关键组件:编码器(Encoder)和解码器(Decoder)。编码器负责将应用层的数据转换为网络传输的数据格式,解码器则负责将接收到的网络数据还原为应用层能理解的格式。在这...

    netty消息拆包粘包编解码(java代码)

    3. **Netty中的编解码处理**:Netty 提供了多种ChannelHandler(处理器)来解决这个问题,其中`LengthFieldBasedFrameDecoder`和`LengthFieldPrepender`是处理粘包和拆包的常用工具。 - `...

    基于Protostuff实现的Netty编解码器.docx

    然而,为了在Netty中使用这些序列化和反序列化功能,我们需要创建自定义的编解码器。Netty提供了一些基础类来帮助我们实现这一目标。 对于编码器,我们继承`MessageToByteEncoder`。这个类期望我们实现`encode`方法...

    《netty实战》http协议、自定义协议、自定义RPC模块学习源码.zip

    在Netty中,你可以通过定义ByteToMessageDecoder和MessageToByteEncoder等编解码器来处理自定义协议的数据帧。这些编解码器允许你将接收到的原始字节流转换为易于处理的对象,或者将业务对象编码为字节流进行发送。 ...

Global site tag (gtag.js) - Google Analytics