Netty
Netty是一个网络通信框架。主要由事件模型、统一通信API、IO读写缓冲区、 Transport、协议支持五个部分组成。
编写一个好的网络程序没那么容易。且不说涉及到复杂的网络协议栈(TCP、UDP等)、网卡以及socket等。就一个Socket/IO模型就值得去深入研究,这还不包括操作系统内核层面的东西。编写网络程序首先得理解socket编程模型,编写过网络程序的程序员应该有这个体会。
先从一个简单的Netty网络程序开始
public static void main(String[] args) throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup parentGroup = new NioEventLoopGroup(1);
EventLoopGroup childGroup = new NioEventLoopGroup();
bootstrap.group(parentGroup, childGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoChannelHandler());
}
});
// Start the server.
ChannelFuture f = bootstrap.bind(8007);
}
private static class EchoChannelHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
这是一个简单的ECHO服务器程序。使用Netty编写服务器程序首先从一个ServerBootstrap引导开始,接着设置了两个EventLoopGroup,用于处理Channel产生的事件以及IO操作,第一个是parentGroup,第二个是childGroup,分别用于处理来自客户端的连接(读操作)以及与客户端连接的读写操作。
接着调用channel方法指定用于建立与客户端连接的连接通道类NioServerSocketChannel。
然后调用childHandler方法,通过ChannelInitializer初始化和客户端连接的读写通道,主要初始化一些ChannelHandler。
最后调用bind方法绑定到本地的指定端口,这里会初始化并注册与客户端连接的连接通道,也就是之前设置的NioServerSocketChannel。
这个程序比较简单,不过刚开始接触的话,可能不太好理解。而且如果对网络编程不太熟的话,可能更不容易理解了,得先去学习下网络程序的编写。
Netty
Netty
Netty核心组成
事件模型
统一通信API
IO读写缓冲区
Transport
协议支持
Netty核心组成
Netty核心部分包括事件模型、统一通信API、IO读写缓冲区三部分。
事件模型
Netty被设计为事件驱动的,它的事件模型基于拦截器链模式。Netty的事件驱动是可扩展的。
Netty
Netty的版本变化挺大的,升级也很快,兼容性是一个问题。尤其是Netty3到Netty4,完全是全新的一套实现。
Bootstrap
类似一个引导程序,用来引导(初始化)Channel。
Netty3针对客户端、服务器端以及无连接的网络程序提供了ClientBootstrap、ServerBootstrap以及ConnectionlessBootstrap三种实现。
写道
* A helper class which initializes a {@link Channel}. This class provides
* the common data structure for its subclasses which actually initialize
* {@link Channel}s and their child {@link Channel}s using the common data
* structure. Please refer to {@link ClientBootstrap}, {@link ServerBootstrap},
* and {@link ConnectionlessBootstrap} for client side, server-side, and
* connectionless (e.g. UDP) channel initialization respectively.
ClientBootstrap
ConnectionlessBootstrap
ServerBootstrap
在Netty4中,Bootstrap主要包含EventLoop分组(EventLoopGroup)、Channel工厂,Channel处理、本地监听地址(绑定的IP地址和端口)、以及一些可设置的选项和属性。如果是服务器端Bootstrap,同时也包含这些信息。
Netty4定义了一个抽象的AbstractBootstrap
写道
* {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
* method-chaining to provide an easy way to configure the {@link AbstractBootstrap}.
不再提供ConnectionlessBootstrap,只提供了2个实现:Bootstrap和ServerBootstrap,分别用于客户端和服务器端。ServerBootstrap同时支持无连接的网络程序。
Netty4的Bootstrap对应一个BootstrapConfig,对应的一个抽象类AbstractBootstrapConfig。针对客户端和服务器端,分别有一个BootstrapConfig和ServerBootstrapConfig。
对于ServerBootstrap,bind是一个很重要的方法。这个方法将创建一个Channel,并在socket层面绑定到本地的一个端口上。
bind方法将调用一个很重要的方法initAndRegister,这个方法将创建一个Channel,并初始化这个Channel。创建Channel的时候,通过一个Channel工厂来创建这个Channel。这个Channel工厂是在调用channel方法的时候根据指定的Channel类创建的一个ReflectiveChannelFactory(反射Channel工厂)。
Channel创建后,调用init方法初始化Channel,在这个方法中,调用了对应的ChannelPipeline的addLast方法,通过ChannelInitializer来对Channel进行初始化。ChannelInitializer的initChannel方法实现,在对应的ChannelPipeline的过滤器链表末尾添加了一个重要的ChannelHandler(ServerBootstrapAcceptor),这个Handler在有连接到来的时候触发回调channelRead方法,获取和客户端连接通信的Channel,之后向这个Channel对应的ChannelPipeline的过滤器链表末尾添加对应的ChannelHandler。并设置Channel相应的选项和属性。并将这个Channel注册到childGroup中。
Channel
Channel是Netty中最重要也是最核心的一个组件。比如我们基于TCP或者UDP进行通信就是常见的一个Channel,可以简单的理解为Channel就是一种通信的信道。
根据底层Socket或者IO是否阻塞式,Channel也可以是阻塞式或者非阻塞式。比如OioServerSocketChannel就是阻塞式,NioServerSocketChannel就是非阻塞式,EpollServerSocketChannel可以是阻塞式,也可以是阻塞式,这个主要用到了Linux下的EPoll机制。
ChannelPipeline
这是一个拦截式过滤器实现。
写道
* A list of {@link ChannelHandler}s which handles or intercepts inbound events and outbound operations of a
* {@link Channel}. {@link ChannelPipeline} implements an advanced form of the
* <a href="http://www.oracle.com/technetwork/java/interceptingfilter-142169.html">Intercepting Filter</a> pattern
* to give a user full control over how an event is handled and how the {@link ChannelHandler}s in a pipeline
* interact with each other.
关于拦截式过滤器可以看Java文档:
写道
https://www.oracle.com/java/technologies/intercepting-filter.html
ChannelPipeline将一组ChannelHandler串联以处理Channel产生的事件或者IO操作。
ChannelPipeline是一个接口,里边定义的方法比较简单,主要包括Pipeline的ChannelHandler添删改查操作方法、事件触发方法以及IO写入刷新(flush)三类。
Pipeline的ChannelHandler添删改查操作方法
addFirst
addLast
addBefore
addAfter
remove
removeFirst
removeLast
replace
first
firstContext
last
lastContext
get
context
channel
names
toMap
事件触发方法
fireChannelRegistered
fireChannelUnregistered
fireChannelActive
fireChannelInactive
fireExceptionCaught
fireUserEventTriggered
fireChannelRead
fireChannelReadComplete
fireChannelWritabilityChanged
刷新
flush
在创建Channel的时候,同时会创建一个对应的ChannelPipeline,参考AbstractChannel代码。
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
ChannelPipeline接口就只有两个,DefaultChannelPipeline和EmbeddedChannelPipeline,我们能用到的只有DefaultChannelPipeline,EmbeddedChannelPipeline用不到。所以,主要的实现代码在DefaultChannelPipeline。
DefaultChannelPipeline实际上是维护了一个ChannelHandlerContext(AbstractChannelHandlerContext)链表,这是一个双链表设计。我们向Pipeline中添加注册ChannelHandler的时候实际上是一个ChannelHandlerContext。根据这个链表顺序的ChannelHandler去处理Channel产生的事件或者IO操作。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在DefaultChannelPipeline中还有一个对应Channel的引用。
在向Pipeline中添加注册ChannelHandler的时候,会创建一个DefaultChannelHandlerContext实例,并添加注册进去。
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
在调用ChannelPipeline中的方法(addLast等)
ChannelHandler
Channel事件或者IO操作处理器。这是一个接口。
不过里边只定义了几个ChannelHandler在添加或删除以及出错时的触发回调方法。具体的读、写等事件处理的触发回调在ChannelInboundHandler、ChannelOutboundHandler中。
ChannelInboundHandler
ChannelOutboundHandler
Netty
Channel
SocketChannel
NioSocketChannel
NioAcceptedSocketChannel
NioClientSocketChannel
OioSocketChannel
HttpTunnelingClientSocketChannel
ServerChannel
LocalChannel
DatagramChannel
Netty
ChannelFactory
ClientSocketChannelFactory
HttpTunnelingClientSocketChannelFactory
NioClientSocketChannelFactory
OioClientSocketChannelFactory
LocalClientChannelFactory
DefaultLocalClientChannelFactory
ServerChannelFactory
LocalServerChannelFactory
DefaultLocalServerChannelFactory
ServerSocketChannelFactory
NioServerSocketChannelFactory
OioServerSocketChannelFactory
DatagramChannelFactory
NioDatagramChannelFactory
OioDatagramChannelFactory
EmbeddedChannelFactory
Netty, http-message-converter, echo, http request
EchoServer:
public class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
} else {
sslCtx = null;
}
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline p = Channels.pipeline();
if (sslCtx != null) {
p.addLast("ssl", sslCtx.newHandler());
}
p.addLast("http-message-converter", new HttpRequestConverterHandler());
p.addLast("echo", new EchoHandler());
p.addLast("http", new HttpHandler());
return p;
}
});
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.receiveBufferSize", 1048576);
bootstrap.setOption("child.sendBufferSize", 1048576);
bootstrap.bind(new InetSocketAddress(PORT));
}
}
HttpRequestConverterHandler:
public class HttpRequestConverterHandler extends SimpleChannelUpstreamHandler {
protected StringBuilder httpMessage(Channel channel, String url0) throws MalformedURLException {
URL url = new URL(url0);
String host = url.getHost();
int port = url.getPort();
String path = url.getPath();
System.out.println("host: " + host + ", port: " + port + ", path: " + path);
channel.setAttachment(new HttpHost(host, port));
StringBuilder sb = new StringBuilder();
sb.append("GET " + path + " HTTP/1.1\r\n");
sb.append("Host:" + host + (port == -1 ? "" : String.valueOf(port)) + "\r\n");
sb.append("\r\n");
return sb;
}
/**
* pass a url as a message event
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
byte[] data = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), data);
String msg = new String(data);
System.out.println(msg);
StringBuilder sb = httpMessage(e.getChannel(), msg);
ChannelBufferFactory channelBufferFactory = HeapChannelBufferFactory.getInstance();
ChannelBuffer nb = channelBufferFactory.getBuffer(sb.length());
nb.writeBytes(sb.toString().getBytes("utf-8"));
ctx.sendUpstream(new UpstreamMessageEvent(e.getChannel(), nb, e.getRemoteAddress()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
// Close the connection when an exception is raised.
e.getCause().printStackTrace();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
bos.write("invalid url.".getBytes("utf-8"));
ChannelBufferFactory channelBufferFactory = HeapChannelBufferFactory.getInstance();
ChannelBuffer nb = channelBufferFactory.getBuffer(bos.size());
nb.writeBytes(bos.toByteArray());
e.getChannel().write(nb);
e.getChannel().close();
}
}
HttpHost:
public class HttpHost {
private String host;
private int port;
public HttpHost(String host) {
this(host, -1);
}
public HttpHost(String host, int port) {
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
}
EchoHandler:
public class EchoHandler extends SimpleChannelUpstreamHandler {
private final AtomicLong transferredBytes = new AtomicLong();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
// Send back the received message to the remote peer.
transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes());
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
if (ctx.canHandleUpstream()) {
ctx.sendUpstream(new UpstreamMessageEvent(e.getChannel(), e.getMessage(), e.getRemoteAddress()));
} else {
e.getChannel().write(buf);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
// Close the connection when an exception is raised.
e.getCause().printStackTrace();
e.getChannel().close();
}
}
HttpHandler:
public class HttpHandler extends SimpleChannelUpstreamHandler {
public String execute(String host, int port, String msg) {
port = port == -1 ? 80 : port;
System.out.println("host: " + host);
System.out.println("port: " + port);
System.out.println("msg: " + msg);
try {
Socket socket = new Socket(host, port);
OutputStreamWriter writer = new OutputStreamWriter(socket.getOutputStream());
writer.write(msg);
writer.flush();
InputStream is = socket.getInputStream();
StringBuilder sb = new StringBuilder();
byte[] b = new byte[512];
int nb = -1;
while ((nb = is.read(b)) != -1) {
sb.append(new String(b, 0, nb));
}
System.out.println(sb.toString());
return sb.toString();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return "";
}
/**
* pass a url as a message event
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
byte[] data = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), data);
String msg = new String(data);
HttpHost host = (HttpHost) e.getChannel().getAttachment();
String result = execute(host.getHost(), host.getPort(), msg);
byte[] bytes = result.getBytes("utf-8");
ChannelBufferFactory channelBufferFactory = HeapChannelBufferFactory.getInstance();
ChannelBuffer nb = channelBufferFactory.getBuffer(data.length + bytes.length);
nb.writeBytes(data);
nb.writeBytes(bytes);
e.getChannel().write(nb);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
// Close the connection when an exception is raised.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
bos.write("invalid http request.".getBytes("utf-8"));
ChannelBufferFactory channelBufferFactory = HeapChannelBufferFactory.getInstance();
ChannelBuffer nb = channelBufferFactory.getBuffer(bos.size());
nb.writeBytes(bos.toByteArray());
e.getChannel().write(nb);
e.getChannel().close();
}
}
[c:\~]$ telnet localhost 8007
Host 'localhost' resolved to 127.0.0.1.
Connecting to 127.0.0.1:8007...
Connection established.
To escape to local shell, press 'Ctrl+Alt+]'.
invalid url.
Connection closed by foreign host.
Disconnected from remote host(localhost:8007) at 14:19:07.
Type `help' to learn how to use Xshell prompt.
[c:\~]$ telnet localhost 8007
Host 'localhost' resolved to 127.0.0.1.
Connecting to 127.0.0.1:8007...
Connection established.
To escape to local shell, press 'Ctrl+Alt+]'.
http://www.cnblogs.com/zhang-qiang/articles/2050885.html
GET /zhang-qiang/articles/2050885.html HTTP/1.1
Host:www.cnblogs.com
HTTP/1.1 200 OK
Date: Sun, 18 Sep 2016 07:15:25 GMT
Content-Type: text/html; charset=utf-8
Content-Length: 17071
Connection: keep-alive
Vary: Accept-Encoding
Cache-Control: private, max-age=10
Expires: Sun, 18 Sep 2016 07:15:33 GMT
Last-Modified: Sun, 18 Sep 2016 07:15:23 GMT
X-UA-Compatible: IE=10
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="zh-cn">
<head>
http://netty.io/3.8/guide
相关推荐
《Netty实战》是针对Java开发者的一本技术指南,它深入介绍了如何利用Netty这个高性能、异步事件驱动的网络应用程序框架来构建高效且可扩展的网络应用。Netty不仅简化了网络编程的复杂性,还提供了丰富的特性和组件...
Netty (netty-netty-5.0.0.Alpha2.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个压缩包包含 `netty-3.2.5.Final.jar` 和 `netty-3.2.5.Final-sources.jar` 两个文件,它们分别代表了...
《跟闪电侠学Netty:Netty即时聊天实战与底层原理》是一本深入浅出的Netty技术指南,旨在帮助读者掌握Netty框架,并利用它实现即时聊天应用,同时理解其底层工作原理。Netty是Java领域的一款高性能、异步事件驱动的...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个“netty-netty-4.1.69.Final.tar.gz”文件是Netty的最新稳定版本,版本号为4.1.69.Final,它是一个压缩包...
Netty (netty-netty-3.10.6.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...
Netty (netty-netty-4.1.77.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"Netty-4.1.17.Final.jar"是Netty框架的一个发行版本,它适用于Java 8及以上版本,这意味着它可以充分...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在Java开发领域,Netty因其强大的功能和高效性能而备受推崇,广泛应用于分布式系统、云计算、游戏服务器等...
这个“netty官网学习手册中文版”针对的是Netty的3.1版本,虽然现在的Netty已经发展到了5.x版本,但3.1版本的知识仍然具有历史参考价值,特别是对于那些初次接触或需要理解Netty基础概念的开发者来说。 1. **Netty...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨 Netty 实时通讯的原理与应用,以及如何利用它构建 WebSocket 服务。 WebSocket 是...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个“最新Netty中文文档CHM版”为中国的开发者提供了一个方便的中文学习资源,解决了阅读英文原版文档时的...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨Netty 5.0官方提供的示例(demo),这些示例是学习和理解Netty核心概念与功能的重要...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Spring 是一个广泛使用的 Java 应用开发框架,尤其在企业级应用中非常流行,它提供了依赖注入、面向切面编程...
netty-buffer-4.1.32.Final-sources.jar netty-buffer-4.1.32.Final.jar netty-build-22-sources.jar netty-build-22.jar netty-codec-4.1.32.Final-sources.jar netty-codec-4.1.32.Final.jar netty-codec-...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在这个“netty实现的聊天代码”中,我们可以深入理解如何使用 Netty 框架来构建简单的聊天应用。这个 demo ...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨 Netty 如何与 4G DTU 设备结合,以及如何构建基于 Java 的物联网(IoT)解决方案。...
本jar包为最新的netty-all-4.1.29c.jar 可导入直接用 Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。 Netty 是一个广泛使用的 Java 网络编程框架...
Netty入门教程文档 Netty是Java的网络编程框架,广泛应用于数据采集服务中,本文将对Netty的基本概念和应用进行详细介绍,并将其与ETL技术结合,讲解如何使用Netty进行数据流转和处理。 1. ETL概述 ETL(Extract...