`
Donald_Draper
  • 浏览: 981133 次
社区版块
存档分类
最新评论

netty 网络通信示例二

阅读更多
netty 网络通信示例一 :http://donald-draper.iteye.com/blog/2383326
上篇文章我们看了一个简单的网络通信实例,在通信的过程成由于网络等原因,可能存在粘包的问题,对于粘包问题,处理呢。我们来看一个获取服务器时间的实例,这个实例也许不够恰当,我们只是示范处理粘包问题:
服务端:
package netty.main.time;

import java.net.InetSocketAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import netty.handler.time.TimeServerHandler;

/**
 * 
 * @author donald
 * 2017年6月21日
 * 下午12:48:17
 */
public class TimeServer {
	private static final Logger log = LoggerFactory.getLogger(TimeServer.class);
	 static final boolean SSL = System.getProperty("ssl") != null;
	private static final  String ip = "192.168.31.153";
	private static final  int port = 10010;
    public static void main(String[] args) throws Exception {
      run();
    }
    public static void run() throws Exception {
    	 // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

    	/*
    	 * EventLoopGroup(多线程事件loop),处理IO操作,这里我们用了两个事件loop
    	 * 第一个boss用于处理器监听连接请求,第二个worker用于数据的传输;
    	 * 具体线程是多少依赖于事件loop的具体实现
    	 * */
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
        	//ServerBootstrap,用于配置服务端,一般为ServerSocket通道
            ServerBootstrap serverBoot = new ServerBootstrap(); 
            serverBoot.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) 
             .childHandler(new ChannelInitializer<SocketChannel>() { 
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                	//添加通道处理器到通道关联的管道,准确的中文翻译为管道线, 此管道线与Mina中过滤链十分相似,
                	//ChannelInitializer用于配置通道的管道线,ChannelPipeline
                	 ChannelPipeline pipeline = ch.pipeline();
                     if (sslCtx != null) {
                    	 pipeline.addLast(sslCtx.newHandler(ch.alloc()));
                     }
//                     pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                     pipeline.addLast(new TimeServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)//socket监听器连接队列大小、
             .childOption(ChannelOption.SO_KEEPALIVE, true); //保活,此配置针对ServerSocket通道接收连接产生的Socket通道
            InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
            // 绑定地址,开始监听
            ChannelFuture f = serverBoot.bind(inetSocketAddress).sync();
            log.info("=========Server is start=========");
            //等待,直到ServerSocket关闭
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

服务端处理器:
package netty.handler.time;

import java.nio.charset.Charset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 
 * @author donald
 * 2017年6月21日
 * 下午12:48:01
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
   private static final Logger log = LoggerFactory.getLogger(TimeServerHandler.class);
   private static final String TIME_PROTOCL = "?time";
   private static final Charset charsetDecoder= Charset.forName("UTF-8");
   /**
    * 读client通道数据,通道处理器上下文ChannelHandlerContext与Mina的会话很像
    */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    	ByteBuf in = (ByteBuf)msg;
    	String message = (String) in.readCharSequence(in.writerIndex(), charsetDecoder);
        log.info("===Server reciever message:" +message);
        if(message.equals(TIME_PROTOCL)){
        	//通过通道处理器上下文的ByteBufAllocator创建容量至少为8个字节的ByteBuf
        	ByteBuf time = ctx.alloc().buffer(8);
        	time.writeLong(System.currentTimeMillis());
        	/*
        	在发送数据时,我们并没有调用nio的ByteBuffer#flip类似的方法,这是由于
        	为了避免nio忘记flip操作的问题,Netty通过readIndex和writeIndex两个index
        	表示ByteBuf的相对开始和结束位置;当向ByteBuffer中写数据时,writeIndex将会增长,
        	而readIndex不变。
        	*/
        	/*
        	ctx#write,writeAndFlush方法返回一个写结果ChannelFuture,
        	ChannelFuture表示一个IO事件操作,如果想要在ctx写操作后,关闭连接,不可以用如下方式:
        	Channel ch = ...;
        	ch.writeAndFlush(message);
        	ch.close();
        	因为Netty的写操作时异步的,上面这种关闭连接方式,有可能在消息没发送完前,连接已经关闭,为了
        	能在消息发送完毕后再关闭会话,可以通过添加通道结果监听器,在消息发送完时,触发监听器operationComplete
        	事件。*/
        	
        	final ChannelFuture cfuture = ctx.writeAndFlush(time);
        	final ChannelHandlerContext ctx_refer = ctx;
        	cfuture.addListener(new ChannelFutureListener() {
        	        @Override
        	        public void operationComplete(ChannelFuture future) {
        	            assert cfuture == future;
        	            ctx_refer.close();
        	        }
        	 }); 
        	//上面添加监听器,可以直接使用通道结果监听器内部的CLOSE监听器
        	//cfuture.addListener(ChannelFutureListener.CLOSE);
        }   
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    	//异常发生时,关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}


客户端:
package netty.main.time;

import java.net.InetSocketAddress;

import javax.net.ssl.SSLException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import netty.handler.time.TimeClientHandler;
/**
 * 
 * @author donald
 * 2017年6月21日
 * 下午12:48:10
 */
public final class TimeClient {
	private static final Logger log = LoggerFactory.getLogger(TimeClient.class);
	private static final boolean SSL = System.getProperty("ssl") != null;
	private static final String ip = System.getProperty("host", "192.168.31.153");
	private static final int port = Integer.parseInt(System.getProperty("port", "10010"));
    public static void main(String[] args) throws Exception {
       run();
    }
    private static void run() throws SSLException, InterruptedException{
    	 //配置安全套接字上下文
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
        	//Bootstrap与 ServerBootstrap相似,不同的是Bootstrap用于配置客户端,
        	//一般为Socket通道,或无连接通道
            Bootstrap bootstrap = new Bootstrap();
            //EventLoopGroup有 boss和worker两组,对于客户端只需要用worker
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                	 //添加安全套接字处理器和通道处理器到
                     ChannelPipeline pipeline = ch.pipeline();
                     if (sslCtx != null) {
                    	 pipeline.addLast(sslCtx.newHandler(ch.alloc(), ip, port));
                     }
//                     pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                     pipeline.addLast(new TimeClientHandler());
                 }
             });
            InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
            //连接socket地址
            ChannelFuture f = bootstrap.connect(inetSocketAddress).sync();
            log.info("=========Client is start=========");
            //等待,直到连接关闭
            f.channel().closeFuture().sync();
        } finally {
        	workerGroup.shutdownGracefully();
        }
    }
}

客户端处理器:
package netty.handler.time;


import java.nio.charset.Charset;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * 
 * @author donald
 * 2017年6月21日
 * 下午12:47:53
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
	private static final Logger log = LoggerFactory.getLogger(TimeClientHandler.class);
	private static final String TIME_PROTOCL = "?time";
	private static final Charset charsetEncoder= Charset.forName("UTF-8");
	/**
	 * 在通道连接建立时(准备传输数据)触发
	 */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
    	ByteBuf timeReq = ctx.alloc().buffer(5);
    	timeReq.writeCharSequence(TIME_PROTOCL, charsetEncoder);
    	ctx.writeAndFlush(timeReq);
    }  
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    	ByteBuf in = (ByteBuf)msg;
    	long nowTime = in.readLong();
    	Date nowDay = new Date(nowTime);
    	log.info("===Server Time:" +nowDay.toLocaleString());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

启动服务端与客户端,控制台输出:
服务端:
[INFO ] 2017-07-05 22:50:09 netty.main.time.TimeServer =========Server is start=========
[INFO ] 2017-07-05 22:50:14 netty.handler.time.TimeServerHandler ===Server reciever message:?time
客户端:
[INFO ] 2017-07-05 22:50:14 netty.main.time.TimeClient =========Client is start=========
[INFO ] 2017-07-05 22:50:14 netty.handler.time.TimeClientHandler ===Server Time:2017-7-5 22:50:14

针对粘包问题,我们对上面的实例进行改造:
服务端:
package netty.main.time;

import java.net.InetSocketAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import netty.handler.time.TimeServerHandler2;

/**
 * 
 * @author donald
 * 2017年6月21日
 * 下午12:48:17
 */
public class TimeServerForDecoder {
	private static final Logger log = LoggerFactory.getLogger(TimeServerForDecoder.class);
	 static final boolean SSL = System.getProperty("ssl") != null;
	private static final  String ip = "192.168.31.153";
	private static final  int port = 10010;
    public static void main(String[] args) throws Exception {
      run();
    }
    public static void run() throws Exception {
    	 // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

    	/*
    	 * EventLoopGroup(多线程事件loop),处理IO操作,这里我们用了两个事件loop
    	 * 第一个boss用于处理器监听连接请求,第二个worker用于数据的传输;
    	 * 具体线程是多少依赖于事件loop的具体实现
    	 * */
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
        	//ServerBootstrap,用于配置服务端,一般为ServerSocket通道
            ServerBootstrap serverBoot = new ServerBootstrap(); 
            serverBoot.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) 
             .childHandler(new ChannelInitializer<SocketChannel>() { 
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                	//添加通道处理器到通道关联的管道,准确的中文翻译为管道线, 此管道线与Mina中过滤链十分相似,
                	//ChannelInitializer用于配置通道的管道线,ChannelPipeline
                	 ChannelPipeline pipeline = ch.pipeline();
                     if (sslCtx != null) {
                    	 pipeline.addLast(sslCtx.newHandler(ch.alloc()));
                     }
//                     pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                     pipeline.addLast(new TimeServerHandler2());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)//socket监听器连接队列大小、
             .childOption(ChannelOption.SO_KEEPALIVE, true); //保活,此配置针对ServerSocket通道接收连接产生的Socket通道
            InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
            // 绑定地址,开始监听
            ChannelFuture f = serverBoot.bind(inetSocketAddress).sync();
            log.info("=========Server is start=========");
            //等待,直到ServerSocket关闭
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
  
}


服务端处理器:
package netty.handler.time;

import java.nio.charset.Charset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 
 * @author donald
 * 2017年6月21日
 * 下午12:48:01
 */
public class TimeServerHandler2 extends ChannelInboundHandlerAdapter {
   private static final Logger log = LoggerFactory.getLogger(TimeServerHandler2.class);
   private static final String TIME_PROTOCL = "?time";
   private static final Charset charsetDecoder= Charset.forName("UTF-8");
   /**
    * 读client通道数据,通道处理器上下文ChannelHandlerContext与Mina的会话很像
    */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    	ByteBuf in = (ByteBuf)msg;
    	String message = (String) in.readCharSequence(in.writerIndex(), charsetDecoder);
        log.info("===Server reciever message:" +message);
        if(message.equals(TIME_PROTOCL)){
        	//通过通道处理器上下文的ByteBufAllocator创建容量至少为8个字节的ByteBuf
        	ByteBuf time = ctx.alloc().buffer(8);
        	time.writeLong(System.currentTimeMillis());
        	/*
        	在发送数据时,我们并没有调用nio的ByteBuffer#flip类似的方法,这是由于
        	为了避免nio忘记flip操作的问题,Netty通过readIndex和writeIndex两个index
        	表示ByteBuf的相对开始和结束位置;当向ByteBuffer中写数据时,writeIndex将会增长,
        	而readIndex不变。
        	*/
        	/*
        	ctx#write,writeAndFlush方法返回一个写结果ChannelFuture,
        	ChannelFuture表示一个IO事件操作,如果想要在ctx写操作后,关闭连接,不可以用如下方式:
        	Channel ch = ...;
        	ch.writeAndFlush(message);
        	ch.close();
        	因为Netty的写操作时异步的,上面这种关闭连接方式,有可能在消息没发送完前,连接已经关闭,为了
        	能在消息发送完毕后再关闭会话,可以通过添加通道结果监听器,在消息发送完时,触发监听器operationComplete
        	事件。*/
        	ctx.writeAndFlush(time);
        }   
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    	//异常发生时,关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

客户端:
package netty.main.time;

import java.net.InetSocketAddress;

import javax.net.ssl.SSLException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import netty.codec.time.TimeDecoder;
import netty.handler.time.TimeClientHandler;
/**
 *  客户端要与TimeServerForDecoder服务端配合使用
 * @author donald
 * 2017年6月21日
 * 下午12:48:10
 */
public final class TimeClientWithDecoder {
	private static final Logger log = LoggerFactory.getLogger(TimeClientWithDecoder.class);
	private static final boolean SSL = System.getProperty("ssl") != null;
	private static final String ip = System.getProperty("host", "192.168.31.153");
	private static final int port = Integer.parseInt(System.getProperty("port", "10010"));
    public static void main(String[] args) throws Exception {
       run();
    }
    private static void run() throws SSLException, InterruptedException{
    	 //配置安全套接字上下文
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
        	//Bootstrap与 ServerBootstrap相似,不同的是Bootstrap用于配置客户端,
        	//一般为Socket通道,或无连接通道
            Bootstrap bootstrap = new Bootstrap();
            //EventLoopGroup有 boss和worker两组,对于客户端只需要用worker
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                	 //添加安全套接字处理器和通道处理器到
                     ChannelPipeline pipeline = ch.pipeline();
                     if (sslCtx != null) {
                    	 pipeline.addLast(sslCtx.newHandler(ch.alloc(), ip, port));
                     }
//                     pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                     pipeline.addLast(new TimeDecoder(),new TimeClientHandler());
                 }
             });
            InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
            //连接socket地址
            ChannelFuture f = bootstrap.connect(inetSocketAddress).sync();
            log.info("=========Client is start=========");
            //等待,直到连接关闭
            f.channel().closeFuture().sync();
        } finally {
        	workerGroup.shutdownGracefully();
        }
    }
}

客户端解码器:
package netty.codec.time;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
/**
 * 字节流消息解码器ByteToMessageDecoder,是#ChannelInboundHandler的实现,可以解决粘包问题;
 * 字节消息解码的内部有一个可累计buffer,当有数据到达时,将会调用#decode方法,解码消息,如果累计buffer中
 * 没有足够的数据,则不会添加对象到out,如果有对象添加到out,表示解码器成功解码了一个消息;我们不需要一次解码多个消息,
 * 解码器将会不断地调用#decode方法,直到没有对象可以添加到out。
 * @author donald
 * 2017年6月22日
 * 上午8:55:20
 */
public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 8) {
            return;
        }
        out.add(in.readBytes(8));
    }
}

启动服务端与客户端:
服务端:
[INFO ] 2017-07-05 22:57:58 netty.main.time.TimeServerForDecoder =========Server is start=========
[INFO ] 2017-07-05 22:58:08 netty.handler.time.TimeServerHandler2 ===Server reciever message:?time
客户端:
[INFO ] 2017-07-05 22:58:08 netty.main.time.TimeClientWithDecoder =========Client is start=========
[INFO ] 2017-07-05 22:58:08 netty.handler.time.TimeClientHandler ===Server Time:2017-7-5 22:58:08

解码器还有另外一种形式:
package netty.codec.time;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
/**
 * 回复解码器ReplayingDecoder为字节流消息解码器ByteToMessageDecoder的实现
 * @author donald
 * 2017年6月22日
 * 上午8:55:20
 */
public class TimeDecoder2 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(8));
    }
}

还有另外一种形式,在客户端处理器中处理,当然这种方式我们不建议使用:
package netty.handler.time;


import java.nio.charset.Charset;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 *TimeClientHandlerX处理粘包情况,
 *由于网络等原因,有时候我们不能够一次接收一个完成的数据包,我们必须等待完成的数据包,
 *我们可以等待数据包数据完成时,才解析数据。
 *在此示例中,我们接收的是一个8字节的long数据,在网络不佳的情况下,也有可能出现不能一次
 *接收的情况。
 * @author donald
 * 2017年6月21日
 * 下午12:47:53
 */
public class TimeClientHandler2 extends ChannelInboundHandlerAdapter {
	private static final Logger log = LoggerFactory.getLogger(TimeClientHandler2.class);
	private static final String TIME_PROTOCL = "?time";
	private static final Charset charsetEncoder= Charset.forName("UTF-8").newEncoder().charset();
	private ByteBuf buf;
	/**
	 * 在通道连接建立时(准备传输数据)触发
	 */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
    	ByteBuf timeReq = ctx.alloc().buffer(5);
    	timeReq.writeCharSequence(TIME_PROTOCL, charsetEncoder);
    	ctx.writeAndFlush(timeReq);
    }  
    /**
     * Gets called after the {@link ChannelHandler} was added to the 
     * actual context and it's ready to handle events.
     * 在通道处理器添加到实际上下文,准备处理事件时触发,可以用于初始化阻塞时间较短的任务
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(8);
    }
    /**
     * Gets called after the {@link ChannelHandler} was removed from the actual
     *  context and it doesn't handle events anymore.
     * 在通道处理器从实际上下文移除,不再处理事件时触发,可以用于释放初始化任务申请的资源
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release();
        buf = null;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
    	ByteBuf in = (ByteBuf)msg;
    	//将所有的字节序列数据累积到buf中
    	buf.writeBytes(in);
    	in.release();
    	/*
    	 待buffer中有足够的数据时,解析数据,否则当更多的数据到达时,netty将会再次调用#channelRead方法
    	 */
    	if(buf.readableBytes()>=8){
    		long nowTime = buf.readLong();
        	Date nowDay = new Date(nowTime);
        	log.info("===Server Time:" +nowDay.toLocaleString());
    	}
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

分享到:
评论

相关推荐

    netty通信完整示例

    这个“netty通信完整示例”提供了一套完整的Netty使用案例,包括了jar包、服务器端、客户端、编码器和解码器,确保了你能够亲身体验Netty的强大功能。 首先,让我们深入理解Netty的核心概念: 1. **NIO(Non-...

    基于Netty框架的网络通信示例.zip

    基于Netty框架的网络通信示例 内容概要 本项目是一个基于Netty框架的网络通信示例,涵盖了多种网络通信场景,包括TCP、UDP、WebSocket等。项目展示了如何使用Netty进行高性能的异步网络编程,并提供了丰富的示例...

    Netty初始学习示例

    在网络通信中,由于数据包大小不固定,可能会出现粘包或拆包现象。Netty通过自定义的解码器如`LengthFieldBasedFrameDecoder`来解决这个问题,它根据长度字段来分割数据包。 7. **心跳机制**: 为了检测网络连接...

    Android基于Netty框架实现通信

    在Android开发中,为了实现高效的网络通信,开发者常常会选择使用Netty框架。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于多种协议的服务器和客户端应用。本篇文章将详细探讨如何在Android环境中利用...

    netty 入门Reactor示例

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入理解Netty的Reactor模式之前,我们先了解一下什么是Reactor模式。 Reactor模式是一种事件处理模式,它...

    基于netty的即时通信

    在即时通信领域,Netty作为一个高性能、异步事件驱动的网络应用框架,广泛应用于各种复杂的通信系统。本项目“基于Netty的即时通信”旨在演示如何利用Netty构建一个简单的聊天应用,它包括服务端和客户端,实现了多...

    netty框架,服务端、客户端代码示例

    在这个"Netty框架,服务端、客户端代码示例"中,我们将深入探讨如何使用Netty构建服务端和客户端的通信。 首先,让我们了解Netty的基本架构。Netty的核心是它的“线程模型”和“通道”概念。线程模型采用“事件循环...

    Android-netty和socket通信的demo

    本示例“Android-netty和socket通信的demo”将展示如何在Android平台上利用Netty进行网络通信,与传统的Socket编程相结合,实现更加灵活和高效的通信机制。 首先,理解Netty的基本概念是非常重要的。Netty是一个...

    Netty实现简单的客户端服务端通信示例

    总的来说,Netty 的简单客户端-服务端通信示例帮助我们理解了 Netty 的基本架构和工作原理。通过实践这个示例,你可以进一步探索 Netty 如何处理网络连接、数据传输以及自定义处理器的实现,从而更好地掌握 Netty 在...

    android netty cli +probuf示例

    总结来说,“android netty cli +protobuf示例”展示了如何在Android客户端利用Netty的异步网络通信能力和Protobuf的数据序列化特性,构建一个高效的即时通讯系统。这个示例涵盖了从消息定义到网络通信的全过程,...

    (源码)基于Netty框架的网络通信系统.zip

    2. 高性能通信利用Netty的高性能异步事件驱动框架,实现高效、低延迟的网络通信。 3. 多种通信模式包括同步IO、异步IO、多路复用等,适应不同的网络通信需求。 4. 丰富的示例代码提供了多种网络通信场景的示例代码,...

    基于Netty框架的网络通信项目.zip

    本项目通过多个示例代码,展示了如何使用Netty实现不同类型的网络通信,包括HTTP服务器、WebSocket服务器、Protobuf编解码、Thrift服务等。 项目的主要特性和功能 1. HTTP服务器 实现了一个简单的HTTP服务器,...

    Netty通信服务器搭建demo

    通过这个示例,我们可以学习到如何利用Netty创建服务器、处理I/O事件以及解决通信过程中的粘包问题。理解并熟练运用这些知识点,将有助于我们构建更高效、可靠的网络应用程序。在实际项目中,还需要考虑异常处理、...

    Android使用Netty网络框架实践(客户端、服务端)

    在Android开发中,有时我们需要构建高性能的网络通信应用,这时Netty框架就能派上大用场。Netty是一个异步事件驱动的网络应用程序框架,它为高性能、高可用性的网络服务器和客户端提供了一种简单易用的方式。本实践...

    netty通讯示例

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,...通过深入理解并实践这个“netty通讯示例”,你可以掌握Netty的基本用法,以及如何处理网络通信中的常见问题,这对于构建高并发、高性能的网络应用至关重要。

    netty5.0通信jar和客户端服务器demo

    总结来说,Netty 5.0提供了一套强大且灵活的网络通信框架,通过异步I/O、高效的内存管理以及灵活的Pipeline设计,极大地简化了高性能网络应用的开发。通过"netty_11-07_ok"这样的示例,开发者可以深入理解Netty的...

    java nio&netty系列之三netty网络模型代码以及简化版代码示例

    本篇将主要探讨Netty网络模型的代码实现及简化版代码示例。 首先,Netty的核心是它的事件驱动模型,也称为Reactor模式。在Netty中,BossGroup负责接收新的连接请求,WorkerGroup则处理已连接的SocketChannel上的...

    java netty通信

    通过上述知识点的学习和实践,你可以构建出稳定、高效的网络通信服务。在实际项目中,结合具体的业务需求,灵活运用Netty提供的各种组件和设计模式,能够大大提高开发效率和系统的可靠性。在压缩包文件`test`中,...

    netty官方例子

    4. **WebSocket服务器和客户端**:WebSocket提供双向通信,这个例子展示了如何用Netty实现WebSocket服务器和客户端,处理文本和二进制帧。 5. **TCP和UDP通信**:Netty支持TCP和UDP协议,你可以找到处理TCP连接和...

    netty-book:netty权威指南示例代码

    《Netty权威指南》是一本深入探讨Java网络编程框架Netty的专业书籍,其示例代码提供了丰富的实践场景,帮助读者更好地理解和应用Netty。Netty是Java领域内广泛使用的高性能、异步事件驱动的网络应用框架,适用于创建...

Global site tag (gtag.js) - Google Analytics