`
kavy
  • 浏览: 888505 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

搭建服务器处理系统(基于netty)

 
阅读更多

搭建服务器处理系统(基于netty)-我们到底能走多远系列(25)

推荐:

  google rest 一个不错的http测试应用,google浏览器可以使用。做接口什么的,很有帮助。亲,还不快了解一下。

扯淡:

  现在我算是进入一个能带着你向前走的团队,但是产品设计太扯淡,互联网应用,开发周期异常的短,也高估了开发的能力,赶进度的开发bug很多啊。

  如果开发只是完成任务的行动,是不会感到痛苦的。所以说:想要做好产品的开发,痛苦才刚刚开始。原因就是开发无法左右产品的设计.....

 

主题:

  时刻关注排行的朋友注意啦,你们都得了排行强迫症啦,赶快找个护士就医吧。

  一个排行.....(我需要护士)

关于排名的详细:摸我

好吧,据说netty排在第一,那就学习一下吧!

更具公司很久以前的一个服务器框架代码,不断删减,然后得到一个很简单的服务器框架,分享一下。

自己画的流程图,流程比较简单,这方面比较弱,不太会用图表达:

 

 

1,启用netty

我们需要监听端口,这样就可以处理连接上来的tcp消息了。这一步netty用java 的NIO和OIO都封装了,我们自然选择NIO啦。

一下是启动服务器的代码:对于这个启动,你只要学习一下netty的手册例子,就马上明白了,它手册的例子也很好,建议大家看看。

复制代码
public class Start {

    public static void main(String[] args) {
        //ApplicationContext context = new ClassPathXmlApplicationContext("D:/Users/dongchao/workspace/NettyTest/resources/applicationContext-task.xml");
        System.out.println("=============show time!=============");
        initNetty();
    }
    private static final int tcpSendBufferSize = 32768;
    private static final int tcpReceiveBufferSize = 32768;
    
    // 初始化端口的监听
    public static void initNetty(){
        InetSocketAddress addr = new InetSocketAddress(8989);//需要监听的端口,即tcp连接建立的端口
        //Executors.newCachedThreadPool()的解释:
        //缓冲线程执行器,产生一个大小可变的线程池。
        //当线程池的线程多于执行任务所需要的线程的时候,对空闲线程(即60s没有任务执行)进行回收;
        //当执行任务的线程数不足的时候,自动拓展线程数量。因此线程数量是JVM可创建线程的最大数目。
        ServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());//It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently. 
        //  Creates a new group with a generated name.
        DefaultChannelGroup allChannels = new DefaultChannelGroup("pushServerChannelGroup");
        
        ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
        
        // PushServerPipelineFactory作为一个ChannelPipelineFactory产生的工厂类,我们可以把需要执行的Handler进行配置
        ChannelPipelineFactory pipelineFactory = new PushServerPipelineFactory(allChannels);
        // Whenever a new connection is accepted by the server, a new ChannelPipeline will be created by the specified ChannelPipelineFactory.
        // 服务器新连接建立的时候,新的ChannelPipeline会通过我们定义的ChannelPipelineFactory产生,其实是调用了getPipeline()方法。
        bootstrap.setPipelineFactory(pipelineFactory);
        
        if (tcpReceiveBufferSize != -1) {
            bootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize);
        }
        if (tcpSendBufferSize != -1) {
            bootstrap.setOption("child.sendBufferSize", tcpSendBufferSize);
        }
        
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("child.reuseAddress", true);
        bootstrap.setOption("child.keepAlive", false);
        bootstrap.setOption("child.tcpNoDelay", true);
        
        System.out.println(" ===================netty started=====================");
        Channel serverChannel = bootstrap.bind(addr);
        allChannels.add(serverChannel);
    }
复制代码

PushServerPipelineFactory 其实就是配置了一下Handler,他叫pushServerCommandHandler,他的作用就是把接受到的信息放进receivedQueen的队列去就好了,其实就是调用了MessageManageraddSocketMessage方法。

我们看一下他的messageReceived方法就明白了,netty是事件机制的,messageReceived是重写的方法,只要是受到一个连接的消息,就会触发这个方法。

复制代码
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent)
            throws Exception {
        CommandMessage command = (CommandMessage) messageEvent.getMessage();
        
        if (command.message.length() > 3) {
             Channel ch = channelHandlerContext.getChannel();
             ch.write("---------message received-------------");
             // 向消息队列里插消息包,通过handleMessage这个方法,
             // 插入的MessagePack其实已经更具消息的不同被选择成不同的子类
             // 我觉得这是很关键的设计,我们的业务逻辑就可以分成不同的MessagePack子类,然后实现它的onHandler方法
             messageManager.addSocketMessage(handleMessage(command.message, messageEvent));
           
        } else {
           // logger.warn("too short message.");
        }
    }
    //重点方法
    public MessagePack handleMessage(String msg, MessageEvent e) {
        MessagePack messagePack = null;

        int fid = SjsonUtil.getFIDFromMsg(msg);
 
        switch (fid) {
        case 25: // 调用TestCategoryMsg
            messagePack = new ShowTimeMessage(msg, e.getChannel());
            break;
        case 26: // 调用不同的业务逻辑
            messagePack = new TestCategoryMsg(msg, e.getChannel());
            break;
        default:
           // logger.warn("unknow FID=" + fid + ",raw msg=" + msg + ",client=" + e.getChannel().getRemoteAddress());
        }

        return messagePack;
    }

复制代码

 

PushServerPipelineFactory 除了配置好Handler,还把MessageManager启动起来了,MessageManager是spring的配置文件中配置的。注意他的init-method,就是实例化这个bean的时候会执行它的start方法,这个比较重要,因为MessageManager就是处理消息队列的模块,所以他需要在服务器启动时启动线程池去处理消息队列。MessageManager提供的方法就是用来维护一个叫receivedQueen的队列。

  <bean id="messageManager" class="netty.gate.message.MessageManager" init-method="start">
  </bean> 

PushServerPipelineFactory

复制代码
public class PushServerPipelineFactory implements ChannelPipelineFactory {

    private DefaultChannelGroup channelGroup;

    private final PushServerCommandHandler pushServerCommandHandler;

    private final PushServerEncoder pushServerEncoder = new PushServerEncoder();

    public PushServerPipelineFactory(DefaultChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
        
        this.pushServerCommandHandler = new PushServerCommandHandler(this.channelGroup);
        
        pushServerCommandHandler.setMessageManager((MessageManager) TaskBeanFactory.getBean("messageManager"));

    }

    public final ChannelPipeline getPipeline() throws Exception {
        return Channels.pipeline(new PushServerCommandDecoder(), pushServerCommandHandler, pushServerEncoder);
    }

    protected ChannelPipelineFactory createPushServerPipelineFactory(DefaultChannelGroup allChannels) {
        return new PushServerPipelineFactory(allChannels);
    }

}
复制代码

很关键的MessageManager: 维护的是receivedQueen队列

复制代码
public class MessageManager {

    // MessageManager的消息队列,下面的addSocketMessage方法就是向这个队列塞MessagePack的
    private LinkedBlockingQueue<MessagePack> receivedQueen = new LinkedBlockingQueue<MessagePack>(512);

    private ExecutorService pool;

    private int reStartThreadCount = 0;


    public void start() {
        this.pool = Executors.newCachedThreadPool();
        pool.submit(new PushRecvThread());
    }

    private class PushRecvThread implements Runnable {

        public void run() {
            while (true) {
                MessagePack message = waitForProcessMessage();
                if (message != null) {
                    // 利用多态执行继承MessagePack的子类方法
                    message.onHandler(TaskBeanFactory.getContextInstance());
                }
            }
        }
    }
    
    public MessagePack waitForProcessMessage() {
        MessagePack message = null;
        while (message == null) {
            try {
                // 从队列中取继承MessagePack的实例
                message = receivedQueen.poll(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // TODO log
            }
        }
        return message;
    }
    
    public void addSocketMessage(MessagePack message) {
        if (message != null) {
            try {
                boolean success = receivedQueen.offer(message, 15, TimeUnit.SECONDS);
                if (false == success) {
                    // maybe PushRecvThread is break,restart the thread again
                    if (reStartThreadCount < 10) {
                        pool.submit(new PushRecvThread());
                        reStartThreadCount++;
                    }
                } else {
                }
            } catch (InterruptedException e) {
                // TODO log
            }
        }
        return;
    }
复制代码

正真的处理逻辑的代码是写在这些继承MessagePack抽象类里的,里面的一个onHandler方法是必须实现的,所以使用了抽象类

下面代码的onHandler中,就可以写那些调用service层的,处理数据库,发邮件,调用接口啊等各种需求操作了。

复制代码
public class TestCategoryMsg extends MessagePack {

    private static final String MESSAGE_NAME = "TEST_MESSAGE"; // 消息名称

    public TestCategoryMsg(String msg, Channel channel) {
        super(msg, channel);
    }

    @Override
    public void onHandler(ApplicationContext ctx) {
        channel.write("---------------i dont know why--------------");
    }

    public String getName() {
        return MESSAGE_NAME;
    }

}
复制代码


到此基本上一个服务器从接受数据,到回应数据的流程已经走完了。

我想上源码,可是看不到附件... 我表示无能为力啦 !

 

 

 

 

让我们继续前行

----------------------------------------------------------------------

努力不一定成功,但不努力肯定不会成功。
共勉。

 

 
 
 
绿色通道: 好文要顶 关注我 收藏该文与我联系 
0
0
 
(请您对文章做出评价)
 
分享到:
评论

相关推荐

    开源 MQTT 服务器(基于reactor-netty实现高性能的、可扩展、支持千万级设备接入集群)

    开源 MQTT 服务器(基于reactor-netty实现高性能的、可扩展、支持千万级设备接入集群)支持 mqtt 3.1.1、mqtt 5 SMQTT基于reactor-netty(spring-webflux底层依赖) 开发,底层采用Reactor3反应堆模型,支持单机部署,...

    基于netty-socketio 进行全平台消息推送1

    Netty-SocketIO 是一个基于Netty框架的Socket.IO服务器实现,由mrniko维护。Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于构建高效的网络服务。通过集成Netty-SocketIO,开发者可以利用Netty的强大...

    基于Netty消息框架的Java游戏服务器.zip

    "基于Netty消息框架的Java游戏服务器"项目,旨在利用Netty的特性来设计并实现一个高效的游戏服务器。这个压缩包可能包含源代码、配置文件和其他相关文档,帮助我们理解如何在Java中利用Netty搭建游戏服务器。 首先...

    基于netty封装的快速分布式任务开发框架

    【基于Netty封装的快速分布式任务开发框架】是一种高效、轻量级的解决方案,它将Netty的强大功能与分布式任务处理的灵活性相结合。Netty是一个高性能的异步事件驱动的网络应用程序框架,常用于构建高度可扩展的网络...

    基于netty实现mqtt协议 服务器端开发,可解码http、mqtt协议请求

    1.基于netty绑定端口监听,对于mqtt消息和http请求消息分别绑定不同的监听端口; 2.在MQTTServerInitializer中,分别添加mqtt编码解码器和http编码解码器,并分别将自定义的mqtt消息处理handle类和http消息handle类...

    基于netty和protobuf的聊天系统,客户端+服务器

    这个项目“基于netty和protobuf的聊天系统,客户端+服务器”就是这样一个实例,它展示了如何利用Java语言结合Netty框架和Protocol Buffers(protobuf)来搭建一个高性能、低延迟的聊天应用。 Netty是一个开源的异步...

    netty搭建TCP、UDP服务

    Netty是Java领域的一款高效、高性能的异步事件驱动网络应用框架,专为构建可扩展且高度并发的网络应用程序而设计。...无论是在物联网、游戏服务器、大数据传输还是其他分布式系统中,Netty都能发挥出色的作用。

    基于netty搭建websocket实现消息主动推送【Springbooot项目,可直接使用】

    而SpringBoot是基于Spring框架的微服务开发工具,简化了Spring应用的初始搭建以及开发过程。本项目结合了这三者,旨在构建一个可直接使用的WebSocket消息推送系统。 首先,我们需要理解SpringBoot如何集成WebSocket...

    基于netty的java游戏服务器.zip

    本项目以"基于Netty的Java游戏服务器"为例,详细讲解如何利用Netty来搭建高效稳定的游戏服务端。 首先,Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。它的核心在于...

    基于Netty的Java WebSocket集群框架。.zip

    综上所述,"基于Netty的Java WebSocket集群框架"项目涵盖了网络编程、WebSocket协议、Java后端开发、分布式系统等多个方面的技术,是一个综合性的实战项目,对于提升开发者在实时通信领域的技能具有很高的价值。

    springboot+netty开发简易式游戏源码

    2. **游戏服务器**:Netty作为游戏服务器,监听客户端的TCP连接,处理游戏状态更新、消息广播等实时通信。 3. **WebSocket集成**:Spring Boot可以通过WebSocket Stomp协议与Netty进行交互,提供实时双向通信。 4. *...

    mqttserver:基于netty实现mqtt协议 服务器端开发

    mqttserver,基于netty 4.1.1,可解码http、mqtt协议请求。 项目包括: 1.基于netty绑定端口监听,对于mqtt消息和http请求消息分别绑定不同的监听端口; 2.在MQTTServerInitializer中,分别添加mqtt编码解码器和http...

    基于netty+mqtt3.1.1+springboot+jdk8 实现的 mqtt 服务端跟客户端.zip

    这是一个基于Java技术栈,具体是Netty、MQTT 3.1.1协议、Spring Boot框架和JDK 8实现的MQTT服务器和客户端的项目。这个项目对于学习和实践物联网(IoT)通信以及Java后端开发具有很高的价值,尤其适合作为毕业设计的...

    Java基于Netty的websocket微服务架构即时聊天即时通信APP源码,仿微信APP

    主要技术点:基于Netty NIO形式通过 WebSocket 协议进行通讯基于Redis GeoHash 数据结构实现检索用户「附近的人」基于Docker搭建FastDFS存储服务器并实现对应接口基于策略模式+工厂模式解除WebSocket的信息处理类的...

    基于Netty框架的Android内网推送demo

    本项目"基于Netty框架的Android内网推送demo"就是这样一个示例,它演示了如何利用Netty在Android设备上实现内网通信。 首先,我们需要了解Netty的核心概念。Netty的主要组件包括Bootstrap(引导类)、...

    使用Netty搭建WebSocket服务器,可修改单包大小限制

    Netty是一个高性能、异步事件驱动的网络应用程序框架,它非常适合用来构建WebSocket服务器。 Netty提供了WebSocketServerProtocolHandler来处理WebSocket协议,但在默认情况下,它会对接收到的数据包大小进行限制。...

    本科毕业设计,基于netty 的web服务器以及基本开发框架.zip

    【本科毕业设计——基于Netty的Web服务器及基础开发框架】 在本科毕业设计中,学生通常会选择一个实际问题或技术领域进行深入研究,并构建一个具有实际应用价值的项目。本项目选择的是基于Netty实现的Web服务器,...

    毕设项目:基于netty+websocket+springboot的实时聊天系统.zip

    这是一个基于Netty、WebSocket和Spring Boot技术实现的实时聊天系统的毕设项目,下面将详细介绍这个项目的相关知识点。 首先,我们来了解一下Spring Boot。Spring Boot是由Pivotal团队提供的全新框架,其设计目标是...

    rtmpServer-master_nettyrtmp_rtmp推流_netty开发rtmp_rtmpServer-master

    本项目“rtmpServer-master_nettyrtmp_rtmp推流_netty开发rtmp_rtmpServer-master”是针对RTMP协议开发的一个服务器端实现,它基于强大的Java网络库Netty。以下将详细介绍该服务器的开发背景、功能、核心技术和应用...

Global site tag (gtag.js) - Google Analytics