`
liumengfan
  • 浏览: 33139 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Distributed Semaphores with RabbitMQ

阅读更多
翻译自(http://www.rabbitmq.com/blog/2014/02/19/distributed-semaphores-with-rabbitmq/)

在这篇博客里面,我们将定位“在一个分布式系统里面,如何实现对特殊资源的访问控制”的问题,解决该问题的方案在计算机界广为人知,那就是被成为信号量的东东。“信号量”是在1965年Dijkstra的“Cooperating Sequential Processes”论文里出现的,下面我们将要讨论如何使用AMQP的构建快(consumers,producers,queues)来实现信号量。

需要信号量的地方
在我们讨论解决方案之前,让我们先看一下什么时候需要用到它
假如说:我们的应用有很多进程从队列里面取数据,然后将数据插入到数据库,我们可能想限制同时工作的进程的数量

相似的,很多进程在处理那些需要通过网络存储到远程服务器上的图片,我们要防止由于图片的传输导致网络拥塞,所以我们要限制同时传输图片的进程的数量。这种情况下,一旦某个进程可以使用网络连接(一旦进程可以运行),他们可以尽可能快的传输图片到远程服务器。

另一个例子和rabbitMQ有关:你的应用程序可能需要只允许一个生产者(producer)发送消息到exchanger,但是一旦那个进程停止了,你希望另一个producer马上开始发送消息。可能这样做由很多原因。

另一方面,可能有时候想让cosumers去争着访问queue,但是当AMQP支持了专用队列exclusive queues和exclusive consumer之后,空闲的consumer就没办法知道queue什么时候可以被别的consumer访问了,因此,我们可以使用上面的方法使consumers排队访问queue了。

值得注意的是,让我们有一个以上的进程去访问特殊资源。比如:我们由是个producer,但是我们只想让其中的五个同时去发布消息。使用信号量,我们也可以实现。

上面的例子都需要有一个额外的条件:竞争资源的进程或者其它的协调者不应该轮询rabbitMQ。理想情况下,他们应该在等待的时候处于空闲的状态,一旦资源可用了,rabbitMQ马上通知下一个进程,以便它自动的开始工作。

让我们来转向如何实现

实现信号量

我们的信号量将使用queues和messages来实现

我们首先声明一个叫做“resource.semaphore”的queue,resource是我们的信号量将要控制的资源的名字,可能是“images”,“database”,“file_server”,或者是任何符合我们应用程序的名字。

我们发布一个message到resource.semaphore的queue里面,然后我们启动访问这个message的进程。每个进程都要从resource.semaphore里面获取message;第一个到达的进程将得到这个message,同时其它进程将处于空闲状态去等待这个message。技巧是这些进程从来不去确认这个message,但是他们以ack_mode=on的模式来从resource.semaphore的queue里面获取message。所以rabbitMQ将持续跟踪这个message,如果这些进程crash或者exists的时候,这个message将重新回到queue里面,然后被传送给下一个要从这个queue获取message的进程里。

使用这样一个简单的技术,我们可以实现在同一时刻只有一个进程能访问特定的资源,并且我们还能保证在这个进程crashes或者exists的时候,不会带走该message。当然,我们假设所有的进程都是正常的访问该资源。例如:他们从来不确认该message。如果他们那样做了,rabbitMQ将删除该message,最终导致该组里的其它进程“饿死”。

当一个进程想放弃该资源的时候,我们该怎么办,它能返回这个“message”吗?当然这个进程可以突然关闭这个channel,然后rabbitMQ将自动接管这个message。但是也有一个礼貌的方式来实现。该进程可以使用basic.reject来告诉rabbitMQ重新给该message排队,使它回到semaphore的queue里面。

让我们来看看代码是怎么实现的,我们假设已经得到了一个connection和channel,下面是安装我们信号量的代码:

channel.queueDeclare("resource.semaphore", true, false, false, null);
String message = "resource";
channel.basicPublish("", "resource.semaphore", null, message.getBytes());


我们创建一个durable(持久的)queue:“resource.semaphore”,然后我们用默认的exchange发送了一条message。

下面是使用该信号量的代码:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(1);
channel.basicConsume("resource.semaphore", false, consumer);

while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();

  // here we access the resource controlled by the semaphore.  

  if(shouldStopProcessing()) {
    channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);
  }
}


这儿,我们创建了一个QueueingConsumer来等待来自“resource.semaphore”queue的message。我们通过在basic.qos里设置prefetch-count等于1来确保我们的进程只拿取一条message。一旦有一条message到达该队列,这个进程将开始使用该resource。当shouldStopProcessing()的条件满足的时候,这个进程将basicReject这个message,告诉rabbitMQ去重新给这个message排队。一定要记住,这个consumer是使用ack-mode启动的,并且将从不确认从semaphore的queue里收到的message。如果它确认了,将被认为是buggy。

优先访问信号量
有没有可能实现优先访问信号量。完全可以,从3.2.0以来,rabbitMQ支持Consumer Priorities。通过使用Consumer Priorities,我们可以告诉rabbitMQ哪个进程可以优先访问来自semaphore queue的message。

二进制信号量和计数信号量
目前为止,我们实现的是被成为二进制信号量的机制,即只允许在同一时刻只有一个进程来访问resource。如果我们要允许多个进程同时访问该resource,同时我们也有一个个数的限制,这样我们可以实现计数信号量来实现。为了实现它,我们建立一个信号量,现在不是只发布一个message,而是发布多个message(message的个数是允许同时访问resource的进程数),我们需要确保在我们使用之前,prefetch-count的值是1.

alerting the count
我们的进程可以随着时间来增加额外的message来增长进程的数量。如果我们想减少同时访问该resource的进程呢?我们将开启一个新的带有高优先级的consumer,这样他就可以消耗掉多余的message,然后确认他们,这样rabbitMQ将会把它们从队列里移除掉。

阅读资源
就像你看到的一样,使用AMQP来实现信号量很简单,使用rabbitMQ我们还可已提高访问resource的优先级。

最后,我们和大家分享一些关于信号量的文章。首先是Dijkstra的论文“ Cooperating Sequential Processes(http://www.cs.utexas.edu/users/EWD/transcriptions/EWD01xx/EWD123.html)”。最后是阐述里一些信号量定义的wiki文章(http://en.wikipedia.org/wiki/Semaphore_(programming))
分享到:
评论

相关推荐

    Distributed Computing with Python azw3

    Distributed Computing with Python 英文azw3 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

    Distributed Computing with Python [2016]

    Distributed Computing with Python by Francesco Pierfederici AZW3/MOBI/EPUB/PDF 多版本 This book will teach you how to perform parallel execution of computations by distributing them across multiple ...

    SSD8 Manning - Distributed Programming with JAVA

    《SSD8 Manning - Distributed Programming with JAVA》是关于使用Java进行分布式编程的参考资料,主要针对想要深入理解并实践Java在分布式系统中的应用的学习者。分布式编程是现代软件开发中的一个重要领域,它允许...

    Distributed Computing with Python

    总的来说,"Distributed Computing with Python"涵盖了Python在分布式计算领域的核心知识点,包括各种库的使用方法和实战技巧。通过深入学习,你将能够构建和优化自己的分布式系统,应对大数据和高性能计算的挑战。

    RabbitMQ-in-Action-Distributed-Messaging-for-Everyone.pdf

    had existed when we started with RabbitMQ two years earlier. Neither of us came from a traditional messaging background, which made us fast friends and has largely informed the tone of RabbitMQ in ...

    Distributed.Computing.with.Python.178588969

    Distributed application with Celery Python in the Cloud Python on an HPC cluster Test and debug distributed applications About the Author Francesco Pierfederici is a software engineer who loves Python...

    Demo - EhCache Distributed Caching With Terracotta in GlassFish v3

    Demo of ehCache distributed caching with terracotta in glassFish v3 可以参考:http://blog.csdn.net/guobin0719/archive/2011/04/25/6361940.aspx

    Distributed Applications with Microsoft Visual Basic 6.0 MS

    Distributed Applications with Microsoft Visual Basic 6.0 MS

    Programming Distributed Applications with COM and Visual Basic 6.0

    Any developer who wants to create LAN-based, WAN-based, and Web-based applications using Microsoft Windows NT Server as a foundation must use many separate pieces of software. Some of these pieces ...

    Manning - Distributed Programming with JAVA.

    Manning - Distributed Programming with JAVA.

    Distributed Programming with Java Technology

    标题“Distributed Programming with Java Technology”直截了当地指出了本书的主题:通过Java技术进行分布式编程。这表明本书旨在教授如何利用Java语言来构建分布式系统。 #### 描述解析 描述部分重复了标题,虽然...

    Distributed.Computing.with.Go

    Chapter 5, Introducing Goophr, opens the discussion on what is meant by a distributed search engine, using OpenAPI specification to describe REST APIs and describing the responsibilities of the ...

    Distributed Computing with Python mobi

    Distributed Computing with Python 英文mobi 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

    Distributed Computing with Python epub

    Distributed Computing with Python 英文epub 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

    Distributed Programming with Ice

    • Part V, Ice Services, covers the services provided with Ice, such as IceGrid (a sophisticated deployment tool), Glacier2 (the Ice firewall solution), IceStorm (the Ice messaging service), and ...

Global site tag (gtag.js) - Google Analytics