`
vern
  • 浏览: 17723 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

NSQ源码剖析之NSQD

 
阅读更多

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

NSQ 由 3 个守护进程组成:
nsqd 是接收、保存和传送消息到客户端的守护进程。
nsqlookupd 是管理的拓扑信息,维护着所有nsqd的状态,并提供了最终一致发现服务的守护进程
nsqadmin 是一个 Web UI 来实时监控集群(和执行各种管理任务)

这篇文章介绍的是nsq重要组件nsqd的实现。

Topic与Channel

Topic与Channel是NSQ中重要的两个概念。
生产者将消息写到Topic中,一个Topic下可以有多个Channel,每个Channel都是Topic的完整副本。
消费者从Channel处订阅消息,如果有多个消费者订阅同一个Channel,Channel中的消息将被传递到一个随机的消费者。

图片标题

  1. type NSQD struct{
  2. //一个nsqd实例可以有多个Topic
  3. topicMap map[string]*Topic
  4. }
  5. type Topicstruct{
  6. name string
  7. //一个Topic实例下有多个Channel
  8. channelMap map[string]*Channel
  9. memoryMsgChan chan *Message
  10. }
  11. //golang中goroutine之间的是通过chan来通信的,如果想要往该topic发布消息,只需要将消息写到Topic.memoryMsgChan中
  12. //创建Topic时会开启一个新的goroutine(messagePump)负责监听Topic.memoryMsgChan,当有新消息时会将将消息复制N份发送到该Topic下的所有Channel中
  13. func NewTopic(topicName string)*Topic{
  14. t :=&Topic{
  15. name: topicName,
  16. channelMap: make(map[string]*Channel),//该Topic下的所有Channel
  17. memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize),
  18. exitChan: make(chan int),
  19. }
  20. //开启一个goroutine负责监听写到该Topic的消息
  21. t.waitGroup.Wrap(func(){ t.messagePump()})
  22. return t
  23. }
  24. func (t *Topic) messagePump(){
  25. var msg *Message
  26. var chans []*Channel
  27. var memoryMsgChan chan *Message
  28. //取出该Topic下所有的Channel
  29. for _, c := range t.channelMap {
  30. chans = append(chans, c)
  31. }
  32. for{
  33. //从memoryMsgChan中取出一个消息,并将消息复制N份,发送到N个Channel中
  34. select{
  35. case msg =<-memoryMsgChan:
  36. case<-t.exitChan:
  37. return
  38. }
  39. for i, channel := range chans {
  40. chanMsg :=NewMessage(msg.ID, msg.Body)
  41. chanMsg.Timestamp= msg.Timestamp
  42. err := channel.PutMessage(chanMsg)
  43. }
  44. }
  45. }
  46. //Channel.memoryMsgChan负责接收写到该Channel的所有消息
  47. //创建创建Channel时会开启一个新的goroutine(messagePump)负责监听Channel.memoryMsgChan,当有消息时会将该消息写到Channel.clientMsgChan中,订阅该channel的consumer都会试图从clientMsgChan中取消息,一条消息只能被一个consumer抢到
  48. //Channel还负责消息的可靠传递,当消息发往consumer时,Channel会记录下该消息的发送时间,如果在一定时间内(msg-timeout参数)没有接受到consumer对该消息的确认,Channel会将该消息重新写到Channel.memoryMsgChan中,再次发送给客户端。
  49. type Channelstruct{
  50. name string//channel的名称
  51. memoryMsgChan chan *Message
  52. clientMsgChan chan *Message
  53. clients map[int64]Consumer
  54. }
  55. func NewChannel(topicName string, channelName string
  56. c :=&Channel{
  57. topicName: topicName,
  58. name: channelName,
  59. memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize),
  60. clientMsgChan: make(chan *Message),
  61. exitChan: make(chan int),
  62. }
  63. go c.messagePump()
  64. return c
  65. }
  66. //往channel中写入消息。
  67. func (c *Channel) put(m *Message) error {
  68. select{
  69. case c.memoryMsgChan <- m:
  70. }
  71. returnnil
  72. }
  73. func (c *Channel) messagePump(){
  74. var msg *Message
  75. for{
  76. select{
  77. case msg =<-c.memoryMsgChan:
  78. case<-c.exitChan:
  79. gotoexit
  80. }
  81. c.clientMsgChan <- msg
  82. }
  83. exit:
  84. close(c.clientMsgChan)
  85. }

要理解Topic Channel中各种chan的作用,关键是要理解golang中如何在并发环境下如何操作一个结构体(多个goroutine同时操作topic),与 C/C++多线程操作同一个结构体时加锁(mutex,rwmutex)不同,go语言中一般是为这个结构体(topic,channel)开启一个主 goroutine(messagePump函数),所有对该结构体的改变的操作都应是该主goroutine完成的,也就不存在并发的问题了,其它 goroutine如果想要改变这个结构体则应该向结构体提供的chan中发送消息(msgchan)或者通知 (exitchan,updatechan),主goroutine会一直监听所有的chan,当有消息或者通知到来时做相应的处理。

数据的持久化

了解数据的持久化之前,我们先来看两个问题?
1. 往Topic中写入消息就是将消息发送到Topic.memoryMsgChan中,但是memoryMsgChan是一个固定内存大小的内存队列,如果队列满了怎么办呢?会阻塞吗?
2. 如果消息都存放在memoryMsgChan这个内存队列中,程序退出了消息就全部丢失了吗?

NSQ是如何解决的呢,nsq在创建Topic、Channel的时候都会创建一个DiskQueue,DiskQueue负责向磁盘文件中写入消息、从磁盘文件中读取消息,是NSQ实现数据持久化的最重要结构。
以Topic为例,如果向Topic.memoryMsgChan写入消息但是memoryMsgChan已满时,nsq会将消息写到topic.DiskQueue中,DiskQueue会负责将消息内存同步到磁盘上。
如果从Topic.memoryMsgChan中读取消息时,但是memoryMsgChan并没有消息时,就从topic.DiskQueue中取出同步到磁盘文件中的消息。

  1. func NewTopic(topicName stringctx *context)*Topic{
  2. ...//其它初始化代码
  3. // ctx.nsqd.opts都是一些程序启动时的命令行参数
  4. t.backend = newDiskQueue(topicName,
  5. ctx.nsqd.opts.DataPath,
  6. ctx.nsqd.opts.MaxBytesPerFile,
  7. ctx.nsqd.opts.SyncEvery,
  8. ctx.nsqd.opts.SyncTimeout,
  9. ctx.nsqd.opts.Logger)
  10. return t
  11. }
  12. //将消息写到topic的channel中,如果topic的memoryMsgChan已满则将topic写到磁盘文件中
  13. func (t *Topic) put(m *Message) error {
  14. select{
  15. case t.memoryMsgChan <- m:
  16. default:
  17. //从buffer池中取出一个buffer接口,将消息写到buffer中,再将buffer写到topic.backend的wirteChan中
  18. //buffer池是为了避免重复的创建销毁buffer对象
  19. b := bufferPoolGet()
  20. t.backend.WriteChan<- b
  21. bufferPoolPut(b)
  22. }
  23. returnnil
  24. }
  25. func (t *Topic) messagePump(){
  26. ...//参见上文代码
  27. for{
  28. //从memoryMsgChan及DiskQueue.ReadChan中取消息
  29. select{
  30. case msg =<-memoryMsgChan:
  31. case buf =<- t.backend.ReadChan():
  32. msg, _ = decodeMessage(buf)
  33. case<-t.exitChan:
  34. return
  35. }
  36. ...//将msg复制N份,发送到topic下的N个Channel中
  37. }
  38. }

我们看到topic.backend(diskQueue)负责将消息写到磁盘并从磁盘中读取消息,diskQueue提供了两个chan供外部使用:readChan与writeChan。
我们来看下diskQueue实现中的几个要点。

  1. diskQueue在创建时会开启一个goroutine,从磁盘文件中读取消息写到readChan中,外部goroutine可以从readChan中获取消息;随时监听writeChan,当有消息时从wirtechan中取出消息,写到本地磁盘文件。
  2. diskQueue既要提供文件的读服务又要提供文件的写服务,所以要记录下文件的读位置(readIndex),写位置 (writeIndex)。每次从文件中读取消息时使用file.Seek(readindex)定位到文件读位置然后读取消息信息,每次往文件中写入消 息时都要file.Seek(writeIndex)定位到写位置再将消息写入。
  3. readIndex,writeIndex很重要,程序退出时要将这些信息(meta data)写到另外的磁盘文件(元信息文件)中,程序启动时首先读取元信息文件,在根据元信息文件中的readIndex writeIndex操作存储信息的文件。
  4. 由于操作系统层也有缓存,调用file.Write()写入的信息,也可能只是存在缓存中并没有同步到磁盘,需要显示调用file.sync() 才可以强制要求操作系统把缓存同步到磁盘。可以通过指定创建diskQueue时传入的syncEvery,syncTimeout来控制调用 file.sync()的频率。syncTimeout是指每隔syncTimeout秒调用一次file.sync(),syncEvery是指每当写 入syncEvery个消息后调用一次file.sync()。这两个参数都可以在启动nsqd程序时通过命令行指定。

网络架构

nsq是一个可靠的、高性能的服务端网络程序,通过阅读nsqd的源码来学习如何搭建一个可靠的网络服务端程序。

  1. //首先是监听端口,当有请求到来时开启一个goroutine去处理该链接请求
  2. func TCPServer(listener net.Listener){
  3. for{
  4. clientConn, err := listener.Accept()
  5. go Handle(clientConn)
  6. }
  7. }
  8. func Handle(clientConn net.Conn){
  9. //客户端首先需要发送一个四字节的协议编号,表示客户端当前所使用的协议
  10. //这样便于以后平滑的协议升级,服务端可以根据客户端的协议编号做不同的处理
  11. buf := make([]byte,4)
  12. _, err := io.ReadFull(clientConn, buf)
  13. protocolMagic :=string(buf)
  14. var prot util.Protocol
  15. switch protocolMagic {
  16. case" V2":
  17. prot =&protocolV2{ctx: p.ctx}
  18. default:
  19. return
  20. }
  21. //成功建立连接,按照相应的协议编号去处理该链接
  22. err = prot.IOLoop(clientConn)
  23. return
  24. }
  25. }

客户端已成功的与服务器建立链接了,每一个客户端建立连接后,nsqd都会创建一个Client接口体,该结构体内保存一些client的状态信息。
每一个Client都会有两个goroutine,一个goroutine负责读取客户端主动发送的各种命令,解析命令,处理命令并将处理结果回复给客户端。
另一个goutine负责定时发送心跳信息给客户端,如果客户端订阅某个channel的话则将channel中的将消息通过网络发送给客户端。

如果服务端不需要主动推送大量消息给客户端,一个连接只需要开一个goroutine处理请求并发送回复就可以了,这是最简单的方式。开启两个goroutine操作同一个conn的话就需要注意加锁了。

  1. func (p *protocolV2)IOLoop(conn net.Conn) error {
  2. //创建一个新的Client对象
  3. clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence,1)
  4. client := newClientV2(clientID, conn, p.ctx)
  5. //开启另一个goroutine,定时发送心跳信息,客户端收到心跳信息后要回复。
  6. //如果nsqd长时间未收到该连接的心跳回复说明连接已出问题,会断开连接,这就是nsq的心跳实现机制
  7. go p.messagePump(client)
  8. for{
  9. //如果超过client.HeartbeatInterval * 2时间间隔内未收到客户端发送的命令,说明连接处问题了,需要关闭此链接。
  10. //正常情况下每隔HeartbeatInterval时间客户端都会发送一个心跳回复。
  11. client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval*2))
  12. //nsq规定所有的命令以 “\n”结尾,命令与参数之间以空格分隔
  13. line, err = client.Reader.ReadSlice('\n')
  14. //params[0]为命令的类型,params[1:]为命令参数
  15. params:= bytes.Split(line, separatorBytes)
  16. //处理客户端发送过来的命令
  17. response, err := p.Exec(client,params)
  18. if err !=nil{
  19. sendErr := p.Send(client, frameTypeError,[]byte(err.Error()))
  20. if _, ok := err.(*util.FatalClientErr); ok {
  21. break
  22. }
  23. continue
  24. }
  25. //将命令的处理结果发送给客户端
  26. if response !=nil{
  27. err = p.Send(client, frameTypeResponse, response)
  28. }
  29. }
  30. //连接出问题了,需要关闭连接
  31. conn.Close()
  32. close(client.ExitChan)//关闭client的ExitChan
  33. //client.Channel记录的是该客户端订阅的Channel,客户端关闭的时候需要从Channel中移除这个订阅者。
  34. if client.Channel!=nil{
  35. client.Channel.RemoveClient(client.ID)
  36. }
  37. return err
  38. }
  39. func (p *protocolV2)Exec(client *clientV2,params[][]byte)([]byte, error){
  40. switch{
  41. case bytes.Equal(params[0],[]byte("FIN")):
  42. return p.FIN(client,params)
  43. case bytes.Equal(params[0],[]byte("RDY")):
  44. return p.RDY(client,params)
  45. case bytes.Equal(params[0],[]byte("PUB")):
  46. return p.PUB(client,params)
  47. case bytes.Equal(params[0],[]byte("NOP")):
  48. return p.NOP(client,params)
  49. case bytes.Equal(params[0],[]byte("SUB")):
  50. return p.SUB(client,params)
  51. }
  52. returnnil, util.NewFatalClientErr(nil,"E_INVALID", fmt.Sprintf("invalid command %s",params[0]))
  53. }

我们来看下NSQ中几个比较重要的命令:

  • NOP 心跳回复,没有实际意义
  • PUB 发布一个消息到 话题(topic)
    PUB \n
    [ 四字节消息的大小 ][ N字节消息的内容 ]
  • SUB 订阅话题(topic) /通道(channel)
    SUB \n
  • RDY 更新 RDY 状态 (表示客户端已经准备好接收N 消息)
    RDY \n
  • FIN 完成一个消息 (表示成功处理)
    FIN \n

生产者产生消息的过程比较简单,就是一个PUB命令,PUB命令先读取四字节的消息大小,然后根据消息大小读取消息内容,然后将内容写到topic.MessageChan中。
我们重点来看下消费者是如何从nsq中读取消息的。
1. 消费者首先需要发送SUB命令,告诉nsqd它想订阅哪个Channel,然后nsqd将该Client与Channel建立对应关系。
2. 消费者发送RDY命令,告诉服务端它以准备好接受count个消息,服务端则向消费者发送count个消息,如果消费者想继续接受消息就需要不断发送RDY命令告诉服务端自己准备好接受消息。

  1. func (p *protocolV2) SUB(client *clientV2,params[][]byte)([]byte, error){
  2. topicName :=string(params[1])
  3. channelName :=string(params[2])
  4. topic := p.ctx.nsqd.GetTopic(topicName)
  5. channel := topic.GetChannel(channelName)
  6. //将Client与Channel建立关联关系
  7. channel.AddClient(client.ID, client)
  8. client.Channel= channel
  9. // update message pump
  10. client.SubEventChan<- channel
  11. return okBytes,nil
  12. }
  13. func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool){
  14. subEventChan := client.SubEventChan
  15. heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
  16. for{
  17. //IsReadyForMessages就是检查Client的RDY命令所设置的ReadyCount,判断是否可以继续向Client发送消息
  18. if subChannel ==nil||!client.IsReadyForMessages(){
  19. //客户端还未做好准备则将clientMsgChan设置为nil
  20. clientMsgChan =nil
  21. }else{
  22. //客户端做好准备,则试图从订阅的Channel的clientMsgChan中读取消息
  23. clientMsgChan = subChannel.clientMsgChan
  24. }
  25. select{
  26. //接收到客户端发送的RDY命令后,则会向ReadyStateChan中写入消息,下面的case条件则可满足,重新进入for循环
  27. case<-client.ReadyStateChan:
  28. //接收到客户端发送的SUB命令后,会向subEventChan中写入消息,subEventChan则被置为nil,所以一个客户端只能订阅一次Channel
  29. case subChannel =<-subEventChan:
  30. // you can't SUB anymore
  31. subEventChan =nil
  32. //发送心跳消息
  33. case<-heartbeatChan:
  34. err = p.Send(client, frameTypeResponse, heartbeatBytes)
  35. //会有N个消费者共同监听channel.clientMsgChan,一条消息只能被一个消费者抢到
  36. case msg, ok :=<-clientMsgChan:
  37. if!ok {
  38. gotoexit
  39. }
  40. //以消息的发送时间排序,将消息放在一个最小时间堆上,如果在规定时间内收到对该消息的确认回复(FIN messageId)说明消息以被消费者成功处理,会将该消息从堆中删除。
  41. //如果超过一定时间没有接受 FIN messageId,会从堆中取出该消息重新发送,所以nsq能确保一个消息至少被一个i消费处理。
  42. subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
  43. client.SendingMessage()
  44. //通过网络发送给消费者
  45. err = p.SendMessage(client, msg,&buf
  46. case<-client.ExitChan:
  47. gotoexit
  48. }
  49. }
  50. exit:
  51. heartbeatTicker.Stop()
  52. }

参考文献

NSQ 指南
使用消息队列的 10 个理由
关于go同步和异步模式的疑惑

 

原文:http://shanks.leanote.com/post/NSQ%E6%BA%90%E7%A0%81%E5%89%96%E6%9E%90%E4%B9%8BNSQD?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io

分享到:
评论

相关推荐

    nsq源码分析-简单易懂,首创

    nsq源码分析 涉及nsqd、nsqlookup、nsqadmin 通过此ppt可以掌握nsq的基本原理,熟悉nsq源码架构,对于进一步读取源码很有帮助

    docker-compose 容器部署 nsq

    NSQ由两个主要组件构成:nsqd(消息代理)和nsqlookupd(发现服务)。在Docker Compose环境中,这两个组件将分别被配置为单独的服务。 在部署NSQ时,我们需要创建一个`docker-compose.yml`文件,其中包含以下关键...

    nsq-1.1.0.windows-amd64.go1.10.3.tar.gz

    【nsq-1.1.0.windows-amd64.go1.10.3.tar.gz】这个文件是针对Windows 64位操作系统编译的nsq软件包,版本号为1.1.0,它使用Go语言1.10.3版本进行构建。nsq是一个高性能、分布式的消息队列系统,常用于构建实时的、大...

    go-start-nsq:帮助程序启动 nsqd nsqlookupd nsqadmin 节点以进行本地开发

    开始-nsq 启动 nsqd / nsqlookupd / nsqadmin 节点进行本地开发的小助手程序。 安装 $ go get github.com/segmentio/go-start-nsq 用法 一个 NSQD: $ go-start-nsq 三个 NSQD 节点: $ go-start-nsq -n 3 ...

    php-nsq 是nsq的php客户端php-nsq-master.zip

    安装完成后,你可以根据项目需求配置客户端,例如设置NSQD(NSQ守护进程)的地址和端口: ```php use NSQ\Consumer; use NSQ\Producer; $config = [ 'lookupd_http_addresses' =&gt; ['http://localhost:4161'], // ...

    Go-go-nsq-NSQ的官方Go包

    - **连接管理**: Go-nsq支持建立到NSQD(NSQ守护进程)和NSQLookupd(NSQ查找节点)的TCP连接,自动重连和心跳机制保证了连接的稳定性。 - **消息消费**: 提供了简单的API接口用于订阅主题和读取消息,同时支持...

    nsq_win+linux_amd64_1.0.0.zip

    解压此文件后,用户可以找到运行nsq所需的各种可执行文件,如`nsqd`(nsq守护进程)、`nsqlookupd`(nsqlookup服务)和`nsqadmin`(管理界面)等。 【nsq-1.0.0-compat.windows-amd64.go1.8.tar】则是对应于Windows...

    nsq-0.3.8.windows-amd64.go1.6.2.tar.gz

    6. **nsq_to_nsq**: 用于在不同的nsqd实例之间转发消息,可以实现消息的跨节点传输。 7. **nsq_pubsub**: 一个简单的命令行工具,用于发布消息到nsq主题,或者订阅并消费消息。 8. **nsq_stat**: 提供实时的nsqd和...

    nsq.js:适用于Node.js的NSQ客户端

    nsq.js JavaScript NSQ客户端WIP...nsq:reader connect nsqd 0.0.0.0:4150 events/ingestion [5] +0ms nsq:connection connect: 0.0.0.0:4150 V2 +0ms nsq:connection command: IDENTIFY null +2ms nsq:connection co

    nsq linux 安装文档

    这通常涉及下载源代码、编译并安装 NSQ 的各个组件,包括 `nsqlookupd`、`nsqd` 和 `nsqadmin`。 2. **启动 nsqlookupd**: `nsqlookupd` 是 NSQ 的服务发现组件,负责提供节点间的信息查询。在终端中输入以下命令...

    nsq-1.2.0.linux-amd64.go1.12.9.tar.gz

    1. nsqd:这是nsq的核心组件,负责接收、存储和分发消息。它有两个主要接口,`nsqd TCP/HTTP`端点用于发布消息,`nsqd TCP`端点用于消费消息。通过topic和channel的概念,nsqd实现了消息的发布与订阅模式。 2. ...

    nsq-master源码

    The following steps will run a small NSQ cluster on your local machine and walk through publishing, consuming, and archiving messages to disk. follow the instructions in the INSTALLING doc. in one ...

    nsq-1.2.0.windows-amd64.go1.12.9.tar.gz

    nsq的特点之一是其强大的消息持久化能力。通过磁盘队列,nsq能够确保即使在服务器故障的情况下,也能保证消息不丢失,从而达到高可用性的要求。同时,它还支持多种消息确认机制,如单次消费确认和批量确认,确保消息...

    nsq1.10.3安装包(包括windows和Linux)

    - **Windows**: 下载对应版本的nsq二进制文件,解压后将`nsqd.exe`和`nsqlookupd.exe`放入PATH环境变量下的目录,然后通过命令行启动这两个服务。 - **Linux**: 使用`wget`或`curl`下载二进制文件,解压后通过执行...

    nsq-strategies:在 Node.js 中使用 NSQ 的典型策略

    nsq-策略 介绍 在 Node.js 中使用典型策略。 它是具有不同策略的官方客户端库( )的包装器。 动机 很方便,但它要求你发送带有已知 nsqd 地址的消息,这既不切实际,也不符合避免 SPOF 的原则。 最佳实践总是使用...

    nsq.1.2.1.i386.tar.gz

    NSQ由两个主要组件构成:nsqd和nsqlookupd。nsqd是消息队列服务器,负责接收、存储和分发消息;nsqlookupd则是一个服务发现组件,用于协调和管理nsqd节点,提供主题(topic)和频道(channel)的信息查询。此外,...

    nsq auth 权限认证服务

    nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务nsq auth 权限认证服务

    nsq-0.3.7.linux-amd64.go1.6

    1. **NSQ架构**:NSQ由两部分组成:nsqd(消息代理)和nsqlookupd(服务发现)。nsqd负责接收、存储和分发消息,而nsqlookupd则用于节点发现和服务协调,帮助客户端找到正确的nsqd节点来发布或订阅消息。 2. **消息...

    前端开源库-nsq-lookup-jc

    【nsq-lookup-jc】是一个专为前端开发者设计的开源库,主要用于与nsqlookupd服务进行交互,帮助开发者方便地查找并连接到nsqd节点。nsq是一款由Golang编写的高可用、高性能的消息队列系统,常用于实时处理大规模数据...

Global site tag (gtag.js) - Google Analytics