`
yuancihang
  • 浏览: 146456 次
  • 性别: Icon_minigender_1
  • 来自: 洛阳
社区版块
存档分类
最新评论

Netty UDP 编程

    博客分类:
  • java
 
阅读更多
package tmp.net.netty.udp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;

/**
 * Minimal UDP endpoint built on the Netty 3 {@link ConnectionlessBootstrap}.
 *
 * <p>Each instance binds a datagram channel to a local port and exchanges
 * fixed-length text frames: outgoing strings are padded to {@code size} bytes by
 * {@code UDPEncoder}, incoming data is cut into {@code size}-byte frames and turned
 * back into strings by {@code UDPDecoder}, then handed to {@code MyHandler}.
 */
public class NettyUdp {

	/** Length in bytes of every frame sent or received by this endpoint. */
	private final int size;
	private ConnectionlessBootstrap b;
	private DatagramChannel datagramChannel;

	/**
	 * Creates the endpoint and immediately binds it.
	 *
	 * @param port local UDP port to bind to
	 * @param size fixed frame length in bytes used by the codec handlers
	 */
	public NettyUdp(int port, int size) {
		super();
		this.size = size;

		init(port);
	}

	/**
	 * Configures the bootstrap, installs the pipeline factory and binds the channel.
	 *
	 * @param port local UDP port to bind to
	 */
	private void init(int port){
		b = new ConnectionlessBootstrap(new NioDatagramChannelFactory(Executors.newCachedThreadPool()));

		// "tcpNoDelay" is a TCP-only socket option and has no meaning for a datagram
		// channel, so it is not set here.
		b.setOption("receiveBufferSize", 1048576); // 1M

		b.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				ChannelPipeline pipeline = Channels.pipeline();
				pipeline.addLast("stick", new FixedLengthFrameDecoder(size));
				pipeline.addLast("decoder", new UDPDecoder(size));
				pipeline.addLast("encoder", new UDPEncoder(size));
				pipeline.addLast("logic", new MyHandler());

				return pipeline;
			}
		});

		datagramChannel = (DatagramChannel) b.bind(new InetSocketAddress(port));
		System.out.println(" Server is starting ……");
	}

	/**
	 * Sends a text message to the given remote endpoint. The write is issued on the
	 * bound channel and this method does not wait for it to complete.
	 *
	 * @param message    text to send; it passes through the pipeline's encoder
	 * @param remoteHost host name or IP address of the receiver
	 * @param remotePort UDP port of the receiver
	 */
	public void writeString(String message, String remoteHost, int remotePort) {
		datagramChannel.write(message, new InetSocketAddress(remoteHost, remotePort));
	}

	/**
	 * Starts a listener on port 1000 with 100-byte frames.
	 * NOTE(review): ports below 1024 usually need elevated privileges on Unix-like
	 * systems - confirm this port is intended.
	 */
	public static void main(String[] args) throws IOException {
		new NettyUdp(1000, 100);

	}

	/**
	 * Closes the channel and then releases the bootstrap's external resources.
	 *
	 * <p>{@code close()} is asynchronous, so this method waits for it to finish before
	 * releasing resources; releasing them while the channel is still closing can leave
	 * the close unfinished. NOTE(review): because this blocks, it should not be called
	 * from a Netty I/O thread - confirm against callers.
	 */
	public void shutdown(){
		if(datagramChannel != null){
			datagramChannel.close().awaitUninterruptibly();
		}
		if(b != null){
			b.releaseExternalResources();
		}
	}

}

/**
 * Terminal upstream handler that logs every received message to standard output,
 * prefixed with the sender's address.
 */
class MyHandler extends SimpleChannelUpstreamHandler {

	/**
	 * Prints "&lt;remote address&gt; -&gt;:&lt;message&gt;" for each incoming message event.
	 *
	 * @param ctx handler context (unused)
	 * @param e   the received message event
	 */
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		final Object sender = e.getRemoteAddress();
		final Object payload = e.getMessage();

		final StringBuilder line = new StringBuilder();
		line.append(sender).append(" ->:").append(payload);
		System.out.println(line.toString());

		// A reply could be sent from here via e.getChannel().write(message, remoteAddress).
	}

}

 

编码器:

package tmp.net.netty.udp;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

/**
 * Encodes outgoing {@link String} messages into fixed-length frames: the UTF-8 bytes
 * of the message followed by {@link #BLANK} padding up to {@code size} bytes.
 * Messages that are not strings are passed through untouched.
 */
class UDPEncoder extends OneToOneEncoder {

	/** Padding character written after the message body to fill the frame. */
	public static final char BLANK = ' ';

	/** Length in bytes of every frame produced by this encoder. */
	private final int size;

	/**
	 * @param size fixed frame length in bytes
	 */
	public UDPEncoder(int size) {
		super();
		this.size = size;
	}

	/**
	 * Converts a string into a padded frame of exactly {@code size} bytes.
	 *
	 * @param msg the outgoing message; only {@code String} instances are encoded
	 * @return a buffer of exactly {@code size} bytes, or {@code msg} itself when it
	 *         is not a string
	 * @throws IllegalArgumentException if the UTF-8 form of the message is longer than
	 *         {@code size}; such a frame could not be padded and would break the
	 *         fixed-length framing
	 */
	@Override
	protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
		if(!(msg instanceof String)){
			return msg;
		}

		String message = (String) msg;
		byte[] body = message.getBytes("UTF-8");

		// Reject over-long messages instead of emitting a frame larger than 'size'.
		// Truncating is not done here because it could split a multi-byte character.
		if (body.length > size) {
			throw new IllegalArgumentException(
					"message needs " + body.length + " bytes in UTF-8 but the frame size is " + size);
		}

		// The frame length is known up front, so allocate exactly that much.
		ChannelBuffer buf = ChannelBuffers.buffer(size);
		buf.writeBytes(body);
		fill(buf, size - body.length);
		return buf;
	}

	/**
	 * Appends {@code count} padding bytes to the buffer.
	 *
	 * @param buf   buffer to pad
	 * @param count number of {@link #BLANK} bytes to write
	 */
	private void fill(ChannelBuffer buf, int count) {
		for (int i = 0; i < count; i++) {
			buf.writeByte(BLANK);
		}
	}

}

 

 

 

解码器:

package tmp.net.netty.udp;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

/**
 * Decodes fixed-length frames of {@code size} bytes into strings: the bytes are
 * interpreted as UTF-8 and surrounding whitespace (including the encoder's padding)
 * is trimmed.
 */
class UDPDecoder extends FrameDecoder {

	/** Length in bytes of one frame. */
	private final int size;

	/**
	 * @param size fixed frame length in bytes
	 */
	public UDPDecoder(int size) {
		super();
		this.size = size;
	}

	/**
	 * Reads one frame from the buffer and converts it to a trimmed string.
	 *
	 * @param buffer accumulated inbound bytes
	 * @return the decoded string, or {@code null} when fewer than {@code size} bytes
	 *         are available yet
	 */
	@Override
	protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
		// Returning null tells FrameDecoder that a whole frame has not arrived yet;
		// reading unconditionally would throw IndexOutOfBoundsException on short input.
		if (buffer.readableBytes() < size) {
			return null;
		}

		byte[] body = new byte[size];
		buffer.readBytes(body);

		return new String(body, "UTF-8").trim();
	}
}

 

 

 

测试服务端:

/**
 * Test server entry point: binds a {@code NettyUdp} endpoint to UDP port 1000
 * using 100-byte frames.
 */
public static void main(String[] args) throws Exception {
	final int listenPort = 1000;
	final int frameSize = 100;
	new NettyUdp(listenPort, frameSize);
}

 

 

测试客户端:

	/**
	 * Test client entry point: binds a {@code NettyUdp} endpoint to UDP port 1001 with
	 * 100-byte frames and sends the same message five times to 127.0.0.1:1000.
	 */
	public static void main(String[] args) {
		final NettyUdp client = new NettyUdp(1001, 100);
		int remaining = 5;
		while (remaining-- > 0) {
			client.writeString("中国", "127.0.0.1", 1000);
		}
		// client.shutdown() is not called here, so the channel stays open after the sends.
	}

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics