`

netty学习之ChannelSink(NioClientSocketPipelineSink)

阅读更多

       这个东西应该是netty里面最难理解的,或者最关键的组件了,这个我会慢慢的进行分析。在Pipeline传送完后,都必须都通ChannelSink进行处理。Sink默认处理了琐碎的操作,例如连接、读写等等。

         ChannelSink这个组件是来处理downstream请求和产生upstream时间的一个组件,是所有io操作的执行者。也就是传输的逻辑层吧。当channel创建的时候就有一个ChannelSink和它想绑定。

        传输层的代码实现一般来说都是比较麻烦的,相比来说客户端的实现一般来说比服务端的实现要简单一些,服务端一般要处理状态变换和数据交换等,我们一点一点来看ChannelSink。

public interface ChannelSink {

    /**
     * Invoked by {@link ChannelPipeline} when a downstream {@link ChannelEvent}
     * has reached its terminal (the head of the pipeline).
     */
    void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception;

    /**
     * Invoked by {@link ChannelPipeline} when an exception was raised while
     * one of its {@link ChannelHandler}s process a {@link ChannelEvent}.
     */
    void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception;
	}

         接口里面只定义了两个操作,一个是eventSunk,一个是exceptionCaught,eventSunk我不知道怎么翻译,姑且理解为让event处理吧,一般来说应该有一个abstract骨架程序实现,我们就来看吧。

        AbstractChannelSink里面只实现了exceptionCaught,来看一眼:

        

public void exceptionCaught(ChannelPipeline pipeline,
            ChannelEvent event, ChannelPipelineException cause) throws Exception {
        Throwable actualCause = cause.getCause();
        if (actualCause == null) {
            actualCause = cause;
        }

        fireExceptionCaught(event.getChannel(), actualCause);
    }

        这个实际上是向上层报告一个DefaultExceptionEvent的upstream事件。

 

      下来我们就直奔主题来看NioClientSocketPipelineSink这个类吧。

       首先来看局部变量

private static final AtomicInteger nextId = new AtomicInteger();

    final int id = nextId.incrementAndGet();
    final Executor bossExecutor;

    private final Boss[] bosses;
    private final NioWorker[] workers;

    private final AtomicInteger bossIndex = new AtomicInteger();
    private final AtomicInteger workerIndex = new AtomicInteger();

      上面是sink的id,这个地方用到了AtomicInteger。还有Boss和NioWorker,boss的个数现在默认是1,NioWorker的个数是处理器的个数的2倍。后面两个是bossIndex和workerIndex。

     在构造函数里面初始化了Boss和NioWorker

NioClientSocketPipelineSink(
            Executor bossExecutor, Executor workerExecutor,
            int bossCount, int workerCount) {
        this.bossExecutor = bossExecutor;
        
        bosses = new Boss[bossCount];
        for (int i = 0; i < bosses.length; i ++) {
            bosses[i] = new Boss(i + 1);
        }
        
        workers = new NioWorker[workerCount];
        for (int i = 0; i < workers.length; i ++) {
            workers[i] = new NioWorker(id, i + 1, workerExecutor);
        }
    }

 我们接下来看看eventSunk的实现:

 

 public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        if (e instanceof ChannelStateEvent) {
            ChannelStateEvent event = (ChannelStateEvent) e;
            NioClientSocketChannel channel =
                (NioClientSocketChannel) event.getChannel();
            ChannelFuture future = event.getFuture();
            ChannelState state = event.getState();
            Object value = event.getValue();

            switch (state) {
            case OPEN:
                if (Boolean.FALSE.equals(value)) {
                    channel.worker.close(channel, future);
                }
                break;
            case BOUND:
                if (value != null) {
                    bind(channel, future, (SocketAddress) value);
                } else {
                    channel.worker.close(channel, future);
                }
                break;
            case CONNECTED:
                if (value != null) {
                    connect(channel, future, (SocketAddress) value);
                } else {
                    channel.worker.close(channel, future);
                }
                break;
            case INTEREST_OPS:
                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                break;
            }
        } else if (e instanceof MessageEvent) {
            MessageEvent event = (MessageEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            boolean offered = channel.writeBuffer.offer(event);
            assert offered;
            channel.worker.writeFromUserCode(channel);
        }
    }
         在这个里面如果是Channel的state变化的事件,则执行相应的操作,最终实现都将交给work去操作,关于worker的这些操作,我们以后会详细的介绍。

这个里面如果是bind操作的话会自己调用bind,如下:

 

private void bind(
            NioClientSocketChannel channel, ChannelFuture future,
            SocketAddress localAddress) {
        try {
            channel.socket.socket().bind(localAddress);
            channel.boundManually = true;
            channel.setBound();
            future.setSuccess();
            fireChannelBound(channel, channel.getLocalAddress());
        } catch (Throwable t) {
            future.setFailure(t);
            fireExceptionCaught(channel, t);
        }
    }

       这个里面,channel、future、localAddress都是Event里面携带的一些参数,其实这个里面实现了socket的bind操作,并触发一些事件。

最后我们看一下connect的实现:

    

private void connect(
            final NioClientSocketChannel channel, final ChannelFuture cf,
            SocketAddress remoteAddress) {
        try {
            if (channel.socket.connect(remoteAddress)) {
                channel.worker.register(channel, cf);
            } else {
                channel.getCloseFuture().addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture f)
                            throws Exception {
                        if (!cf.isDone()) {
                            cf.setFailure(new ClosedChannelException());
                        }
                    }
                });
                cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                channel.connectFuture = cf;
                nextBoss().register(channel);
            }

        } catch (Throwable t) {
            cf.setFailure(t);
            fireExceptionCaught(channel, t);
            channel.worker.close(channel, succeededFuture(channel));
        }
    }

    这个里面会执行真正的注册动作,如果失败的话会触发一个connectFuture,这条语句 nextBoss().register(channel)不知道是什么意思?

      关于客户端channelSink就先看到这个地方,接下来会讲解Netty的线程模型,讲完之后我们再回来看服务端的channelSink的实现。

 

分享到:
评论

相关推荐

    NettyChannel

    NettyChannel是一个基于Netty框架的测试程序,它包含了客户端(client)和服务器端(server)的实现,目的是为了深入理解Netty中的Channel概念及其工作流程。Netty是一个高性能、异步事件驱动的网络应用程序框架,...

    netty学习之ServerChannel

    Netty学习之ServerChannel Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在本篇中,我们将深入探讨ServerChannel这一核心概念,它是Netty中用于接收客户端...

    Netty进阶之路-跟着案例学Netty

    《Netty进阶之路-跟着案例学Netty》是由知名技术专家李林峰撰写的一本专为Java开发者深入理解Netty框架而准备的书籍。这本书旨在通过实例教学,帮助读者全面掌握Netty的核心特性和实战技巧,提升网络编程的能力。 ...

    Netty学习资料.zip

    课件则会系统地呈现 Netty 的各个知识点,可能包括Netty的基本架构(如BossGroup和WorkerGroup)、Channel、EventLoop、Pipeline等概念,以及如何配置和使用它们。课件中的实例和图示将帮助你形象地理解这些抽象概念...

    Netty全套学习资源(包括源码、笔记、学习文档等)

    其中,Channel、EventLoop、ByteBuf 等关键类是理解 Netty 的基础,而 BossGroup、WorkerGroup 和 ChannelHandlerContext 等概念则有助于构建完整的网络通信模型。 二、Netty 学习笔记 学习笔记通常包含了作者在...

    Netty基础,用于学习Netty,参考黑马程序员的netty教程

    Netty基础,用于学习Netty,参考黑马程序员的netty教程

    《Netty进阶之路 跟着案例学Netty》.rar

    Java进阶技术-netty进阶之路

    netty官网学习手册中文版

    这个“netty官网学习手册中文版”针对的是Netty的3.1版本,虽然现在的Netty已经发展到了5.x版本,但3.1版本的知识仍然具有历史参考价值,特别是对于那些初次接触或需要理解Netty基础概念的开发者来说。 1. **Netty...

    《Netty进阶之路 跟着案例学Netty》_李林锋_

    Netty进阶之路 跟着案例学Netty 整本书无密码,Netty进阶之路 跟着案例学Netty

    Netty核心之Channel源码学习

    在这个Netty核心之Channel源码的学习过程中,我们将主要关注服务端启动的核心路径,包括`newChannel()`、`init()`、`register()`和`doBind()`等关键步骤。 1. **创建Channel对象**: 在服务端启动时,首先通过`new...

    Netty进阶之路:跟着案例学Netty 完整版.pdf

    《Netty进阶之路:跟着案例学Netty》中的案例涵盖了Netty的启动和停止、内存、并发多线程、性能、可靠性、安全等方面,囊括了Netty绝大多数常用的功能及容易让人犯错的地方。在案例的分析过程中,还穿插讲解了Netty...

    netty-netty-4.1.19.Final.zip_netty_netty学习_rocketmq

    在Netty中,EventLoop(事件循环)负责处理I/O事件,而Channel(通道)则代表网络连接,它们是进行数据传输的基础。ChannelHandler(通道处理器)是业务逻辑的载体,通过ChannelPipeline(通道管道)组织成处理链,...

    java netty学习资料

    这个学习资料包“java netty学习资料”很可能是为了帮助开发者理解和掌握Netty的核心概念和实践应用。 在Netty中,定义消息协议通讯是构建网络应用的关键步骤。Netty提供了一种灵活的方式来定义自定义的编解码器,...

    netty学习之心跳.rar

    在“netty学习之心跳.rar”这个压缩包中,包含的是关于Netty如何实现心跳处理的示例代码,分别针对Netty 3和Netty 5两个版本。心跳机制在网络通信中扮演着至关重要的角色,它确保了连接的活跃性,并能及时检测到网络...

    高清_书签_Netty进阶之路 跟着案例学Netty.zip

    在本书中,作者将在过去几年实践中遇到的问题,以及Netty学习者咨询的相关问题,进行了归纳和总结,以问题案例做牵引,通过对案例进行剖析,讲解问题背后的原理,并结合Netty源码分析,让读者能够真正掌握Netty,在...

    Netty实践学习案例

    4. **强大的管道(Channel Pipeline)架构**:Netty 的 Channel Pipeline 模式允许开发者自定义处理链,每个处理节点(Handler)负责特定的任务,增强了代码的可扩展性和可维护性。 5. **易于使用和调试**:Netty ...

    Netty学习与实战Demo-netty-learn.zip

    这个“Netty学习与实战Demo-netty-learn.zip”压缩包显然是为帮助学习者深入理解和实践Netty而准备的。下面将详细探讨Netty的核心概念、关键特性以及如何通过提供的示例进行学习。 1. **Netty核心概念** - **NIO...

    netty学习资料001

    这个“netty学习资料001”压缩包可能是为了帮助初学者或有经验的开发者深入理解Netty的工作原理和用法。下面我们将详细探讨Netty的核心概念、特性以及它在实际应用中的价值。 一、Netty简介 Netty最初由JBOSS团队...

    netty学习教程

    这个“netty学习教程”压缩包包含了19个PDF文档,旨在全面介绍Netty的基础知识以及实际应用。以下是根据这些文档标题和描述可能涵盖的主要知识点: 1. **Netty基础** - Netty架构:理解Netty的核心组件,如Event...

Global site tag (gtag.js) - Google Analytics