`
cqupt123
  • 浏览: 66873 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

netty从入门到精通——实例篇(一)

阅读更多

 

  前面了解了netty的基本用法与几个核心概念,从本文开始会通过一些实例(主要参考源码example)来进一步学习netty的使用方法以及其中的原理。

  先来实现一个简单的功能:服务端如果接收到客户端的连接,则返回字符串”success”,客户端读到该消息打印出来。

  服务端的方法改写了第一篇blogNetty从入门到精通—入门篇中的handler,具体如下:

 

class MyChannelHandler extends SimpleChannelHandler {
	…
	…
	…
	@Override
	public void channelConnected(ChannelHandlerContext ctx,
			ChannelStateEvent e) throws Exception {
		System.out.println("Channel connected " + e);
		Channel ch = e.getChannel();
		ChannelBuffer cb = ChannelBuffers.wrappedBuffer("success".getBytes()) ;
		ch.write(cb);
		}
	…
	…
	…
}

   客户端的编码如下:

 

package com.netty.intr;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

public class NettyClient {
	final static String host = "127.0.0.1";
	final static int port = 8080;

	public static void main(String[] args) {
		Client client = new Client();
		client.config(host, port).start();
	}
}

class Client {
	ClientBootstrap bootstrap;
	ChannelHandler myHandler = new MyClientHandler();
	String host;
	int port;
	public Client() {
		bootstrap = new ClientBootstrap(
	            new NioClientSocketChannelFactory(
	                    Executors.newCachedThreadPool(),
	                    Executors.newCachedThreadPool()));
		
		 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
	         public ChannelPipeline getPipeline() throws Exception {
	             return Channels.pipeline(myHandler);
	         }
	     });
	}
	
	Client config(String host, int port) {
		this.host = host;
		this.port = port;
		bootstrap.setOption("remoteAddress", new InetSocketAddress(this.host, this.port));
		return this;
	}
	
	void start() {
		 bootstrap.connect();
	}
	
	class MyClientHandler extends SimpleChannelUpstreamHandler {
		@Override
		public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
				throws Exception {
			System.out.println("Client Channel closed " + e);
		}

		@Override
		public void channelConnected(ChannelHandlerContext ctx,
				ChannelStateEvent e) throws Exception {
			System.out.println("Client Channel connected " + e);
		}

		@Override
		public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
				throws Exception {
			try {
				ChannelBuffer buf = (ChannelBuffer) e.getMessage();
				byte[] bytes = buf.array();
				System.out.println("Client reseived message : " + new String(bytes));
			} catch (Exception ex) {
				ex.printStackTrace();
				throw ex;
			}
		}
	}
}

 

   依次启动NettyServerNettyClient

  Serverconsole打印:

 

Channel connected [id: 0xcb6626ea, /127.0.0.1:49503 => /127.0.0.1:8080] CONNECTED: /127.0.0.1:49503
  

  Clientconsole打印:

 

Client Channel connected [id: 0xc4c911b3, /127.0.0.1:49503 => /127.0.0.1:8080] CONNECTED: /127.0.0.1:8080
Client reseived message : success
  

  Server的处理器MyChannelHandler中,回写消息调用了Channel.write(Object)方法。

 

Channel ch = e.getChannel();
ChannelBuffer cb = ChannelBuffers.wrappedBuffer("success".getBytes()) ;
ch.write(cb);
 

 

  既然write方法的参数为Object,为什么这里不能直接传入字符串”success”呢,而是封装到ChannelBuffer类型的对象中,ChannelBuffer是做什么的。概念篇没有提到,而放在这里刚好合适。

  NIO中的Buffer

   前面提到过,在NIO中同样存在一个缓冲区,叫做ByteBuffer,来配合Channel的使用。在ByteBuffer内部存储数据的实质为一个字节数组,如:final byte[] hb,并定义了四个标记来管理它。其中包括:mark <= position <= limit <= capacity。其中capacity用来表示缓冲区的大小;position用来标识下一个可读取或者写入的位置;limit表示读取或者写入的上限位置,如果要在>=limit的位置做读写操作会抛出异常;mark用来记录当前position的值,记录之后position随着读写发生变化,在调用reset()方法时,会将position恢复为mark记录的值。在buffer中提供了很多putget方法来放入和读取数据,这里不多做介绍,可以查看API。但其中有几个重要的方法需要关注:

 

1.  flip()方法:在读取或者写入n个字节(position + n < limit)后,position += n。如果是先读取数据到buffer后写入到Channel,必须position的值回退到起初的值,并且将limit设置为有效位置,才能让读入的数据真正的写入Channel。调用flip()方法后,buffer中的四个标记会发生以下变化:

 

limit = position;
position = 0;
mark = -1;
 

2.  clear()方法:同样的,将buffer中的数据写入Channel后,再读取一些数据到buffer中,此时往往需要将各标记的值归位,当做一个新的buffer来使用(当然也有特殊情况)。调用clear()方法后,标记变化如下:

 

position = 0;
limit = capacity;
mark = -1;
 

3.  rewind()方法:如果发现刚才从Channel读取的数据需要重新读取,可以调用该方法。调用后标记变化如下:

 

position = 0;
mark = -1;

  尤其是flip()clear()方法,在使用的过程中会频繁用到,否则会造成读取和写入的错乱。

  ByteBuffer主要有两个继承的类分别是:HeapByteBufferMappedByteBuffer。他们的不同之处在于HeapByteBuffer会在JVM的堆上分配内存资源,而MappedByteBuffer的资源则会由JVM之外的操作系统内核来分配。DirectByteBuffer继承了MappedByteBuffer,采用了直接内存映射的方式,将文件直接映射到虚拟内存,同时减少在内核缓冲区和用户缓冲区之间的调用,尤其在处理大文件方面有很大的性能优势。但是在使用内存映射的时候会造成文件句柄一直被占用而无法删除的情况,网上也有很多介绍。 

 

  Netty中的Buffer

   Netty中使用ChannelBuffer来处理读写,之所以废弃ByteBuffer,官方说法是ChannelBuffer简单易用并且有性能方面的优势。在ChannelBuffer中使用ByteBuffer或者byte[]来存储数据。同样的,ChannelBuffer也提供了几个标记来控制读写并以此取代ByteBufferpositionlimit,分别是:

0 <= readerIndex <= writerIndex <= capacity,同时也有类似于markmarkedReaderIndexmarkedWriterIndex。当写入buffer时,writerIndex增加,从buffer中读取数据时readerIndex增加,而不能超过writerIndex。有了这两个变量后,就不用每次写入buffer后调用flip()方法,方便了很多。在ChannelBuffer中有几个重要的类继承,如下图:



   AbstractChannelBuffer中实现了基本的方法;HeapChannelBuffer是对NIOheapBuffer的封装,它有两个继承类:BigEndianHeapChannelBufferLittleEndianHeapChannelBuffer(试想,我们将一个int类型(32)的数据放入内存中,内存会以什么样的顺序放入这32位的数据呢?这就分为big-endianlittle-endian的字节序,big-endian就是说将数据的高位放在内存地址更小的位置,little-endian是将低位放在内存地址更小的位置,选择和所用硬件和操作系统相同的字节序有利于提高性能);ByteBufferBackedChannelBuffer是对NIOderectBuffer的封装;DynamicChannelBuffer继承于AbstractChannelBuffer,实现了buffer的自动扩容;CompositeChannelBuffer也是继承于AbstractChannelBuffer,抽象了操作多个buffer的情况,将多个buffer有序的放入数组中,通过计算找出要操作的buffer的下标,而不是将多个buffer复制到一个更大的buffer中;实现WrappedChannelBuffer接口的类主要是对buffer进行进一步的包装,一般由netty框架内部调用;ReplayingDecoderBuffer用于封装了解码时常有的处理,配合ReplayDecoder使用,后面会对编码解码做专门研究。

  ChannelBuffer往往由BufferFactory或者ChannelBuffers类来创建实例。

  回到本实例,netty在读取到客户端的msg时,根据用户配置的BufferFactory的不同会将消息封装成derectBuffer或者heapBuffer。所以,当我们在接收数据时,pipline中第一个upstreamHandler拿到的msg(e.getMessage())虽然是Object类型,但是肯定是这两种形式的buffer。那么,我们在写回数据的时候能不能直接使用其他类型呢,答案是:pipline中第一个downstreamHandler不能随意的放入其他对象,原因是piplinedownstream事件是从tail端往上执行的,所以第一个downstreamHandler调用的channel.write()或者channels.write()方法传入的object会直接传递给netty底层处理,而在netty的写入出口中,只接收两种类型的对象:ChannelBufferFileRegionFileRegion可用于0-copy的文件传输,一般情况下,应用程序向socket发送文件流时,操作系统需要先将文件的字节流存储到内核缓冲区(file_read_buffer),然后拷贝到用户缓冲区,在由用户缓冲区拷贝到内核缓冲区(socket_buffer)由协议引擎发送。这样会创建四个缓冲区,两次复制的过程。所谓零拷贝是指:内核通过DMA引擎,直接将file_read_buffer的数据copysocket_buffer中,而在后面的改进中废除了复制的操作,给socket_buffer增加了数据的位置和长度信息描述,直接将数据从file_read_buffer传递给协议引擎。在netty中只有选择NIO模型才能支持0-copy,当然JDK版本或者操作系统不支持也是不行的)。所以本实例先将字符串”success”方法放入到ChannelBuffer中,然后在调用write方法。

  多处理器

  再来看看Client端,在接收到数据后也需要从ChannelBuffer中取出字节,然后再转换成我们想要的String。当然ChannnelBuffer提供了很多如何取出一些基本类型数据的方法,在必要的时候可以使用。但是这样还是不太方便,我们能不能直接获得一个String对象呢?这里就可以使用前面介绍过的在pipline中添加一个handler专门用来解码,转换成我们所需要的String类型,再传递给MyClientHandler

  首先添加一个解码的handler:

 

class StringClientHandler extends SimpleChannelUpstreamHandler {
		@Override
		public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
				throws Exception {
			try {
				ChannelBuffer buf = (ChannelBuffer) e.getMessage();
				byte[] bytes = buf.array();
				System.out
						.println("Client reseived message and convert it to a String!");
				Channels.fireMessageReceived(e.getChannel(), new String(bytes));
			} catch (Exception ex) {
				ex.printStackTrace();
				throw ex;
			}
		}
}

   修改MyClientHandlermessageReceived方法,直接将收到的msg当做字符串来处理。

     class MyClientHandler extends SimpleChannelUpstreamHandler {

		…
		…
		…
		@Override
		public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
				throws Exception {
			try {
				String msg = (String) e.getMessage();
				System.out.println("Client reseived message : " + msg);
			} catch (Exception ex) {
				ex.printStackTrace();
				throw ex;
			}
		}
	}
 

   StringClientHandler放到pipline中的第一个位置:

  class Client {

	…
…
	StringClientHandler stringHandler = new StringClientHandler();
	public Client() {
		…
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(stringHandler, myHandler);
			}
		});
	}
…
…
}

   这样再次运行程序,客户端就会有这样的信息打印出来:

 

Client reseived message and convert it to a String!
Client reseived message : success
 

   StringClientHandler中首先处理了接收到的ChannelBuffer,然后转换成String,在通过方法Channels.fireMessageReceived(ctx, new String(bytes));向下传递消息。注意这个方法,如果没有调用,对于msg的处理就会到此为止,MyClientHandler将不会运行。Channels为我们提供了很多方法用于向上或者向下传递事件,对应于概念篇中讲到的各种事件。其中以fire开头方法用于upstream事件,而例如write这样的方法主要用于downstream事件。而这些方法往往是成对出现的,例如:fireChannelOpen(Channel channel)fireChannelOpen(ChannelHandlerContext ctx)等,由于这两个方法有不同的参数,造成了流程的不同,如果参数是Channel,则整个流程会从piplinehead upstreamHandler开始重新执行,如果参数是ChannelHandlerContext,则会直接执行下一个upstreamHandler。同样,write(Channel channel, Object message)write(ChannelHandlerContext ctx, ChannelFuture future, Object message)等方法,出了多了一个future外,和前面的两个方法相同,流程也会相同,如果没有传入ChannelFuture,则会在方法中的第一步中创建一个future来支持netty的异步事件处理机制。

  所以,如果我们将Channels.fireMessageReceived(ctx, new String(bytes));改为Channels.fireMessageReceived(e.getChannel(), new String(bytes));则会出现下面的异常:

 

java.lang.ClassCastException: java.lang.String cannot be cast to org.jboss.netty.buffer.ChannelBuffer

   这是因为在第一次接收到数据后,将msg转换为String类了,并调用Channels.fireMessageReceived(e.getChannel(), new String(bytes));,通知pipline从头upstreamHandler开始执行,即执行StringClientHandler.messageReseived,此时msg已经是String类型了,所以转换成ChannelBuffer就会报错。如果在应用程序中没有打印异常信息,而我们应用的日志级别在WARN以上的话,我们将看不到异常信息,所以最好的方式是在handler中实现exceptionCaught方法来处理异常。

 

 

  • 大小: 50.8 KB
分享到:
评论
2 楼 ykdsg 2014-09-09  
楼上正解,看了下源码才知道Channels.fireMessageReceived(e.getChannel(), new String(bytes));  是从头开始执行的
1 楼 jacky2014 2014-03-14  
Channels.fireMessageReceived(e.getChannel(), new String(bytes));
应该改成
Channels.fireMessageReceived(ctx, str); 
否则还是传回StringClientHandler处理

相关推荐

    netty入门到精通.txt

    根据提供的文件信息“netty入门到精通”,我们可以深入探讨Netty框架的相关知识点,包括其基本概念、核心组件、应用场景以及如何逐步掌握这项技术。 ### Netty框架简介 Netty是一款高性能、异步事件驱动的网络应用...

    netty从入门到精通所有代码

    这个“Netty 从入门到精通所有代码”压缩包包含了一系列的示例代码,帮助开发者逐步理解并掌握 Netty 的核心概念和实际应用。 1. **Netty 基本概念** - **NIO (Non-blocking I/O)**:Netty 是基于 Java NIO 构建的...

    Netty 框架学习 —— 第一个 Netty 应用(csdn)————程序.pdf

    在本篇关于“Netty框架学习——第一个Netty应用”的文章中,我们将深入理解如何使用Netty构建一个简单的Echo服务器和客户端。Netty是一个高性能、异步事件驱动的网络应用程序框架,广泛应用于Java领域的服务器开发。...

    Netty入门教程文档

    Netty入门教程文档 Netty是Java的网络编程框架,广泛应用于数据采集服务中,本文将对Netty的基本概念和应用进行详细介绍,并将其与ETL技术结合,讲解如何使用Netty进行数据流转和处理。 1. ETL概述 ETL(Extract...

    Netty全套学习资源(包括源码、笔记、学习文档等)

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。它是 Java 平台上的一个开源项目,广泛应用于分布式系统、云计算平台、游戏服务器、聊天应用以及大数据传输等...

    读书笔记:《Netty实战》源代码——Scala版.zip

    读书笔记:《Netty实战》源代码——Scala版

    Netty-In-Action中文版.pdf

    《Netty in Action》是一本深入探讨Netty框架的中文指南,它为读者提供了全面了解和熟练使用Netty所需的知识。Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...

    Netty 快速入门系列-源码

    Netty快速入门系列源码, 参考 https://blog.csdn.net/netcobol Netty是一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 Netty是...

    Netty UDP协议网络打洞实例

    Netty UDP协议网络打洞实例是利用Netty框架在UDP(User Datagram Protocol)协议基础上实现的一种穿透NAT(Network Address Translation)的技术。NAT技术在现代互联网中广泛使用,它允许内部网络中的设备共享一个...

    netty电子书

    Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 也就是说,Netty 是一个基于NIO的客户,服务器端编程框架,...

    netty案例,netty4.1基础入门篇六《NettyServer群发消息》源码

    netty案例,netty4.1基础入门篇六《NettyServer群发消息》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724778&idx=1&sn=72e4b1ea5323475b16e99c6720c7069d&scene=19#wechat_redirect

    高性能网络通信框架Netty从入门到核心源码剖析.rar

    本资源《高性能网络通信框架Netty从入门到核心源码剖析》将带领读者逐步了解并深入掌握Netty的使用和设计原理。 一、Netty入门 1. 安装与环境配置:介绍如何在Java项目中引入Netty依赖,以及相关的构建工具如Maven...

    Netty+入门与实战:仿写微信+IM+即时通讯系统.rar

    《Netty+入门与实战:仿写微信+IM+即时通讯系统》是一本专注于使用Netty框架构建即时通讯系统的教程。Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发服务器和客户端的Java应用。它极大地简化了网络编程...

    高性能网络通信框架 Netty从入门到核心源码剖析

    高性能网络通信框架_Netty从入门到核心源码剖析

    netty快速入门教程3-4集 共12集

    总的来说,这12集的Netty快速入门教程将带你从基础到进阶,全面理解Netty框架的核心组件和用法。通过学习客户端的构建和Netty线程模型,你将具备使用Netty开发高效、可靠网络应用的能力。不论是构建TCP、UDP服务,...

    netty从入门到进阶的基础文档

    这个基础文档将带你从入门到进阶,深入理解Netty的核心概念和应用。 1. **入门理解** - **什么是Netty?** Netty是由JBOSS提供的一个Java开源框架,主要用于开发高并发、低延迟的网络应用,如TCP、UDP、HTTP、...

    Netty-入门Netty编码

    同样,我们需要定义一个 ChannelHandlerContext 的处理器,例如 TimeClientHandler,用来处理从服务器接收到的响应。 在 Netty 中,数据传输通常涉及 ByteBuf,这是 Netty 提供的高效字节缓冲区。ByteBuf 可以动态...

    Netty权威指南 PDF电子书下载 带目录书签 完整版

    《Netty权威指南》是一本深入探讨Netty框架的详细教程,旨在帮助读者全面理解并掌握这个高性能、异步事件驱动的网络应用框架。Netty是Java领域中用于开发网络应用的首选工具,广泛应用于分布式系统、云计算、游戏...

    Netty从入门到实战,深入Netty底层原理,结合实战,打造百万级连接的IM通讯软件。-netty_study.zip

    Netty从入门到实战,深入Netty底层原理,结合实战,打造百万级连接的IM通讯软件。-netty_study

    读书笔记:Netty从入门到实战深入Netty底层原理结合实战打造百万级连接的IM通讯软件。.zip

    读书笔记:Netty从入门到实战深入Netty底层原理结合实战打造百万级连接的IM通讯软件。

Global site tag (gtag.js) - Google Analytics