`
yx200404
  • 浏览: 78552 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

使用Netty构建一个多线程服务器与客户端(一)

阅读更多
发此篇博客的目的是,如果有网络通讯方面的大牛看到,希望能给与一些指导.

我相信很多人都能用netty开发出自己的客户端和服务器,但是,此服务器和客户端的可重用性有多高呢?我一直想弄个性能还算不错,然后其他人在此架构上做简单的命令处理即可.

开发这个服务器和客户端的原因是正是如此,也可以说是一个简单的网络平台.让其具备一定的2次开发功能.

以下代码只是一个初步是设想,有兴趣的朋友我们可以一起讨论讨论.

如果你完全不了解netty,请先自己学习一下.^_^,因为我也是菜鸟,无法解释那么多的类是干什么的.^_^

关于netty的下载和其他个jar请自行下载

首先是服务器的初步实现.因为Netty是基于事件的,再加上其无阻塞的特性.我们必须要牢记:

数据发送后,代码不会被阻塞,而是顺序运行,也就是说,做了一件事件后,这件事情不一定已经成功,所以我们不能在下一行代码中百分百的确定其已经发送到了对方,因此,你会发行其很多方法都会返回一个"Future".只要注意到这一点,Netty的使用难度就不是很大了.

(一)handler处理篇
首先,是handler,初次接触netty的朋友要注意,handler不是一个单例.即每个channel下都会有自己的一个handler实例.

public class ServerHandler extends SimpleChannelUpstreamHandler {

    private static final Logger logger = Logger.getLogger(
            ServerHandler.class.getName());
    private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
    private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();

    @Override
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
            throws Exception {
        if (e instanceof ChannelStateEvent) {
            logger.log(Level.INFO, "Channel state changed: {0}", e);
        }
        super.handleUpstream(ctx, e);
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        System.out.println(this);
        String request = (String) e.getMessage();
        //如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接
        //服务器端通过ChannelFutureListener.CLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接
        if (request.toLowerCase().equals("bye")) {
            ChannelFuture future = e.getChannel().write("bye\r\n");
            future.addListener(ChannelFutureListener.CLOSE);
        } else {
            //以下是我初步解析客户端发送过来的数据,然后决定处理方式
            RecevieData receivedaData = MessageDecoder.decode(request);
            if (null != receivedaData) {
                //服务器第5版
                if (VersionCode.V5.equals(receivedaData.getVersion())) {
                    //然后判断命令是否存在
                    for (String s : CommandCode.COMMANDS) {
                        if (s.equals(receivedaData.getActionType())) {
                            COMMAND_FLAG.set(true);
                            if (s.equals(CommandCode.KEEP_ALIVE)) {
                                serverChannelGroup.addChannel(e.getChannel());
                            }
                            break;
                        } else {
                            COMMAND_FLAG.set(false);
                        }
                    }
                    if (COMMAND_FLAG.get()) {
                        COMMAND_FLAG.set(false);
                        //将这个命令传递给下一个handler来处理.
                        //这里的"下一个handler"即为用户自己定义的处理handler
                        ctx.sendUpstream(e);
                    } else {
                        e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));
                    }
                } else {
                    //版本错误
                    e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));
                }
            } else {
                //如果格式错误,那么直接返回
                e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));
            }
        }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
            throws Exception {
        logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",
                e.getCause());
        e.getChannel().close();
        ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());
    }
}


在上面这个handler中,我使用了ctx.sendUpstream(e);来处理,个人觉得此处为了要实现执行运行时代码,也可以使用接口等方式.但既然netty提供了sendUpstream 的方法,我们用这个岂不是更方便^_^

下面是使用SSL连接的handler
public class ServerSSLHandler extends SimpleChannelUpstreamHandler {

    private static final Logger logger = Logger.getLogger(
            ServerSSLHandler.class.getName());
    private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
    private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();

    @Override
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
            throws Exception {
        if (e instanceof ChannelStateEvent) {
            logger.log(Level.INFO, "Channel state changed: {0}", e);
        }
        super.handleUpstream(ctx, e);
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        //ssl握手
        SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
        sslHandler.handshake();
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception {
        System.out.println(this);
        String request = (String) e.getMessage();
        //如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接
        //服务器端通过ChannelFutureListener.CLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接
        if (request.toLowerCase().equals("bye")) {
            ChannelFuture future = e.getChannel().write("bye\r\n");
            future.addListener(ChannelFutureListener.CLOSE);
        } else {
            //以下是我初步解析客户端发送过来的数据,然后决定处理方式
            RecevieData receivedaData = MessageDecoder.decode(request);
            if (null != receivedaData) {
                //服务器第5版
                if (VersionCode.V5.equals(receivedaData.getVersion())) {
                    //然后判断命令是否存在
                    for (String s : CommandCode.COMMANDS) {
                        if (s.equals(receivedaData.getActionType())) {
                            COMMAND_FLAG.set(true);
                            if (s.equals(CommandCode.KEEP_ALIVE)) {
                                serverChannelGroup.addChannel(e.getChannel());
                            }
                            break;
                        } else {
                            COMMAND_FLAG.set(false);
                        }
                    }
                    if (COMMAND_FLAG.get()) {
                        COMMAND_FLAG.set(false);
                        //将这个命令传递给下一个handler来处理.
                        //这里的"下一个handler"即为用户自己定义的处理handler
                        ctx.sendUpstream(e);
                    } else {
                        e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));
                    }
                } else {
                    //版本错误
                    e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));
                }
            } else {
                //如果格式错误,那么直接返回
                e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));
            }
        }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
            throws Exception {
        logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",
                e.getCause());
        e.getChannel().close();
        ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());
    }
}


关于SSL连接需要用到的一些其他东西,稍后在介绍

当我们有了2个handler后,当然就是要把他们添加到我们的Pipeline中
public class ServerPipelineFactory implements
        ChannelPipelineFactory {

    public ChannelPipeline getPipeline() {
        ChannelPipeline pipeline = pipeline();
        ServerConfig config = ServerConfig.getInstance();
        try {
            if (config.ssl()) {
                SSLEngine engine =
                        SecureSslContextFactory.getServerContext().createSSLEngine();
                //说明是服务器端SslContext
                engine.setUseClientMode(false);
                pipeline.addLast("ssl", new SslHandler(engine));
            }

            //此Decoder可以自动解析一句以\r\n结束的命令,我为了方便,也用了这个Decoder
            //使用这个Decoder,我不用刻意发送命令长度用于解析,只要没有收到\r\n说明数据还
            //没有发送完毕.这个Decoder会等到收到\r\n后调用下个handler
            pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
                    8192, Delimiters.lineDelimiter()));
            //字串解码,可以自己设置charset
            pipeline.addLast("decoder", new StringDecoder());
            //字串编码,可以自己设置charset
            pipeline.addLast("encoder", new StringEncoder());

            if (config.ssl()) {
                //如果开启了SSL,那么使用sslhandler
                pipeline.addLast("sslhandler", new ServerSSLHandler());
            } else {
                //如果没有开启SSL,那么使用普通handler
                pipeline.addLast("handler", new ServerHandler());
            }
            //遍历配置文件中的服务器handler,将其添加进Pipeline链中
            for (Element e : config.handler()) {
                pipeline.addLast(e.attribute(e.getQName("id")).getValue().trim(),
                        (ChannelHandler) Class.forName(e.attribute(e.getQName("class")).getValue().trim()).newInstance());
            }
        } catch (DocumentException ex) {
            Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
        } catch (InstantiationException ex) {
            Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
        } catch (IllegalAccessException ex) {
            Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
        } catch (ClassNotFoundException ex) {
            Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
        }
        return pipeline;
    }
}


下面是xml处理类
public class ServerConfig {

    private static final String HOST = "host";
    private static final String PORT = "port";
    private static final String HANDLER = "handler";
    private static final String CLIENTHANDLER = "clienthandler";
    private static final String SSL = "ssl";
    private static final ServerConfig SERVER_CONFIG = new ServerConfig();
    private static final String XML_PATH = "lib/server.xml";
    private static final SAXReader SAR_READER = new SAXReader();

    private ServerConfig() {
        super();
    }

    public String host() throws DocumentException {
        return this.rootElement().element(HOST).getTextTrim().trim();
    }

    public int port() throws DocumentException {
        return Integer.parseInt(this.rootElement().element(PORT).getTextTrim().trim());
    }

    public boolean ssl() throws DocumentException {
        return Integer.parseInt(this.rootElement().element(SSL).getTextTrim().trim()) == 1 ? true : false;
    }

    public List<Element> handler() throws DocumentException {
        return this.rootElement().elements(HANDLER);
    }

     public List<Element> clienthandler() throws DocumentException {
        return this.rootElement().elements(CLIENTHANDLER);
    }

    private Element rootElement() throws DocumentException {
        return SAR_READER.read(new File(XML_PATH)).getRootElement();
    }

    public static ServerConfig getInstance() {
        return SERVER_CONFIG;
    }
}


server.xml,放到lib下即可,注意其中的handler 以及clienthandler 项,如果你新建了自己的handler,那么需要在此xml中配置一下.
<?xml version="1.0" encoding="UTF-8"?>

<root>
    <!-- 配置主机地址 -->
    <host>127.0.0.1</host>
    <!-- 配置服务端口 -->
    <port>8080</port>
    <!-- 是否启用ssl,1为启用,0为停用 -->
    <ssl>0</ssl>

    <!--服务器业务handler -->
    <handler id="timeHandler" class="com.chinatenet.nio.server.handler.ServerTimeHandler" />
    
    <!--客户端业务handler -->
    <clienthandler id="timeHandler" class="com.chinatenet.nio.client.handler.ClientTimeHandler" />
</root>


到此,一个简单的可扩展handler的服务器雏形就出来了

下面,我们添加一个自定义的服务器处理handler进来
public class ServerTimeHandler extends SimpleChannelUpstreamHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        RecevieData receivedaData = MessageDecoder.decode((String) e.getMessage());
        if (CommandCode.GET_TIME.equals(receivedaData.getActionType())
                || CommandCode.KEEP_ALIVE.equals(receivedaData.getActionType())) {
            if (VersionCode.V5.equals(receivedaData.getVersion())) {
                //回复客户端后,即可进行自己的业务.当然.这里可以根据需要,看
                //是先回复再处理还是等处理结果出来后,将结果返回客户端
                e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.OK,
                        System.currentTimeMillis() / 1000 + ""));
            } else {
                //版本错误
                e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED,
                        StatusCode.VERSION_NOT_SUPPORTED_TXET));
            }
        } else {
            //如果不是此handler处理的命令,那么流下去
            ctx.sendUpstream(e);
        }
    }
    
}


最后测试一下
public class Server {

    public static void main(String[] args) throws DocumentException {
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool()));

        bootstrap.setPipelineFactory(new ServerPipelineFactory());

        //因为我要用到长连接
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        
        ServerConfig config = ServerConfig.getInstance();
        bootstrap.bind(new InetSocketAddress(Integer.valueOf(config.port())));
    }
}


总结:在整个服务器编码中,刚开始会遇到"远程主机强迫关闭了一个现有的连接。"等这类错误,最后修改成"相互告知对方我要关闭了"再进行关闭就可以了.

最后再完善一下异常处理即可.

http://www.nptpark.com

5
2
分享到:
评论
3 楼 gufeng_java 2013-08-25  
哈哈,最近刚刚需要使用Netty开发东西。
多谢楼主的博文给我提供了思路!
多谢多谢!!!
2 楼 wj_126mail 2012-12-13  
引用
总结:在整个服务器编码中,刚开始会遇到"远程主机强迫关闭了一个现有的连接。"等这类错误,最后修改成"相互告知对方我要关闭了"再进行关闭就可以了.


请问,我也遇到这个问题了,怎么改成"相互告知对方我要关闭了"再进行关闭",谢谢!
1 楼 xlx 2012-09-29  
求源码。。。

相关推荐

    使用Netty4实现多线程的消息分发

    在本文中,我们将深入探讨如何利用 Netty 4 实现多线程的消息分发,这对于构建分布式系统、游戏服务器或者任何需要高效处理并发连接的应用尤其重要。 一、Netty 框架简介 Netty 是由 JBoss 提供的一个开源项目,它...

    netty中的多线程应用

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在 Netty 中,多线程的应用是其处理高并发、高性能的关键因素之一。下面我们将深入探讨 Netty 中的多线程并发...

    Netty通讯框架-多线程-源代码

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入探讨Netty的多线程模型之前,我们先了解一下Netty的基本概念。 Netty的核心组件包括Bootstrap(引导类...

    基于netty的服务器客户端收发消息代码

    在这个“基于Netty的服务器客户端收发消息代码”中,我们可以看到一个简单的实现,它演示了如何使用Netty进行双向通信,即服务器与客户端之间的消息交换。 首先,我们从服务器端(ChatServer)入手。服务器端通常...

    netty高性能异步I/O服务器/客户端开源开发工具

    它是一个开源的Java库,广泛应用于服务器和客户端的开发,尤其在处理高并发、低延迟的网络服务时,Netty的表现尤为突出。Netty的核心是基于NIO(非阻塞I/O)和EventLoop事件驱动模型,这使得它能够在单线程或多线程...

    netty 包含客户端和服务器端

    - 高性能:Netty 使用 NIO(非阻塞 I/O)模型,允许单线程处理多个连接,减少了线程切换的开销。 - 易于使用:通过 ChannelHandler 和 ChannelPipeline,开发者可以方便地定义和插入自定义的处理逻辑。 - 强大的...

    netty4 Android客户端和服务器端

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本文中,我们将深入探讨如何使用 Netty 4.0.31.Final 版本构建 Android 客户端和服务器端应用程序。 首先...

    Java采用Netty实现基于DTU的TCP服务器 + 多端口 + 多协议

    本文将深入探讨如何使用Java的Netty框架实现一个基于DTU(Data Transfer Unit)的TCP服务器,该服务器具备多端口通信和多协议解析的能力。 首先,DTU是一种专门用于远程数据传输的设备,它能够通过GPRS、3G/4G等...

    java服务器端(Netty_Proto)和c++客户端tcp通讯.rar

    标题中的“java服务器端(Netty_Proto)和c++客户端tcp通讯”表明这是一个关于使用Java的Netty框架和Google的Protocol Buffers(ProtoBuf)进行TCP通信的项目。Netty是一个高性能、异步事件驱动的网络应用框架,常...

    使用jboss netty 创建高性能webservice客户端及服务端

    通过上述步骤,我们就可以利用JBoss Netty构建出高性能的Web Service客户端和服务端。不过,这只是基础,深入研究Netty源码可以帮助我们更好地理解和优化系统性能,例如了解其内部的事件调度机制、缓冲区管理以及...

    使用netty使用http协议开发文件服务器

    Netty是一个高性能、异步事件驱动的网络应用框架,常用于构建高效的服务器和客户端应用程序,尤其是在高并发场景下。在本示例中,我们将探讨如何使用Netty实现一个基于HTTP协议的文件服务器。 首先,我们需要理解...

    用Netty实现Websocket(包含服务器代码和客户端网页)

    通过以上步骤,你可以构建一个基本的基于Netty的WebSocket服务器和客户端。然而,实际项目中可能还需要处理更多复杂情况,比如多线程管理、负载均衡、错误处理、消息序列化等。熟悉Netty的API和特性,结合实际需求,...

    java socket tcpip多线程网络通信服务器客户端

    在“java socket tcpip多线程网络通信服务器客户端”这个主题中,我们将深入探讨如何使用Java Socket实现基于TCP/IP协议的多线程服务器和客户端通信。 TCP(传输控制协议)是一种面向连接的、可靠的、基于字节流的...

    netty3 客户端 服务端聊天

    总的来说,这个“netty3 客户端 服务端聊天”示例展示了如何利用 Netty 3 构建一个简单的聊天系统,涉及了 Netty 的基本架构、长连接机制、数据传输以及多用户通信等功能。在实际应用中,开发者还可以根据需求扩展,...

    基于Netty消息框架的Java游戏服务器.zip

    在构建高性能、高并发的游戏服务器时,Netty作为一个强大的异步事件驱动的网络应用程序框架,被广泛应用。"基于Netty消息框架的Java游戏服务器"项目,旨在利用Netty的特性来设计并实现一个高效的游戏服务器。这个...

    用netty实现文件传输

    总结,利用 Netty 实现文件传输是一个涉及网络编程、多线程、I/O 操作以及内存管理的综合实践。通过 Netty 的强大功能,我们可以构建高效、可靠的文件传输系统。在实际应用中,还需要考虑到性能优化、安全性以及异常...

    基于netty和protobuf的聊天系统,客户端+服务器

    通过这个项目,开发者不仅可以学习到如何使用Netty构建网络应用,还能掌握protobuf在实际项目中的应用,理解数据序列化和反序列化的流程。同时,这个聊天系统也可以作为进一步开发分布式系统、实时通信应用的基础,...

    使用 netty实现一个简单的聊天室

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在这个“使用 Netty 实现一个简单的聊天室”的项目中,我们将深入理解 Netty 的核心概念和组件,以及如何利用...

    多线程并发编程在Netty的应用分析

    多线程在这里的作用是为每个连接分配一个独立的工作线程,使得服务器可以并行处理多个客户端请求,避免了单线程模型中的阻塞问题。 Netty的线程模型主要包括Boss线程组和Worker线程组。Boss线程组负责接收新的连接...

    netty 推送 android客户端

    综上所述,"Netty推送android客户端"涉及到的技术栈广泛,包括Netty的网络通信、序列化与反序列化、Android客户端编程、心跳机制、安全传输以及异常处理等多个方面。在实际项目中,理解并熟练掌握这些知识点对于构建...

Global site tag (gtag.js) - Google Analytics