`
rxin2009
  • 浏览: 17360 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

netty基于事件的过程流转

 
阅读更多

本为分析netty源码中的过程流转实现

 

netty中的处理器的流转体现在DefaultChannelPipeline类中,在添加处理器时一般调用addLast(String, ChannelHandler),下面来看看这个方法

public synchronized void addLast(String name, ChannelHandler handler) {
        if (name2ctx.isEmpty()) {
            init(name, handler);
        } else {
            checkDuplicateName(name);
            DefaultChannelHandlerContext oldTail = tail;
            DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);

            callBeforeAdd(newTail);

            oldTail.next = newTail;
            tail = newTail;
            name2ctx.put(name, newTail);

            callAfterAdd(newTail);
        }
    }

 

在第一次调用的时候会执行init(name, handler),也就是添加第一个处理器,他会初始化DefaultChannelPipeline的全局变量head、tail,分别表示第一个和最后一个处理器,还有name2ctx,一个map,表示名称和上下文键值对,每一个处理器就有一个上下文类DefaultChannelHandlerContext。

以下是全局变量

static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
    static final ChannelSink discardingSink = new DiscardingChannelSink();

    private volatile Channel channel;
    private volatile ChannelSink sink;
    private volatile DefaultChannelHandlerContext head;
    private volatile DefaultChannelHandlerContext tail;
    private final Map<String, DefaultChannelHandlerContext> name2ctx =
        new HashMap<String, DefaultChannelHandlerContext>(4);

 当再次添加处理器时,多个处理器会构成一个双向链表,这个在构造方法中实现

DefaultChannelHandlerContext(
                DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
                String name, ChannelHandler handler) {

            if (name == null) {
                throw new NullPointerException("name");
            }
            if (handler == null) {
                throw new NullPointerException("handler");
            }
            canHandleUpstream = handler instanceof ChannelUpstreamHandler;
            canHandleDownstream = handler instanceof ChannelDownstreamHandler;


            if (!canHandleUpstream && !canHandleDownstream) {
                throw new IllegalArgumentException(
                        "handler must be either " +
                        ChannelUpstreamHandler.class.getName() + " or " +
                        ChannelDownstreamHandler.class.getName() + '.');
            }

            this.prev = prev;
            this.next = next;
            this.name = name;
            this.handler = handler;
        }

 

这个时候多个处理器的上下文就构成一个双向链表,同时上下文类还清晰的标明了处理器是属于上游处理器还是下游处理器。

 

一个上游事件流入管道,它是从head流入的,而下游处理器是从tail流入的,如代码

public void sendUpstream(ChannelEvent e) {
        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
        if (head == null) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "The pipeline contains no upstream handlers; discarding: " + e);
            }

            return;
        }

        sendUpstream(head, e);
    }

    void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        try {
            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
        } catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    }

    public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
        if (tail == null) {
            try {
                getSink().eventSunk(this, e);
                return;
            } catch (Throwable t) {
                notifyHandlerException(e, t);
                return;
            }
        }

        sendDownstream(tail, e);
    }

    void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        if (e instanceof UpstreamMessageEvent) {
            throw new IllegalArgumentException("cannot send an upstream event to downstream");
        }

        try {
            ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
        } catch (Throwable t) {
            // Unlike an upstream event, a downstream event usually has an
            // incomplete future which is supposed to be updated by ChannelSink.
            // However, if an exception is raised before the event reaches at
            // ChannelSink, the future is not going to be updated, so we update
            // here.
            e.getFuture().setFailure(t);
            notifyHandlerException(e, t);
        }
    }

 

 

一个上游事件在执行完第一个上游处理器的代码后,会调用上下文DefaultChannelHandlerContext的sendUpstream方法,如下代码

public void sendUpstream(ChannelEvent e) {
            DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
            if (next != null) {
                DefaultChannelPipeline.this.sendUpstream(next, e);
            }
        }

 这个方法的作用就是找到下一个上游处理器,对比下游事件的流转代码就是找到下一个下游处理器

public void sendDownstream(ChannelEvent e) {
            DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
            if (prev == null) {
                try {
                    getSink().eventSunk(DefaultChannelPipeline.this, e);
                } catch (Throwable t) {
                    notifyHandlerException(e, t);
                }
            } else {
                DefaultChannelPipeline.this.sendDownstream(prev, e);
            }
        }

 下游事件是从tail节点开始向head方向找处理器,如果没有处理器了,那么将执行事件下沉,就是实现底层的数据传输,这个一般不用用户插手。而上游事件达到处理器末端后是什么都不做,表示一个上游事件处理完毕。

 

事件的过程流转,总的来说就是,处理事件时DefaultChannelPipeline找到第一个匹配的处理器上下文,然后执行处理器代码,在通过上下文类DefaultChannelHandlerContext找到下一个匹配的处理器。这里有一个值得注意的地方就是事件是否继续流转到下一个处理器是由处理器自身决定的,如ctx.sendUpstream(e),这样就可以灵活控制事件的流转了。

分享到:
评论

相关推荐

    基于netty的长链硬件服务端部分源码

    发送报文给硬件的过程则涉及到了Netty的`ChannelHandlerContext`的`writeAndFlush`方法。这个方法会将消息写入到写出缓冲区,并触发flush操作,确保消息立即被发送出去。为了控制硬件,我们可能需要构建特定的报文...

    netty及时通讯通讯DEMO

    Netty基于Java NIO(Non-blocking I/O)库构建,这使得Netty能够高效地处理大量并发连接。NIO是非阻塞的,意味着当读写操作不可用时,线程不会被阻塞,而是可以继续处理其他任务。 3. **ByteBuf**: ByteBuf是...

    maven-netty-client

    Maven Netty Client 是一个基于Java的高性能网络应用框架,它主要用于构建可伸缩、高并发的网络客户端。Netty 提供了一套强大而灵活的API,使得开发者能够高效地处理TCP、UDP、HTTP、HTTPS等各种协议的网络通信。本...

    jt808-tcp-netty_nettyjt808_jt808_

    这个项目"jt808-tcp-netty"显然是一个基于Java的实现,利用Netty框架来处理JT808协议的TCP通信。 Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发可伸缩且高度并发的服务器和客户端。在JT808协议的实现...

    Netty使用心得,Netty实战

    1. **异步非阻塞 I/O**:Netty 采用基于通道(Channel)和事件处理器(EventLoop)的异步模型,使得它能够高效处理大量并发连接,减少资源消耗。在高并发场景下,非阻塞 I/O 能有效提高系统吞吐量。 2. **零拷贝**...

    Netty简介 Netty线程模型和EventLoop Codec编码与解码 ByteBuf容器

    Netty的线程模型是基于NIO(非阻塞I/O)的EventLoop设计。EventLoop是Netty的核心组件,它是一个单线程执行任务的循环,负责处理I/O事件并分发到对应的处理器。每个EventLoop都包含一个Selector,用于监听多个通道的...

    SpringBoot_Netty实例(参照文本网址)

    SpringBoot的使用可以帮助我们快速构建基于Netty的应用,并且通过Spring的依赖注入和自动配置能力,使得服务的管理和扩展变得更加简单。 首先,我们需要理解SpringBoot如何与Netty集成。在SpringBoot应用中引入...

    netty实现简易tomcat

    Netty 的架构基于 Channel、EventLoop、EventLoopGroup 和 ChannelHandler。Channel 是网络连接的抽象,用于读写数据;EventLoop 是处理 I/O 事件的线程;EventLoopGroup 是一组 EventLoop 的集合,通常用于分配任务...

    netty分隔符和定长解码器的应用

    在Netty中,解码器是处理接收数据的重要组件,用于将接收到的原始字节流转换为可操作的对象。本篇将深入探讨Netty中的两种解码器:分隔符解码器(DelimiterBasedFrameDecoder)和定长解码器...

    netty技术文档,socket技术第七章

    5. **ChannelHandlerContext**:在处理编解码的过程中,ChannelHandlerContext 是一个关键接口,它提供了对上下文的访问,包括发送和接收数据、触发事件以及获取和配置通道属性的能力。 6. **Pipeline**:Netty 的 ...

    使用netty使用http协议开发文件服务器

    在初始化过程中,我们通常会添加解码器(如HttpObjectDecoder)来将原始的字节流转换为HTTP对象,以及编码器(如HttpResponseEncoder)来将HTTP响应转换回字节流。之后,我们添加自定义的HttpRequestHandler到...

    itstack-demo-netty-master.zip

    《深入剖析Netty框架——基于itstack-demo-netty-master.zip》 Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。本项目"itstack-demo-netty-master.zip"是针对...

    netty权威指南书签高清第二版+随书完整源码

    1. **异步非阻塞I/O**:Netty基于Java NIO(非阻塞I/O)构建,能够处理大量并发连接,减少了CPU等待I/O操作的时间,提高了系统的整体性能。 2. **高性能和低内存开销**:Netty通过零拷贝、内存池等技术,实现了高效...

    掘金小册《Netty 入门与实战:仿写微信 IM 即时通讯系统》代码

    1. **NIO (Non-blocking I/O)**: Netty 基于 Java NIO 模型,利用选择器(Selector)和通道(Channel)实现非阻塞I/O,提高了并发性能。理解如何创建和管理多路复用器,以及如何处理ChannelEvent,对于构建高性能网络...

    《netty实战》http协议、自定义协议、自定义RPC模块学习源码.zip

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在深入探讨Netty之前,我们先来理解一下HTTP协议、自定义协议和RPC(远程过程调用)。 HTTP(HyperText ...

    JAVA 架构练习项目:

    1. **非阻塞I/O模型**:Netty基于NIO(非阻塞I/O)和EPOLL事件驱动模型,提供了一种高效的方式来处理大量并发连接。这种模型允许服务器在等待数据到来时不会被阻塞,从而提高了系统的整体性能。 2. **Channel和...

    rtsp2rtmp.zip

    【rtsp2rtmp.zip】是一个包含JavaWeb推流技术的项目,主要目的是将RTSP视频流转换并封装成RTMP格式,然后推送到由Netty实现的RTMP服务器。这个压缩包中包含了多个文件,如`.gitignore`用于定义Git忽略的文件,`...

    如何实现一个简单的RPC框架

    1. **网络通信库**:Netty是一个高性能、异步事件驱动的网络应用框架,常用于构建高并发、低延迟的服务器。在RPC框架中,Netty可以处理客户端与服务端之间的网络连接,接收和发送数据。 2. **序列化与反序列化**:...

    004 springCloudAlibaba Gateway

    10. **日志和监控**:集成Zipkin、Skywalking等分布式追踪系统,可以追踪请求在整个系统中的流转过程,有助于排查问题。同时,可以配合Prometheus和Grafana进行性能指标监控。 总的来说,SpringCloud Alibaba ...

    web端 多路 视频实时播放rtsp流

    SpringBoot是基于Spring框架的轻量级开发工具,简化了创建独立的、生产级别的基于Spring的应用程序过程。在这个项目中,SpringBoot作为后端服务的基础,提供了RESTful API接口,用于处理WebSocket连接和处理FFmpeg的...

Global site tag (gtag.js) - Google Analytics