`

Netty 简单样例分析

 
阅读更多

Netty 是JBoss旗下的io传输的框架,他利用java里面的nio来实现高效,稳定的io传输。

作为io传输,就会有client和server,下面我们看看用netty怎样写client和server

Client:
需要做的事情:
1.配置client启动类
  ClientBootstrap bootstrap = new ClientBootstrap(..)

2.根据不同的协议或者模式为client启动类设置pipelineFactory。
这里telnet pipline Factory 在netty中已经存在,所有直接用
  bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());
 也可以自己定义
 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(
                        new DiscardClientHandler(firstMessageSize));
            }
        });       
 这里DiscardClientHandler 就是自己定义的handler,他需要
 public class DiscardServerHandler extends SimpleChannelUpstreamHandler
 继承SimpleChannelUpstreamHandler  来实现自己的handler。这里DiscardClientHandler
 是处理自己的client端的channel,他的
 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        // Server is supposed to send nothing.  Therefore, do nothing.
    }
  可以看到Discard client不需要接受任何信息
 
3.连接server
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

这里解释一下channelFuture:

在Netty中所有的io操作都是异步的,这也就是意味任何io访问,那么就立即返回处理,并且不能确保
返回的数据全部完成。因此就出现了channelFuture,channelFuture在传输数据时候包括数据和状态两个
部分。他只有Uncompleted和Completed

 
 

                                      +---------------------------+
                                      | Completed successfully    |
                                      +---------------------------+
                                 +---->      isDone() = true      |
 +--------------------------+    |    |   isSuccess() = true      |
 |        Uncompleted       |    |    +===========================+
 +--------------------------+    |    | Completed with failure    |
 |      isDone() = false    |    |    +---------------------------+
 |   isSuccess() = false    |----+---->   isDone() = true         |
 | isCancelled() = false    |    |    | getCause() = non-null     |
 |    getCause() = null     |    |    +===========================+
 +--------------------------+    |    | Completed by cancellation |
                                 |    +---------------------------+
                                 +---->      isDone() = true      |
                                      | isCancelled() = true      |
                                      +---------------------------+
 

 
 既然netty io是异步的,那么如何知道channel传送完成有两种方式,一种添加监听器
 addListener(ChannelFutureListener) 还有一种直接调用await()方法,这两种方式
 有下面的区别
 监听器:是以事件模式的,因此代码就需要用事件模式的样式去写,相当复杂,但他是non-blocking模式的
 性能方面要比await方法好,而且不会产生死锁情况
 
 await(): 直接方法调用,使用简单,但是他是blocking模式,性能方面要弱而且会产生死锁情况
 
 不要在ChannelHandler 里面调用await(),这是因为通常在channelHandler里的event method是被i/o线程调用的
 (除非ChannelPipeline里面有个ExecutionHandler),那么如果这个时候用await就容易产生死锁。
 
 

// BAD - NEVER DO THIS
 @Override
 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     if (e.getMessage() instanceof GoodByeMessage) {
         ChannelFuture future = e.getChannel().close();
         future.awaitUninterruptibly();
         // Perform post-closure operation
         // ...
     }
 }

 // GOOD
 @Override
 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     if (e.getMessage() instanceof GoodByeMessage) {
         ChannelFuture future = e.getChannel().close();
         future.addListener(new ChannelFutureListener() {
             public void operationComplete(ChannelFuture future) {
                 // Perform post-closure operation
                 // ...
             }
         });
     }
 }

 
 虽然await调用比较危险,但是你确保不是在一个i/o 线程中调用该方法,毕竟await方法还是很简洁方便的,如果
 调用该方法是在一个i/o 线程,那么就会抛出 IllegalStateException
 
 await的timeout和i/o timeout区别
 需要注意的是这两个timeout是不一样的, #await(long),#await(long, TimeUnit), #awaitUninterruptibly(long),
 #awaitUninterruptibly(long, TimeUnit) 这里面的timeout也i/o timeout 没有任何关系,如果io timeout,那么
 channelFuture 将被标记为completed with failure,而await的timeout 与future完全没有关系,只是await动作的
 timeout。
 

 // BAD - NEVER DO THIS
 ClientBootstrap b = ...;
 ChannelFuture f = b.connect(...);
 f.awaitUninterruptibly(10, TimeUnit.SECONDS);
 if (f.isCancelled()) {
     // Connection attempt cancelled by user
 } else if (!f.isSuccess()) {
     // You might get a NullPointerException here because the future
     // might not be completed yet.
     f.getCause().printStackTrace();
 } else {
     // Connection established successfully
 }

 // GOOD
 ClientBootstrap b = ...;
 // Configure the connect timeout option.
 b.setOption("connectTimeoutMillis", 10000);
 ChannelFuture f = b.connect(...);
 f.awaitUninterruptibly();

 // Now we are sure the future is completed.
 assert f.isDone();

 if (f.isCancelled()) {
     // Connection attempt cancelled by user
 } else if (!f.isSuccess()) {
     f.getCause().printStackTrace();
 } else {
     // Connection established successfully
 }

 
4.等待或监听数据全部完成
如: future.getChannel().getCloseFuture().awaitUninterruptibly();

5.释放连接等资源
 bootstrap.releaseExternalResources();
 
Server:
1.配置server

ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
                       
2.设置pipeFactory
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new EchoServerHandler());
            }
        });
    或者
   bootstrap.setPipelineFactory(new HttpServerPipelineFactory());
  
3.绑定sever端端口
bootstrap.bind(new InetSocketAddress(8080));

 

======================netty个人理解分割线======================================

 

netty是个非常好用的nio框架,提供了安全、快捷的tcp/ip、udp/ip通讯方式。网上也有好多文章,netty3.6的官方文档是这个http://netty.io/3.6/guide/,写得很好。

下面就读着这篇文档,做下笔记:

先看两段代码:

一:DiscardServer,这是个服务端的代码,是服务端的启动程序(创建channel,指定通道处理程序)

 

Java代码 
  1. public class DiscardServer {  
  2.   
  3.     private final int port;  
  4.   
  5.     public DiscardServer(int port) {  
  6.         this.port = port;  
  7.     }  
  8.   
  9.     public void run() {  
  10.         // Configure the server.  
  11.         ServerBootstrap bootstrap = new ServerBootstrap(  
  12.                 new NioServerSocketChannelFactory(  
  13.                         Executors.newCachedThreadPool(),  
  14.                         Executors.newCachedThreadPool()));  
  15.   
  16.         // Set up the pipeline factory.  
  17.         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  
  18.             public ChannelPipeline getPipeline() throws Exception {  
  19.                 return Channels.pipeline(new DiscardServerHandler());  
  20.             }  
  21.         });  
  22.   
  23.         // Bind and start to accept incoming connections.  
  24.         bootstrap.bind(new InetSocketAddress(port));  
  25.     }  
  26.   
  27.     public static void main(String[] args) throws Exception {  
  28.         int port;  
  29.         if (args.length > 0) {  
  30.             port = Integer.parseInt(args[0]);  
  31.         } else {  
  32.             port = 8080;  
  33.         }  
  34.         new DiscardServer(port).run();  
  35.     }  
  36. }  

 

 

既然是通信,那当然有通道,Channel就是通道,通道是谁创建的呢,当然是ChannelFactory

代码中的NioServerSocketChannelFactory就是服务端的NIO的Socket的通道工厂,名字起得很直白吧。

创建这个工厂需要两个线程池——Executor(Executors.newCachedThreadPool()返回的ExecutorService,ExecutorService继承自Executor),这两个Executor一个用来执行boss线程,一个用来执行I/O工人线程。

工厂设置好之后, 就该创建Channel了,创建Channel是个复杂的过程,netty提供了一个帮助类ServerBootstrap,使用它可以便捷的执行这个创建过程。

有了通道后,当然会有相关的通讯处理,这些处理就是ChannelHandler(DiscardServerHandler就是自定义的handler,代码见二:DiscardServerHandler),ChannelHandler使用ChannelPipeline来设置(添加、移除、获取),ChannelPipelineFactory当然是用来创建ChannelPipeline的了。

最后,绑定个端口号,开始启动服务吧

二:DiscardServerHandler,通道处理程序

 

Java代码 
  1. public class DiscardServerHandler extends SimpleChannelUpstreamHandler {  
  2.   
  3.     private static final Logger logger = Logger.getLogger(  
  4.             DiscardServerHandler.class.getName());  
  5.   
  6.     private long transferredBytes;  
  7.   
  8.     public long getTransferredBytes() {  
  9.         return transferredBytes;  
  10.     }  
  11.   
  12.     @Override  
  13.     public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {  
  14.         if (e instanceof ChannelStateEvent) {  
  15.             logger.info(e.toString());  
  16.         }  
  17.   
  18.         // Let SimpleChannelHandler call actual event handler methods below.  
  19.         super.handleUpstream(ctx, e);  
  20.     }  
  21.   
  22.     @Override  
  23.     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
  24.         // Discard received data silently by doing nothing.  
  25.        transferredBytes += ((ChannelBuffer) e.getMessage()).readableBytes();  
  26.     }  
  27.   
  28.     @Override  
  29.     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
  30.         // Close the connection when an exception is raised.  
  31.         logger.log(  
  32.                 Level.WARNING,  
  33.                 "Unexpected exception from downstream.",  
  34.                 e.getCause());  
  35.         e.getChannel().close();  
  36.     }  
  37. }  

 这个通道处理程序继承自SimpleChannelUpstreamHandler,最终实现了ChannelHandler,netty是基于事件驱动的,所以DiscardServerHandler实现的方法中都会有相关的事件

 

messageReceived:当消息到达时执行

exceptionCaught:当发生异常时执行

messageReceived有两个参数,一个是ChannelHandlerContext,一个是MessageEvent,messageEvent就是消息事件,从消息事件中可以取出消息(getMessage())

服务端的代码搞定了,下面上客户端的代码,客户端的代码跟服务端的大体一致:

一:DiscardClient

Java代码 
  1. public class DiscardClient {  
  2.   
  3.     private final String host;  
  4.     private final int port;  
  5.     private final int firstMessageSize;  
  6.   
  7.     public DiscardClient(String host, int port, int firstMessageSize) {  
  8.         this.host = host;  
  9.         this.port = port;  
  10.         this.firstMessageSize = firstMessageSize;  
  11.     }  
  12.   
  13.     public void run() {  
  14.         // Configure the client.  
  15.         ClientBootstrap bootstrap = new ClientBootstrap(  
  16.                 new NioClientSocketChannelFactory(  
  17.                         Executors.newCachedThreadPool(),  
  18.                         Executors.newCachedThreadPool()));  
  19.   
  20.         // Set up the pipeline factory.  
  21.         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  
  22.             public ChannelPipeline getPipeline() throws Exception {  
  23.                 return Channels.pipeline(  
  24.                         new DiscardClientHandler(firstMessageSize));  
  25.             }  
  26.         });  
  27.   
  28.         // Start the connection attempt.  
  29.         ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));  
  30.   
  31.         // Wait until the connection is closed or the connection attempt fails.  
  32.         future.getChannel().getCloseFuture().awaitUninterruptibly();  
  33.   
  34.         // Shut down thread pools to exit.  
  35.         bootstrap.releaseExternalResources();  
  36.     }  
  37.   
  38.     public static void main(String[] args) throws Exception {  
  39.         // Print usage if no argument is specified.  
  40.         if (args.length < 2 || args.length > 3) {  
  41.             System.err.println(  
  42.                     "Usage: " + DiscardClient.class.getSimpleName() +  
  43.                     " <host> <port> [<first message size>]");  
  44.             return;  
  45.         }  
  46.   
  47.         // Parse options.  
  48.         final String host = args[0];  
  49.         final int port = Integer.parseInt(args[1]);  
  50.         final int firstMessageSize;  
  51.         if (args.length == 3) {  
  52.             firstMessageSize = Integer.parseInt(args[2]);  
  53.         } else {  
  54.             firstMessageSize = 256;  
  55.         }  
  56.   
  57.         new DiscardClient(host, port, firstMessageSize).run();  
  58.     }  
  59. }  

 二:

Java代码 
  1. public class DiscardClientHandler extends SimpleChannelUpstreamHandler {  
  2.   
  3.     private static final Logger logger = Logger.getLogger(  
  4.             DiscardClientHandler.class.getName());  
  5.   
  6.     private long transferredBytes;  
  7.     private final byte[] content;  
  8.   
  9.     public DiscardClientHandler(int messageSize) {  
  10.         if (messageSize <= 0) {  
  11.             throw new IllegalArgumentException(  
  12.                     "messageSize: " + messageSize);  
  13.         }  
  14.         content = new byte[messageSize];  
  15.     }  
  16.   
  17.     public long getTransferredBytes() {  
  18.         return transferredBytes;  
  19.     }  
  20.   
  21.     @Override  
  22.     public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {  
  23.         if (e instanceof ChannelStateEvent) {  
  24.             if (((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {  
  25.                 logger.info(e.toString());  
  26.             }  
  27.         }  
  28.   
  29.         // Let SimpleChannelHandler call actual event handler methods below.  
  30.         super.handleUpstream(ctx, e);  
  31.     }  
  32.   
  33.     @Override  
  34.     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {  
  35.         // Send the initial messages.  
  36.         generateTraffic(e);  
  37.     }  
  38.   
  39.     @Override  
  40.     public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {  
  41.         // Keep sending messages whenever the current socket buffer has room.  
  42.         generateTraffic(e);  
  43.     }  
  44.   
  45.     @Override  
  46.     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
  47.         // Server is supposed to send nothing.  Therefore, do nothing.  
  48.     }  
  49.   
  50.     @Override  
  51.     public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) {  
  52.         transferredBytes += e.getWrittenAmount();  
  53.     }  
  54.   
  55.     @Override  
  56.     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
  57.         // Close the connection when an exception is raised.  
  58.         logger.log(  
  59.                 Level.WARNING,  
  60.                 "Unexpected exception from downstream.",  
  61.                 e.getCause());  
  62.         e.getChannel().close();  
  63.     }  
  64.   
  65.     private void generateTraffic(ChannelStateEvent e) {  
  66.         // Keep generating traffic until the channel is unwritable.  
  67.         // A channel becomes unwritable when its internal buffer is full.  
  68.         // If you keep writing messages ignoring this property,  
  69.         // you will end up with an OutOfMemoryError.  
  70.         Channel channel = e.getChannel();  
  71.         while (channel.isWritable()) {  
  72.             ChannelBuffer m = nextMessage();  
  73.             if (m == null) {  
  74.                 break;  
  75.             }  
  76.             channel.write(m);  
  77.         }  
  78.     }  
  79.   
  80.     private ChannelBuffer nextMessage() {  
  81.         return ChannelBuffers.wrappedBuffer(content);  
  82.     }  
  83. }  

 

 

分享到:
评论
1 楼 aerfaguihua 2016-02-24  
请问楼主 netty编写的客户端能否同步监听接收服务器传来的数据?

相关推荐

    netty简单实例

    在本文中,我们将深入探讨 Netty 的基础知识,并通过提供的"ChatNetty"示例,学习如何构建一个简单的聊天应用。这个示例是针对初学者的,旨在帮助理解 Netty 的基本工作原理和用法。 首先,让我们了解 Netty 的核心...

    Netty4.0 官网例子(免费)

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,专为 Java 设计。...Netty4.0 是其一个重要版本,引入了诸多改进和新特性...你可以从 netty-4.0 压缩包中的源代码开始,逐步分析和实践,从而提升你的网络编程技能。

    Netty工程完整的源码.rar

    十二个实例带你轻松掌握Netty 江成军 Netty开发网络通信系统的流程和步骤 Netty如何处理连接、状态监测、超时、心跳、断线重连等 Netty的各种解码器,并利用解码器处理复杂繁琐的粘包/拆包 Netty集成第三方编码解码...

    netty源码深入分析

    《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...

    netty源码分析教程视频

    一个netty的入门教程以及源码分析视频,适合刚学习的人

    netty淘宝代码分析

    在本文中,我们将深入探讨Netty在淘宝网综合业务平台中的应用,以及相关的代码分析。 首先,Netty的核心设计原则是基于Reactor模式,这种模式在处理高并发网络连接时具有显著优势。它通过一个或多个线程来处理I/O...

    netty http client & server

    而在 "netty-http-client" 文件中,包含了一个简单的 HTTP 客户端示例,演示了如何向服务器发送请求并接收响应。 总的来说,Netty 提供了强大而灵活的工具来构建 HTTP 客户端和服务器,无论是对于学习网络编程,...

    netty基础说明-结构分析-网关了解

    java-netty图,分析选择器 java-netty图,分析IO java-netty图,分析管道 java-netty图,分析缓存

    MINA2与Netty4比较分析

    接下来将根据标题和描述的要求详细分析Mina2与Netty4的区别,重点从它们的线程模型、Buffer使用以及Netty4中集成的序列化工具ProtoBuf等方面进行比较。 首先,Mina2和Netty4都是异步事件驱动的网络应用框架。Netty4...

    多线程并发编程在Netty的应用分析

    《多线程并发编程在Netty中的应用分析》 在当今高性能网络编程领域,Netty以其高效、灵活和易用的特性,成为了许多系统开发的首选框架。本篇内容将深入探讨多线程并发编程在Netty中的核心应用,帮助读者理解如何...

    Netty简单应答程序

    这个压缩包中的"Netty简单应答程序"示例,旨在展示如何使用Netty构建一个基本的通信系统,其中客户端能够向服务器发送消息,并接收到服务器的回应。 首先,我们要理解Netty的基本组件。Netty的核心是其Channel...

    netty简单聊天室

    在“netty简单聊天室”这个项目中,Netty被用来构建一个简单的验证聊天工具,允许用户通过客户端进行实时通信。下面将详细阐述Netty在聊天室中的应用及其相关知识点。 1. **Netty架构**: Netty的核心是它的...

    netty简易时间和操作系统查询服务器源码

    这个“netty简易时间和操作系统查询服务器源码”项目,旨在为初学者提供一个基础的Netty应用实例,帮助理解如何利用Netty进行网络通信,以及如何获取服务器时间及操作系统信息。 首先,让我们深入了解一下Netty。...

    用netty实现文件传输

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何利用 Netty 4.0.23 版本来实现文件的传输,这包括客户端和服务器端的交互过程。...

    netty简单 的demo很好理解

    这个“netty简单的demo很好理解”的例子,很可能是为了展示Netty的基本用法,帮助初学者理解其核心概念。我们将通过以下几个方面来深入探讨这个Demo: 1. **异步编程模型**: Netty 使用了Java NIO(非阻塞I/O)...

    netty 技术分析

    高性能应用框架 ,netty分析

    Netty源码分析总结.rar

    这个"Netty源码分析总结.rar"压缩包文件,从其标题和描述来看,很可能是针对Netty的源码分析课程或者笔记的集合,其中包含了对Netty核心机制的深入探讨。尽管提供的子文件名是"课时34:Netty源码分析总结下期预告....

    Netty4.x源码分析详解

    在深入分析 Netty 4.x 源码之前,我们首先需要了解其核心概念和架构。 Netty 的核心组件包括: 1. **ByteBuf**: 作为传统 ByteBuffer 的替代品,ByteBuf 提供了更高效且易用的内存管理机制,支持读写分离,避免了...

Global site tag (gtag.js) - Google Analytics