`

深入NSQ 之旅

阅读更多

介绍 
 

NSQ 是一个实时的分布式消息平台。它的设计目标是为在多台计算机上运行的松散服务提供一个现代化的基础设施骨架。 
 

这篇文章介绍了 基于go语言的NSQ的内部架构,它能够为高吞吐量的网络服务器带来 性能的优化,稳定性和鲁棒性。 
 

可以说, 如果不是因为我们在 bitly 使用go语言,NSQ就不会存在。这里既会讲NSQ的功能也会涉及语言提供的特征。当然,语言会影响思维,这次也不例外。 
 

现在回想起来,选择使用go语言已经收到了十倍的回报。由语言带来的兴奋和社区的积极反馈为这个项目提供了极大的帮助。

概要 
 

NSQ是由3个进程组成的:

  • nsqd 是一个接收、排队、然后转发消息到客户端的进程。

  • nsqlookupd 管理拓扑信息并提供最终一致性的发现服务。

  • nsqadmin 用于实时查看集群的统计数据(并且执行各种各样的管理任务)。

NSQ中的数据流 模型是由 streams 和 consumers 组成的tree。topic是一种独特的 stream。channel是一个订阅了给定topic的consumers 逻辑分组。

单个nsqd可以有多个topic,每个topic可以有多个channel。channel接收这个topic所有消息的副本,从而实现多播分发,而channel上的 每个消息 被分发给它的订阅者,从而实现负载均衡。

这些基本成员组成了 一个 可以表示各种 简单和复杂拓扑结构 的 强大框架 。 
有关NSQ的设计的更多信息请参见 设计文档 。

Topics 和 Channels

Topics 和 channels,是NSQ的核心成员,它们是如何使用go语言的特点来设计系统的最好示例。

Go的channels(为防止歧义,以下简称为“go-chan”)是表达队列的一种自然方式,因此一个NSQ的topic/channel,其核心就是一个存放消息指针的go-chan缓冲区。缓冲区的大小由 --mem-queue-size 配置参数确定。

读取数据后,向topic发布消息的行为包括:

  • 实例化消息结构 (并分配消息体的字节数组)

  • read-lock 并获得 Topic

  • read-lock 并检查是否可以发布

  • 发送到go-chan缓冲区

为了从一个topic和它的channels获得消息,topic不能按典型的方式用go-chan来接收,因为多个goroutines在一个go-chan上接收将会 分发 消息,而期望的结果是把每个消息 复制 到所有channel(goroutine)中。

此外,每个topic维护3个主要goroutine。第一个叫做 router,负责从传入的go-chan中读取新发布的消息,并存储到一个队列里(内存或硬盘)。

第二个,称为 messagePump, 它负责复制和推送消息到如上所述的channel中。

第三个负责 DiskQueue IO,将在后面讨论。

Channels稍微有点复杂,它的根本目的是向外暴露一个单输入单输出的go-chan(事实上从抽象的角度来说,消息可能存在内存里或硬盘上);

另外,每一个channel维护2个时间优先级队列,用于延时和消息超时的处理(并有2个伴随goroutine来监视它们)。

并行化的改善是通过管理每个channel的数据结构来实现,而不是依靠go运行时的全局定时器。

注意:在内部,go运行时使用一个优先级队列和goroutine来管理定时器。它为整个time包(但不局限于)提供了支持。它通常不需要用户来管理时间优先级队列,但一定要记住,它是一个有锁的数据结构,有可能会影响 GOMAXPROCS>1 的性能。请参阅 runtime/time.goc 。

Backend / DiskQueue

NSQ的一个设计目标是绑定内存中的消息数目。它是通过DiskQueue(它拥有前面提到的的topic或channel的第三个goroutine)透明的把消息写入到磁盘上来实现的。

由于内存队列只是一个go-chan,没必要先把消息放到内存里,如果可能的话,退回到磁盘上: 
 

for msg := range c.incomingMsgChan {
    select {
    case c.memoryMsgChan <- msg:
    default:
        err := WriteMessageToBackend(&msgBuf, msg, c.backend)
        if err != nil {
            // ... handle errors ...
        }
    }
}

利用go语言的select语句,只需要几行代码就可以实现这个功能:上面的default分支只有在memoryMsgChan 满的情况下才会执行。

NSQ也有临时channel的概念。临时channel会丢弃溢出的消息(而不是写入到磁盘),当没有客户订阅后它就会消失。这是一个Go接口的完美用例。Topics和channels有一个的结构成员被声明为Backend接口,而不是一个具体的类型。一般的 topics 和channels使用DiskQueue,而临时channel则使用了实现Backend接口的DummyBackendQueue。

减少垃圾回收的压力

在任何带有垃圾回收的环境里,你都会多多少少感受到吞吐量(工作有效性)、延迟(响应能力)、驻留集大小(内存使用量)的压力。

就 Go 1.2 而言,垃圾回收有标记-清除(并发的)、不再生、不紧凑、阻止一切运行、大体精准的特点。大体精准是因为剩下的工作没有及时的完成(这是 Go 1.3 的计划)。

Go 的垃圾回收机制当然会持续改进,但普遍的真理是:创建的垃圾越少,回收垃圾的时间越少。

首先,理解垃圾回收是如何在实际的工作负载中运行的是非常重要的。为此, nsqd 以 statsd 的格式 (与其它内部指标一起) 发布垃圾回收的统计信息。 nsqadmin 显示这些指标的图表,可以让你深入了解它在频率和持续时间两方面产生的影响:


 

为了减少垃圾,你需要知道它们是在哪生成的。再次回到Go的工具链,它提供的答案如下:

  • 使用 testing 包和go test -benchmen来基准测试热点代码路径。它配置了每个迭代分配的数字(基准的运行可与 benchcmp 进行比较)。

  • 使用 go build -gcflags -m 创建,将会输出 逃逸分析 的结果。

除此之外,它还提供了 nsqd 的如下优化:

  • 避免把[]byte 转化为字符串类型.

  • 重复使用缓存或者对象(有时也许是 sync.Pool 又称为 issue4720 ).

  • 预分配切片(特别是make的能力)并总是知晓链中各个条目的数量和大小。

  • 提供各种配置面板(如消息大小)的限制。

  • 避免封装(如使用interface{})或者不必要的包装类(例如 用一struct给一个多值的go-chan).

  • 在热代码路径(它指定的)中避免使用defer。

TCP 协议

NSQ的TCP协议 是一个闪亮的会话典范,在这个会话中垃圾回收优化的理论发挥了极大的效用。

协议的结构是一个有很长的前缀框架,这使得协议更直接,易于编码和解码。

[x][x][x][x][x][x][x][x][x][x][x][x]...
|  (int32) ||  (int32) || (binary)
|  4-byte  ||  4-byte  || N-byte
------------------------------------...
    size      frame ID     data

因为框架的组成部分的确切类型和大小是提前知道的,所以我们可以规避了使用方便的编码二进制包的Read()和Write()封装(及它们外部接口的查找和会话)反之我们使用直接调用 binary.BigEndian 方法。

为了消除socket 输入输出的系统调用,客户端net.Conn被封装了 bufio.Reader 和 bufio.Writer 。这个Reader通过暴露 ReadSlice() ,复用了它自己的缓冲区。这样几乎消除了读完socket时的分配,这极大的降低了垃圾回收的压力。这可能是因为与数据相关的大多数命令并没有逃逸(在边缘情况下这是假的,数据被强制复制)。

在更低层,MessageID 被定义为 [16]byte,这样可以将其作为 map 的 key(slice 无法用作 map 的 key)。然而,考虑到从 socket 读取的数据被保存为 []byte,胜于通过分配字符串类型的 key 来产生垃圾,并且为了避免从 slice 到 MessageID 的支撑数组产生复制操作,unsafe 包被用来将 slice 直接转换为 MessageID:

id := *(*nsq.MessageID)(unsafe.Pointer(&msgID))

注意: 这是个技巧。如果编译器对此已经做了优化,或者 Issue 3512 被打开可能会解决这个问题,那就不需要它了。 issue 5376 也值得通读,它讲述了在无须分配和拷贝时,和 string 类型可被接收的地方,可以交换使用的“类常量”的 byte 类型。

类似的,Go 标准库仅仅在 string 上提供了数值转换方法。为了避免 string 的分配,nsqd 使用了 惯用的十进制转换方法 ,用于对 []byte 直接操作。

这些看起来像是微优化,但 TCP 协议包含了一些最热的代码执行路径。总体来说,以每秒数万消息的速度来说,它们对分配和系统开销的数量有着显著的影响:

benchmark                    old ns/op    new ns/op    delta
BenchmarkProtocolV2Data           3575         1963  -45.09%

benchmark                    old ns/op    new ns/op    delta
BenchmarkProtocolV2Sub256        57964        14568  -74.87%
BenchmarkProtocolV2Sub512        58212        16193  -72.18%
BenchmarkProtocolV2Sub1k         58549        19490  -66.71%
BenchmarkProtocolV2Sub2k         63430        27840  -56.11%

benchmark                   old allocs   new allocs    delta
BenchmarkProtocolV2Sub256           56           39  -30.36%
BenchmarkProtocolV2Sub512           56           39  -30.36%
BenchmarkProtocolV2Sub1k            56           39  -30.36%
BenchmarkProtocolV2Sub2k            58           42  -27.59%

HTTP

NSQ的HTTP API是基于 Go's net/http 包实现的. 就是 常见的HTTP应用,在大多数高级编程语言中都能直接使用而无需额外的三方包。

简洁就是它最有力的武器,Go的 HTTP tool-chest最强大的就是其调试功能. net/http/pprof 包直接集成了 HTTP server,可以方便的访问 CPU, heap, goroutine, and OS 进程文档 .gotool就能直接实现上述操作:

$ go tool pprof http://127.0.0.1:4151/debug/pprof/profile

这对于调试和 实时 监控进程非常有用!

此外,/stats端端返回JSON或是美观的文本格式信息,这让管理员使用命令行实时监控非常容易 :

$ watch -n 0.5 'curl -s http://127.0.0.1:4151/stats | grep -v connected'

打印出的结果如下:

此外, Go 1.2 还有很多监控指标 measurable HTTP performance gains . 每次更新Go版本后都能看到性能方面的改进,真是让人振奋!

依赖关系

源于其它生态系统,使用GO(理论匮乏)语言的依赖管理还得花点时间去适应

NSQ 就并不是单一的整个 repo库, 通过 _relative imports_ 而无需区别内部的包资源, 最终产生结构化的依赖管理。

主流的观点有以下两个:

  • Vendoring :拷贝应用需要的正确版本号到本地仓库并修改 import 路径到本地库地址

  • Virtual Env : 列出构建是需要的版本信息,创建包含相关信息的GOPATH环境变量

Note: 这仅仅应用于二级制包,对于可导入的包版本不起作用

NSQ使用 godep 提供 (2) 中的实现.

它的实现原理是复制依赖关系到 Godeps 文件中, 之后生成GOPATH环境变量。构建时,它使用Go环境中的工具链 来完成工作。

它还支持go的get. 例如,构建一个 NSQ版本:

$ godep get github.com/bitly/nsq/...

测试

Go语言提供了内置的测试和基线。由于其简单的并发操作建模,在测试环境里加入nsqd 实例轻而易举。

但是,在测试初始化的时候会有个问题:全局状态。最明显的就是引用运行态 nsqd 实例的全局变量 i.e.var nsqd *NSQd.

于是某些测试就无可避免的使用局部变量去保存该值i.e.nsqd := NewNSQd(...).这也就意味着全局状态并未指向运行态的值,使测试失去了意义。

应对这个问题,Context结构体被引入以保存配置项metadata和实时 nsqd 的父类。所有全局状态的子引用都通过访问该Context来安全的获取相应值(主题,渠道,协议处理等等),这样测试起来也更有保障。

可靠性

一个系统,如果在面对变幻的网络环境和不可预知的事件时不具备可靠性,将不会是一个表现良好的分布式生产环境。

NSQ的设计和实现方式,使它能容忍错误并以一种始终如一的,可预期的和稳定的方式来运行。

它的首要的设计哲学是快速失败,认为错误都是致命的,并提供一种方式来调试遇到的任何问题。

不过,为了能有所行动,你必须要能够检测异常环境...

心跳检测和 超时

NSQ的TCP协议是需要推送的.在经过建立连接,三次握手,客户在 aRDYstate的 订阅数被置为0.当准备接受消息时,通过更新 RDYstate来控制将要接受的消息数目。 NSQ 客户端libraries将在后台持续管理这一环节,最终形成相应的消息流。

周期性的, nsqd 会发送心跳检测连接状态.客户端可以设置这个间隔时间但 nsqd需要在发送下调指令前收到上条请求的回复。

应用层面的心跳检测和RDYstate组合能够避免 head-of-line blocking ,它会是心跳检测失效 (i.e.如果用户等待处理消息前OS的缓存已满,则心跳检测失效).

为了确保进程的正常工作,所有的网络IO都会依据心跳检测的间隔时间来设置边界.这意味着你甚至可以断开客户端和 nsqd 的网络连接,而不必担心问题被发现并恰当的处理。

一旦发现致命错误,客户连接将被强关。发送中的消息超时并从新加入新的客户端接受队列。最后,错误日志会被保存并增加内部评价矩阵内容。

管理Goroutines

启用goroutines很简单,但后续工作却不是那么容易弄好的。避免出现死锁是一个挑战。通常都是因为在排序上出了问题,goroutine可能在接到上游的消息前就收到了go-chan的退出信号。

为啥提到这个?简单,一个未正确处理的goroutine就是内存泄露。更深入的分析,nsqd 进程含有多个激活的goroutines。从内部情况来看,消息的所有权是不停在变得。为了能正确的关掉goroutines,实时统计所有的进程信息是非常重要的。虽没有什么神奇的方法,但下面的几点能让工作简单一点 ...

WaitGroups

sync 包提供了 sync.WaitGroup , 它可以计算出激活态的goroutines数(比提供退出的平均等待时间)

为了使代码简洁 nsqd 使用如下wrapper:

type WaitGroupWrapper struct {
    sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}

// can be used as follows:
wg := WaitGroupWrapper{}
wg.Wrap(func() { n.idPump() })
// ...
wg.Wait()

退出信号

在含有多个子goroutines中触发事件最简单的办法就是用一个go-chan,并在完成后关闭。所有当中暂停的动作将被激活,这就无需再向每个goroutine发送相关的信号了

type WaitGroupWrapper struct {
    sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}

// can be used as follows:
wg := WaitGroupWrapper{}
wg.Wrap(func() { n.idPump() })
// ...
wg.Wait()

同步退出

想可靠的,无死锁,所有路径都保有信息的实现是很难的。下面是一些提示:

  • 理想情况下,在go-chan发送消息的goroutine也应为关闭消息负责 .

  • 如果消息需要保留,确保相关go-chans被清空(尤其是无缓冲的!),以保证发送者可以继续进程 .

  • 另外,如果消息不再是相关的,在单个go-chan上的进程应该转换到包含推出信号的select上 (如上所述)以保证发送者可以继续进程 .

一般的顺序应该是:

  • 停止接受新的连接(停止监听)

  • 向goroutines发出退出信号(见上文)

  • 等待WaitGroup的goroutine中退出(见上文)

  • 恢复缓冲数据

  • 剩下的部分保存到磁盘

 

http://www.tuicool.com/articles/IR7Nzy

分享到:
评论

相关推荐

    nsq auth 权限认证服务

    nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务

    php-nsq 是nsq的php客户端php-nsq-master.zip

    NSQ是一款由Disque启发的、实时分布式消息平台,设计目标是处理大量实时数据,具有高并发、低延迟的特点。PHP-NSQ为PHP开发者提供了与NSQ进行通信的接口,使PHP应用程序能够发布消息到NSQ话题或者从NSQ订阅并消费...

    Go-go-nsq-NSQ的官方Go包

    本文将深入探讨Go-nsq库的核心概念、功能特性、使用方法以及在实际项目中的应用。 1. **核心概念** - **NSQ**: NSQ是一种轻量级的消息中间件,设计用于处理高并发、低延迟的数据传输。它支持发布/订阅模式和主题/...

    nsq-1.2.0.windows-amd64.go1.12.9.tar.gz

    nsq的特点之一是其强大的消息持久化能力。通过磁盘队列,nsq能够确保即使在服务器故障的情况下,也能保证消息不丢失,从而达到高可用性的要求。同时,它还支持多种消息确认机制,如单次消费确认和批量确认,确保消息...

    nsq1.10.3安装包(包括windows和Linux)

    这个PPT可能涵盖了NSQ的架构设计、核心模块、数据结构、并发控制等方面,帮助你更深入地学习NSQ的内部机制。 5. **使用场景** - 实时日志处理:将应用程序日志实时发送到NSQ,然后由其他服务进行分析和存储。 - ...

    nsq客户端的php扩展

    “PHP开发”标签表明这是关于PHP编程的讨论,而“其它杂项”可能意味着这个扩展并非PHP的核心部分,而是众多可用的第三方库之一。它可能是开源的,由社区维护,或者由特定团队开发,并且可能涵盖了一些非标准或不...

    nsq-1.2.0.linux-amd64.go1.12.9.tar.gz

    《深入理解nsq:从nsq-1.2.0.linux-amd64.go1.12.9.tar.gz谈起》 nsq是一款由淘宝开源的、高性能、分布式的消息队列系统,专为实时数据处理和日志聚合而设计。在我们的讨论中,我们将深入探讨nsq的核心特性和使用...

    前端开源库-nsq-stream

    **nsq-stream** 是一个专为前端开发设计的开源库,它提供了与 **NSQ** 消息队列交互的流接口。NSQ 是一款由 Golang 编写的高性能、分布式的消息队列系统,常被用于实时处理大量数据。nsq-stream 的目标是简化前端...

    docker-compose 容器部署 nsq

    本篇文章将深入探讨如何使用Docker Compose来部署NSQ,一个分布式的实时消息队列系统。 首先,我们来看NSQ。NSQ是用Go语言编写的,设计目标是处理海量实时消息。它提供低延迟、高并发的特性,适合构建实时数据管道...

    改良版的nsq_stat 版本NSQ v1.2.0

    现在的nsq_stat 会经常出现负值,经过代码的修改计算方式基本不会出现负数的情况,这也可以用做计算NSQ 的QPS

    nsq_win+linux_amd64_1.0.0.zip

    【nsq】是一款高度可扩展、高性能的消息队列系统,主要设计用于实时处理大量数据流。这个压缩包“nsq_win+linux_amd64_1.0.0.zip”包含了nsq的Windows和Linux平台的AMD64架构版本,支持Go 1.8编译的二进制文件。 ...

    nsq.1.2.1.i386.tar.gz

    《NSQ:基于Golang构建的消息队列在Linux i386环境下的实践》 NSQ,一个由Golang编写的高效、实时、分布式的消息队列系统,它旨在简化实时数据流处理并构建可靠的大规模分布式系统。由于官方并未提供针对32位Linux...

    nsq-0.3.8.windows-amd64.go1.6.2.tar.gz

    【nsq-0.3.8.windows-amd64.go1.6.2.tar.gz】是一个针对Windows操作系统,AMD64架构的软件包,由nsq项目发布,版本号为0.3.8。该软件包是用Go编程语言1.6.2版本编译构建的。Nsq是一款分布式消息队列系统,它被设计...

    nsq-0.3.7.linux-amd64.go1.6

    NSQ是一款高效、轻量级的实时消息平台,专为大规模分布式系统设计。这个压缩包“nsq-0.3.7.linux-amd64.go1.6”包含了NSQ在0.3.7版本的Linux x86_64架构下的二进制文件,表明它是针对Go 1.6版本编译的。这个版本可能...

    Go-NSQ一个实时分布式消息传递平台

    通过深入研究这些源码,我们可以更好地理解和优化NSQ在特定场景下的性能和功能。 总的来说,Go-NSQ凭借其高效的性能、灵活的设计以及强大的可靠性,成为了构建实时分布式消息传递平台的首选工具。无论是在微服务...

    nsqtoswoole使用swoole扩展实现的一个NSQ强大php客户端

    NSQ是一个分布式消息队列系统,常用于实时数据处理和解耦应用程序。而Swoole则是PHP的异步、并行、高性能的网络通信引擎,能够显著提升PHP在处理I/O密集型任务时的性能。 【描述】:“nsq_to_swoole使用swoole扩展...

    nsq-0.3.2.linux

    《深入理解nsq在Linux环境下的应用》 nsq是一款由Golang编写的实时分布式消息队列系统,其设计目标是具有低延迟、高吞吐量的特点,适用于大规模数据处理和实时业务场景。在本篇中,我们将深入探讨nsq在Linux环境下...

    前端开源库-nsq-lookup-jc

    如果你打算对库进行定制或深入研究,可以下载这个文件,解压后查看源代码。源代码通常包含详细的文档和示例,可以帮助你更好地理解库的工作原理和如何使用它。 总的来说,`nsq-lookup-jc`是一个针对前端开发者的...

Global site tag (gtag.js) - Google Analytics