`
jilong-liang
  • 浏览: 485077 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类

netty的入门经典例子的使用

阅读更多


/*
 * Copyright 2009 Red Hat, Inc.
 *
 * Red Hat licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.jboss.netty.channel.socket.http;

import static org.jboss.netty.channel.Channels.*;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.NotYetConnectedException;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.ssl.SslHandler;

/**
 * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
 * @author Andy Taylor (andy.taylor@jboss.org)
 * @author <a href="http://gleamynode.net/">Trustin Lee</a>
 * @version $Rev: 2285 $, $Date: 2010-05-27 21:02:49 +0900 (Thu, 27 May 2010) $
 */
class HttpTunnelingClientSocketChannel extends AbstractChannel
        implements org.jboss.netty.channel.socket.SocketChannel {

    final HttpTunnelingSocketChannelConfig config;

    volatile boolean requestHeaderWritten;

    final Object interestOpsLock = new Object();

    final SocketChannel realChannel;

    private final HttpTunnelingClientSocketChannel.ServletChannelHandler handler = new ServletChannelHandler();

    HttpTunnelingClientSocketChannel(
            ChannelFactory factory,
            ChannelPipeline pipeline,
            ChannelSink sink, ClientSocketChannelFactory clientSocketChannelFactory) {

        super(null, factory, pipeline, sink);

        config = new HttpTunnelingSocketChannelConfig(this);
        DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
        channelPipeline.addLast("decoder", new HttpResponseDecoder());
        channelPipeline.addLast("encoder", new HttpRequestEncoder());
        channelPipeline.addLast("handler", handler);
        realChannel = clientSocketChannelFactory.newChannel(channelPipeline);

        fireChannelOpen(this);
    }

    public HttpTunnelingSocketChannelConfig getConfig() {
        return config;
    }

    public InetSocketAddress getLocalAddress() {
        return realChannel.getLocalAddress();
    }

    public InetSocketAddress getRemoteAddress() {
        return realChannel.getRemoteAddress();
    }

    public boolean isBound() {
        return realChannel.isBound();
    }

    public boolean isConnected() {
        return realChannel.isConnected();
    }

    @Override
    public int getInterestOps() {
        return realChannel.getInterestOps();
    }

    @Override
    public boolean isWritable() {
        return realChannel.isWritable();
    }

    @Override
    protected boolean setClosed() {
        return super.setClosed();
    }

    @Override
    public ChannelFuture write(Object message, SocketAddress remoteAddress) {
        if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
            return super.write(message, null);
        }
        else {
            return getUnsupportedOperationFuture();
        }
    }

    void bindReal(final SocketAddress localAddress, final ChannelFuture future) {
        realChannel.bind(localAddress).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) {
                if (f.isSuccess()) {
                    future.setSuccess();
                } else {
                    future.setFailure(f.getCause());
                }
            }
        });
    }

    void connectReal(final SocketAddress remoteAddress, final ChannelFuture future) {
        final SocketChannel virtualChannel = this;
        realChannel.connect(remoteAddress).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) {
                final String serverName = config.getServerName();
                final int serverPort = ((InetSocketAddress) remoteAddress).getPort();
                final String serverPath = config.getServerPath();

                if (f.isSuccess()) {
                    // Configure SSL
                    SSLContext sslContext = config.getSslContext();
                    ChannelFuture sslHandshakeFuture = null;
                    if (sslContext != null) {
                        // Create a new SSLEngine from the specified SSLContext.
                        SSLEngine engine;
                        if (serverName != null) {
                            engine = sslContext.createSSLEngine(serverName, serverPort);
                        } else {
                            engine = sslContext.createSSLEngine();
                        }

                        // Configure the SSLEngine.
                        engine.setUseClientMode(true);
                        engine.setEnableSessionCreation(config.isEnableSslSessionCreation());
                        String[] enabledCipherSuites = config.getEnabledSslCipherSuites();
                        if (enabledCipherSuites != null) {
                            engine.setEnabledCipherSuites(enabledCipherSuites);
                        }
                        String[] enabledProtocols = config.getEnabledSslProtocols();
                        if (enabledProtocols != null) {
                            engine.setEnabledProtocols(enabledProtocols);
                        }

                        SslHandler sslHandler = new SslHandler(engine);
                        realChannel.getPipeline().addFirst("ssl", sslHandler);
                        sslHandshakeFuture = sslHandler.handshake();
                    }

                    // Send the HTTP request.
                    final HttpRequest req = new DefaultHttpRequest(
                            HttpVersion.HTTP_1_1, HttpMethod.POST, serverPath);
                    if (serverName != null) {
                        req.setHeader(HttpHeaders.Names.HOST, serverName);
                    }
                    req.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
                    req.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
                    req.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY);
                    req.setHeader(HttpHeaders.Names.USER_AGENT, HttpTunnelingClientSocketChannel.class.getName());

                    if (sslHandshakeFuture == null) {
                        realChannel.write(req);
                        requestHeaderWritten = true;
                        future.setSuccess();
                        fireChannelConnected(virtualChannel, remoteAddress);
                    } else {
                        sslHandshakeFuture.addListener(new ChannelFutureListener() {
                            public void operationComplete(ChannelFuture f) {
                                if (f.isSuccess()) {
                                    realChannel.write(req);
                                    requestHeaderWritten = true;
                                    future.setSuccess();
                                    fireChannelConnected(virtualChannel, remoteAddress);
                                } else {
                                    future.setFailure(f.getCause());
                                    fireExceptionCaught(virtualChannel, f.getCause());
                                }
                            }
                        });
                    }
                } else {
                    future.setFailure(f.getCause());
                    fireExceptionCaught(virtualChannel, f.getCause());
                }
            }
        });
    }

    void writeReal(final ChannelBuffer a, final ChannelFuture future) {
        if (!requestHeaderWritten) {
            throw new NotYetConnectedException();
        }

        final int size = a.readableBytes();
        final ChannelFuture f;

        if (size == 0) {
            f = realChannel.write(ChannelBuffers.EMPTY_BUFFER);
        } else {
            f = realChannel.write(new DefaultHttpChunk(a));
        }

        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) {
                if (f.isSuccess()) {
                    future.setSuccess();
                    if (size != 0) {
                        fireWriteComplete(HttpTunnelingClientSocketChannel.this, size);
                    }
                } else {
                    future.setFailure(f.getCause());
                }
            }
        });
    }

    private ChannelFuture writeLastChunk() {
        if (!requestHeaderWritten) {
            return failedFuture(this, new NotYetConnectedException());
        } else {
            return realChannel.write(HttpChunk.LAST_CHUNK);
        }
    }

    void setInterestOpsReal(final int interestOps, final ChannelFuture future) {
        realChannel.setInterestOps(interestOps).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) {
                if (f.isSuccess()) {
                    future.setSuccess();
                } else {
                    future.setFailure(f.getCause());
                }
            }
        });
    }

    void disconnectReal(final ChannelFuture future) {
        writeLastChunk().addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) {
                realChannel.disconnect().addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture f) {
                        if (f.isSuccess()) {
                            future.setSuccess();
                        } else {
                            future.setFailure(f.getCause());
                        }
                    }
                });
            }
        });
    }

    void unbindReal(final ChannelFuture future) {
        writeLastChunk().addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) {
                realChannel.unbind().addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture f) {
                        if (f.isSuccess()) {
                            future.setSuccess();
                        } else {
                            future.setFailure(f.getCause());
                        }
                    }
                });
            }
        });
    }

    void closeReal(final ChannelFuture future) {
        writeLastChunk().addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture f) {
                realChannel.close().addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture f) {
                        // Note: If 'future' refers to the closeFuture,
                        // setSuccess() and setFailure() do nothing.
                        // AbstractChannel.setClosed() should be called instead.
                        // (See AbstractChannel.ChannelCloseFuture)

                        if (f.isSuccess()) {
                            future.setSuccess();
                        } else {
                            future.setFailure(f.getCause());
                        }

                        // Notify the closeFuture.
                        setClosed();
                    }
                });
            }
        });
    }

    final class ServletChannelHandler extends SimpleChannelUpstreamHandler {

        private volatile boolean readingChunks;
        final SocketChannel virtualChannel = HttpTunnelingClientSocketChannel.this;

        @Override
        public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e)
                throws Exception {
            fireChannelBound(virtualChannel, (SocketAddress) e.getValue());
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            if (!readingChunks) {
                HttpResponse res = (HttpResponse) e.getMessage();
                if (res.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
                    throw new ChannelException("Unexpected HTTP response status: " + res.getStatus());
                }

                if (res.isChunked()) {
                    readingChunks = true;
                } else {
                    ChannelBuffer content = res.getContent();
                    if (content.readable()) {
                        fireMessageReceived(HttpTunnelingClientSocketChannel.this, content);
                    }
                    // Reached to the end of response - close the request.
                    closeReal(succeededFuture(virtualChannel));
                }
            } else {
                HttpChunk chunk = (HttpChunk) e.getMessage();
                if (!chunk.isLast()) {
                    fireMessageReceived(HttpTunnelingClientSocketChannel.this, chunk.getContent());
                } else {
                    readingChunks = false;
                    // Reached to the end of response - close the request.
                    closeReal(succeededFuture(virtualChannel));
                }
            }
        }

        @Override
        public void channelInterestChanged(ChannelHandlerContext ctx,
                ChannelStateEvent e) throws Exception {
            fireChannelInterestChanged(virtualChannel);
        }

        @Override
        public void channelDisconnected(ChannelHandlerContext ctx,
                ChannelStateEvent e) throws Exception {
            fireChannelDisconnected(virtualChannel);
        }

        @Override
        public void channelUnbound(ChannelHandlerContext ctx,
                ChannelStateEvent e) throws Exception {
            fireChannelUnbound(virtualChannel);
        }

        @Override
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
                throws Exception {
            fireChannelClosed(virtualChannel);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            fireExceptionCaught(virtualChannel, e.getCause());
            realChannel.close();
        }
    }
}

 

 
源代码



 



 

 

  • 大小: 89.3 KB
  • 大小: 67 KB
  • 大小: 89.3 KB
  • 大小: 71 KB
3
0
分享到:
评论
2 楼 三里小龙 2013-03-30  
谢啦
1 楼 liyunhe9566 2012-09-25  
不错,挺好,看看资料

相关推荐

    Netty的入门经典例子

    "Netty 入门经典例子" Netty 是一个异步的、事件驱动的网络编程框架和工具,使用 Netty 可以快速开发出可维护的、high-performance 的协议服务及其客户端应用。Netty 相当简化和流线化了网络应用的编程开发过程,...

    Netty 入门与实战:仿写微信 IM 即时通讯系统.zip

    《Netty 入门与实战:仿写微信 IM 即时通讯系统》是一本深入浅出的教程,旨在帮助读者掌握使用Netty构建高效、稳定、高性能的即时通讯系统的方法。通过模仿微信IM的实现,本书将理论知识与实践案例相结合,使读者...

    netty入门案例.zip

    这个“netty入门案例.zip”文件提供了一个简单的 Netty 应用示例,旨在帮助初学者快速理解并掌握 Netty 的基本概念和使用方法。下面将详细介绍这个入门案例中的关键知识点。 首先,Netty 的核心是其 ChannelHandler...

    netty入门例子--(不是翻译官方文档)

    这个“netty入门例子”旨在帮助初学者理解Netty的基本用法和特性,而不是简单地翻译官方文档,它提供了几个开发模板,以便于深入理解Netty中的消息异步通信机制和TCP通信的封装。 首先,Netty的核心是它的异步模型...

    netty+protobuf入门案例

    在"Netty+Protobuf入门案例"中,我们可以学习到如何结合这两个强大的工具进行高效的数据交换。首先,理解 Protobuf 的基本概念至关重要。 Protobuf 提供了一种定义数据结构的语言,通过编译器将这些结构转换为各种...

    Netty网络编程视频教程

    近百节视频详细讲解,需要的小伙伴自行百度网盘下载,... Netty 入门 1. 概述 2. Hello World 3. 组件 4. 双向通信 三. Netty 进阶 1. 粘包与半包 2. 协议设计与解析 3. 聊天室案例 四. 优化与源码 1. 优化 2. 源码分析

    netty原理及例子

    本篇文章将深入探讨Netty的原理,并通过实例帮助你快速入门和提高。 1. **Netty的基本概念** - **NIO(Non-blocking I/O)**:Netty是基于Java NIO构建的,NIO允许单线程处理多个通道,减少了线程创建和销毁的开销...

    netty案例,netty4.1基础入门篇十《关于ChannelOutboundHandlerAdapter简单使用》源码

    netty案例,netty4.1基础入门篇十《关于ChannelOutboundHandlerAdapter简单使用》源码 ...

    Netty5入门3个简单例子

    Netty基于非阻塞I/O模型,使用Java NIO(Non-blocking Input/Output)库,这使得它在处理大量并发连接时表现出色。它的设计目标是提供一种易于使用的API,使开发者能够快速构建出稳定且高效的网络应用。 第一个例子...

    Netty实践学习案例

    1. **基础入门**:这些示例可能会涵盖如何创建一个简单的 Netty 服务器和客户端,展示如何设置 ChannelPipeline,添加处理器(Handler),以及如何进行数据的读写操作。 2. **编解码器的使用**:示例可能涉及自定义...

    netty案例,netty4.1基础入门篇六《NettyServer群发消息》源码

    netty案例,netty4.1基础入门篇六《NettyServer群发消息》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724778&idx=1&sn=72e4b1ea5323475b16e99c6720c7069d&scene=19#wechat_redirect

    netty入门与实战-netty-learn.zip

    这个“netty入门与实战-netty-learn.zip”文件包含了学习 Netty 的资源,帮助初学者理解并掌握这个强大的网络库。Netty-learn-master 可能是一个包含源代码、教程文档或者示例项目的目录。 Netty 的核心概念: 1. ...

    netty案例,netty4.1基础入门篇十一《netty udp通信方式案例Demo》源码

    netty案例,netty4.1基础入门篇十一《netty udp通信方式案例Demo》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724927&idx=1&sn=a16bc8e98d6a27816da0896adcc83778&scene=19#wechat_redirect

    掘金-Netty 入门与实战-netty-demo.zip

    这个压缩包“掘金-Netty 入门与实战-netty-demo.zip”包含了关于 Netty 的入门教程和实战示例,特别是通过 "netty-demo-pipeline-channelHandler" 这个文件名我们可以推测,它可能包含了一个展示 Netty ...

    Netty3.5代码例子

    通过实践Netty 3.5的入门实例,你可以学习如何设置服务器、建立连接、定义消息处理流程,以及如何优雅地处理异常。这些基础将帮助你理解Netty的工作原理,为后续学习更高级的功能打下坚实基础。在实践中,你可能会...

    netty+protobuf入门案例.

    这个入门案例展示了如何使用Netty处理网络连接,并通过Protobuf进行高效的数据交换。实际项目中,你可能需要处理更复杂的协议,例如心跳检查、错误处理、多消息类型的解析等。但这个基础已经足够帮助你理解这两项...

    netty案例,netty4.1基础入门篇十二《简单实现一个Netty的Http服务》源码

    netty案例,netty4.1基础入门篇十二《简单实现一个Netty的Http服务》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724932&idx=1&sn=eb1631ddbd0e7a0dcb3a655374631f48&scene=19#wechat_redirect

    netty案例,netty4.1基础入门篇七《嗨!NettyClient》源码

    netty案例,netty4.1基础入门篇七《嗨!NettyClient》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724783&idx=1&sn=bc827e680ebd533fe67720fd695257be&scene=19#wechat_redirect

    netty案例.zip

    这个"netty案例.zip"压缩包提供了一个简单的Netty入门示例,帮助初学者理解如何使用Netty进行网络编程。下面将详细解释这个案例中的核心知识点。 1. **Netty基础架构**: Netty采用了Reactor模式,它是一种处理...

Global site tag (gtag.js) - Google Analytics