Netty 是JBoss旗下的io传输的框架,他利用java里面的nio来实现高效,稳定的io传输。
作为io传输,就会有client和server,下面我们看看用netty怎样写client和server
Client:
需要做的事情:
1.配置client启动类
ClientBootstrap bootstrap = new ClientBootstrap(..)
2.根据不同的协议或者模式为client启动类设置pipelineFactory。
这里telnet pipline Factory 在netty中已经存在,所有直接用
bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());
也可以自己定义
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new DiscardClientHandler(firstMessageSize));
}
});
这里DiscardClientHandler 就是自己定义的handler,他需要
public class DiscardServerHandler extends SimpleChannelUpstreamHandler
继承SimpleChannelUpstreamHandler 来实现自己的handler。这里DiscardClientHandler
是处理自己的client端的channel,他的
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Server is supposed to send nothing. Therefore, do nothing.
}
可以看到Discard client不需要接受任何信息
3.连接server
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
这里解释一下channelFuture:
在Netty中所有的io操作都是异步的,这也就是意味任何io访问,那么就立即返回处理,并且不能确保
返回的数据全部完成。因此就出现了channelFuture,channelFuture在传输数据时候包括数据和状态两个
部分。他只有Uncompleted和Completed
+---------------------------+ | Completed successfully | +---------------------------+ +----> isDone() = true | +--------------------------+ | | isSuccess() = true | | Uncompleted | | +===========================+ +--------------------------+ | | Completed with failure | | isDone() = false | | +---------------------------+ | isSuccess() = false |----+----> isDone() = true | | isCancelled() = false | | | getCause() = non-null | | getCause() = null | | +===========================+ +--------------------------+ | | Completed by cancellation | | +---------------------------+ +----> isDone() = true | | isCancelled() = true | +---------------------------+
既然netty io是异步的,那么如何知道channel传送完成有两种方式,一种添加监听器
addListener(ChannelFutureListener) 还有一种直接调用await()方法,这两种方式
有下面的区别
监听器:是以事件模式的,因此代码就需要用事件模式的样式去写,相当复杂,但他是non-blocking模式的
性能方面要比await方法好,而且不会产生死锁情况
await(): 直接方法调用,使用简单,但是他是blocking模式,性能方面要弱而且会产生死锁情况
不要在ChannelHandler 里面调用await(),这是因为通常在channelHandler里的event method是被i/o线程调用的
(除非ChannelPipeline里面有个ExecutionHandler),那么如果这个时候用await就容易产生死锁。
// BAD - NEVER DO THIS@Override
public void messageReceived(ChannelHandlerContext
ctx,MessageEvent
e) { if (e.getMessage() instanceof GoodByeMessage) {ChannelFuture
future = e.getChannel().close(); future.awaitUninterruptibly(); // Perform post-closure operation // ... } } // GOOD@Override
public void messageReceived(ChannelHandlerContext
ctx,MessageEvent
e) { if (e.getMessage() instanceof GoodByeMessage) {ChannelFuture
future = e.getChannel().close(); future.addListener(newChannelFutureListener
() { public void operationComplete(ChannelFuture
future) { // Perform post-closure operation // ... } }); } }
虽然await调用比较危险,但是你确保不是在一个i/o 线程中调用该方法,毕竟await方法还是很简洁方便的,如果
调用该方法是在一个i/o 线程,那么就会抛出 IllegalStateException
await的timeout和i/o timeout区别
需要注意的是这两个timeout是不一样的, #await(long),#await(long, TimeUnit), #awaitUninterruptibly(long),
#awaitUninterruptibly(long, TimeUnit) 这里面的timeout也i/o timeout 没有任何关系,如果io timeout,那么
channelFuture 将被标记为completed with failure,而await的timeout 与future完全没有关系,只是await动作的
timeout。
// BAD - NEVER DO THISClientBootstrap
b = ...;ChannelFuture
f = b.connect(...); f.awaitUninterruptibly(10, TimeUnit.SECONDS); if (f.isCancelled()) { // Connection attempt cancelled by user } else if (!f.isSuccess()) { // You might get a NullPointerException here because the future // might not be completed yet. f.getCause().printStackTrace(); } else { // Connection established successfully } // GOODClientBootstrap
b = ...; // Configure the connect timeout option. b.setOption("connectTimeoutMillis", 10000);ChannelFuture
f = b.connect(...); f.awaitUninterruptibly(); // Now we are sure the future is completed. assert f.isDone(); if (f.isCancelled()) { // Connection attempt cancelled by user } else if (!f.isSuccess()) { f.getCause().printStackTrace(); } else { // Connection established successfully }
4.等待或监听数据全部完成
如: future.getChannel().getCloseFuture().awaitUninterruptibly();
5.释放连接等资源
bootstrap.releaseExternalResources();
Server:
1.配置server
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
2.设置pipeFactory
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new EchoServerHandler());
}
});
或者
bootstrap.setPipelineFactory(new HttpServerPipelineFactory());
3.绑定sever端端口
bootstrap.bind(new InetSocketAddress(8080));
======================netty个人理解分割线======================================
netty是个非常好用的nio框架,提供了安全、快捷的tcp/ip、udp/ip通讯方式。网上也有好多文章,netty3.6的官方文档是这个http://netty.io/3.6/guide/,写得很好。
下面就读着这篇文档,做下笔记:
先看两段代码:
一:DiscardServer,这是个服务端的代码,是服务端的启动程序(创建channel,指定通道处理程序)
- public class DiscardServer {
- private final int port;
- public DiscardServer(int port) {
- this.port = port;
- }
- public void run() {
- // Configure the server.
- ServerBootstrap bootstrap = new ServerBootstrap(
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
- // Set up the pipeline factory.
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() throws Exception {
- return Channels.pipeline(new DiscardServerHandler());
- }
- });
- // Bind and start to accept incoming connections.
- bootstrap.bind(new InetSocketAddress(port));
- }
- public static void main(String[] args) throws Exception {
- int port;
- if (args.length > 0) {
- port = Integer.parseInt(args[0]);
- } else {
- port = 8080;
- }
- new DiscardServer(port).run();
- }
- }
既然是通信,那当然有通道,Channel就是通道,通道是谁创建的呢,当然是ChannelFactory
代码中的NioServerSocketChannelFactory就是服务端的NIO的Socket的通道工厂,名字起得很直白吧。
创建这个工厂需要两个线程池——Executor(Executors.newCachedThreadPool()返回的ExecutorService,ExecutorService继承自Executor),这两个Executor一个用来执行boss线程,一个用来执行I/O工人线程。
工厂设置好之后, 就该创建Channel了,创建Channel是个复杂的过程,netty提供了一个帮助类ServerBootstrap,使用它可以便捷的执行这个创建过程。
有了通道后,当然会有相关的通讯处理,这些处理就是ChannelHandler(DiscardServerHandler就是自定义的handler,代码见二:DiscardServerHandler),ChannelHandler使用ChannelPipeline来设置(添加、移除、获取),ChannelPipelineFactory当然是用来创建ChannelPipeline的了。
最后,绑定个端口号,开始启动服务吧
二:DiscardServerHandler,通道处理程序
- public class DiscardServerHandler extends SimpleChannelUpstreamHandler {
- private static final Logger logger = Logger.getLogger(
- DiscardServerHandler.class.getName());
- private long transferredBytes;
- public long getTransferredBytes() {
- return transferredBytes;
- }
- @Override
- public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
- if (e instanceof ChannelStateEvent) {
- logger.info(e.toString());
- }
- // Let SimpleChannelHandler call actual event handler methods below.
- super.handleUpstream(ctx, e);
- }
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- // Discard received data silently by doing nothing.
- transferredBytes += ((ChannelBuffer) e.getMessage()).readableBytes();
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- // Close the connection when an exception is raised.
- logger.log(
- Level.WARNING,
- "Unexpected exception from downstream.",
- e.getCause());
- e.getChannel().close();
- }
- }
这个通道处理程序继承自SimpleChannelUpstreamHandler,最终实现了ChannelHandler,netty是基于事件驱动的,所以DiscardServerHandler实现的方法中都会有相关的事件
messageReceived:当消息到达时执行
exceptionCaught:当发生异常时执行
messageReceived有两个参数,一个是ChannelHandlerContext,一个是MessageEvent,messageEvent就是消息事件,从消息事件中可以取出消息(getMessage())
服务端的代码搞定了,下面上客户端的代码,客户端的代码跟服务端的大体一致:
一:DiscardClient
- public class DiscardClient {
- private final String host;
- private final int port;
- private final int firstMessageSize;
- public DiscardClient(String host, int port, int firstMessageSize) {
- this.host = host;
- this.port = port;
- this.firstMessageSize = firstMessageSize;
- }
- public void run() {
- // Configure the client.
- ClientBootstrap bootstrap = new ClientBootstrap(
- new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
- // Set up the pipeline factory.
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() throws Exception {
- return Channels.pipeline(
- new DiscardClientHandler(firstMessageSize));
- }
- });
- // Start the connection attempt.
- ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
- // Wait until the connection is closed or the connection attempt fails.
- future.getChannel().getCloseFuture().awaitUninterruptibly();
- // Shut down thread pools to exit.
- bootstrap.releaseExternalResources();
- }
- public static void main(String[] args) throws Exception {
- // Print usage if no argument is specified.
- if (args.length < 2 || args.length > 3) {
- System.err.println(
- "Usage: " + DiscardClient.class.getSimpleName() +
- " <host> <port> [<first message size>]");
- return;
- }
- // Parse options.
- final String host = args[0];
- final int port = Integer.parseInt(args[1]);
- final int firstMessageSize;
- if (args.length == 3) {
- firstMessageSize = Integer.parseInt(args[2]);
- } else {
- firstMessageSize = 256;
- }
- new DiscardClient(host, port, firstMessageSize).run();
- }
- }
二:
- public class DiscardClientHandler extends SimpleChannelUpstreamHandler {
- private static final Logger logger = Logger.getLogger(
- DiscardClientHandler.class.getName());
- private long transferredBytes;
- private final byte[] content;
- public DiscardClientHandler(int messageSize) {
- if (messageSize <= 0) {
- throw new IllegalArgumentException(
- "messageSize: " + messageSize);
- }
- content = new byte[messageSize];
- }
- public long getTransferredBytes() {
- return transferredBytes;
- }
- @Override
- public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
- if (e instanceof ChannelStateEvent) {
- if (((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
- logger.info(e.toString());
- }
- }
- // Let SimpleChannelHandler call actual event handler methods below.
- super.handleUpstream(ctx, e);
- }
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- // Send the initial messages.
- generateTraffic(e);
- }
- @Override
- public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
- // Keep sending messages whenever the current socket buffer has room.
- generateTraffic(e);
- }
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- // Server is supposed to send nothing. Therefore, do nothing.
- }
- @Override
- public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) {
- transferredBytes += e.getWrittenAmount();
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- // Close the connection when an exception is raised.
- logger.log(
- Level.WARNING,
- "Unexpected exception from downstream.",
- e.getCause());
- e.getChannel().close();
- }
- private void generateTraffic(ChannelStateEvent e) {
- // Keep generating traffic until the channel is unwritable.
- // A channel becomes unwritable when its internal buffer is full.
- // If you keep writing messages ignoring this property,
- // you will end up with an OutOfMemoryError.
- Channel channel = e.getChannel();
- while (channel.isWritable()) {
- ChannelBuffer m = nextMessage();
- if (m == null) {
- break;
- }
- channel.write(m);
- }
- }
- private ChannelBuffer nextMessage() {
- return ChannelBuffers.wrappedBuffer(content);
- }
- }
相关推荐
在本文中,我们将深入探讨 Netty 的基础知识,并通过提供的"ChatNetty"示例,学习如何构建一个简单的聊天应用。这个示例是针对初学者的,旨在帮助理解 Netty 的基本工作原理和用法。 首先,让我们了解 Netty 的核心...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,专为 Java 设计。...Netty4.0 是其一个重要版本,引入了诸多改进和新特性...你可以从 netty-4.0 压缩包中的源代码开始,逐步分析和实践,从而提升你的网络编程技能。
十二个实例带你轻松掌握Netty 江成军 Netty开发网络通信系统的流程和步骤 Netty如何处理连接、状态监测、超时、心跳、断线重连等 Netty的各种解码器,并利用解码器处理复杂繁琐的粘包/拆包 Netty集成第三方编码解码...
《Netty源码深入分析》是由美团基础架构部的闪电侠老师所分享的一系列关于Netty源码解析的视频教程。以下将根据标题、描述、标签以及部分内容等信息,对Netty及其源码进行深入剖析。 ### Netty简介 Netty是基于...
一个netty的入门教程以及源码分析视频,适合刚学习的人
在本文中,我们将深入探讨Netty在淘宝网综合业务平台中的应用,以及相关的代码分析。 首先,Netty的核心设计原则是基于Reactor模式,这种模式在处理高并发网络连接时具有显著优势。它通过一个或多个线程来处理I/O...
而在 "netty-http-client" 文件中,包含了一个简单的 HTTP 客户端示例,演示了如何向服务器发送请求并接收响应。 总的来说,Netty 提供了强大而灵活的工具来构建 HTTP 客户端和服务器,无论是对于学习网络编程,...
java-netty图,分析选择器 java-netty图,分析IO java-netty图,分析管道 java-netty图,分析缓存
接下来将根据标题和描述的要求详细分析Mina2与Netty4的区别,重点从它们的线程模型、Buffer使用以及Netty4中集成的序列化工具ProtoBuf等方面进行比较。 首先,Mina2和Netty4都是异步事件驱动的网络应用框架。Netty4...
《多线程并发编程在Netty中的应用分析》 在当今高性能网络编程领域,Netty以其高效、灵活和易用的特性,成为了许多系统开发的首选框架。本篇内容将深入探讨多线程并发编程在Netty中的核心应用,帮助读者理解如何...
这个压缩包中的"Netty简单应答程序"示例,旨在展示如何使用Netty构建一个基本的通信系统,其中客户端能够向服务器发送消息,并接收到服务器的回应。 首先,我们要理解Netty的基本组件。Netty的核心是其Channel...
在“netty简单聊天室”这个项目中,Netty被用来构建一个简单的验证聊天工具,允许用户通过客户端进行实时通信。下面将详细阐述Netty在聊天室中的应用及其相关知识点。 1. **Netty架构**: Netty的核心是它的...
这个“netty简易时间和操作系统查询服务器源码”项目,旨在为初学者提供一个基础的Netty应用实例,帮助理解如何利用Netty进行网络通信,以及如何获取服务器时间及操作系统信息。 首先,让我们深入了解一下Netty。...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何利用 Netty 4.0.23 版本来实现文件的传输,这包括客户端和服务器端的交互过程。...
这个“netty简单的demo很好理解”的例子,很可能是为了展示Netty的基本用法,帮助初学者理解其核心概念。我们将通过以下几个方面来深入探讨这个Demo: 1. **异步编程模型**: Netty 使用了Java NIO(非阻塞I/O)...
高性能应用框架 ,netty分析
这个"Netty源码分析总结.rar"压缩包文件,从其标题和描述来看,很可能是针对Netty的源码分析课程或者笔记的集合,其中包含了对Netty核心机制的深入探讨。尽管提供的子文件名是"课时34:Netty源码分析总结下期预告....
在深入分析 Netty 4.x 源码之前,我们首先需要了解其核心概念和架构。 Netty 的核心组件包括: 1. **ByteBuf**: 作为传统 ByteBuffer 的替代品,ByteBuf 提供了更高效且易用的内存管理机制,支持读写分离,避免了...