A Simple Distributed Queue in Ruby
One of the cooler yet little used gems in the Ruby standard library is DRb - Distributed Ruby. DRb makes it really easy to share objects between different processes. Today, Darcy Laycock will show, how to build a simple (and relatively naive) shared queue with DRb.
A Single Process Example
First off, we’re going to reuse as much code as possible. Ruby provides a built-in thread-safe queue class which we can leverage to build it. So, for our first step, we’re going to start by requiring thread (to access) it and adding a few objects to it / manipulating it to show the results.
require 'thread'
queue = Queue.new
[:a, :b, :c, :d].each do |item|
queue.push item
puts "Pushed #{item.inspect} onto the queue - It now has #{queue.size} items"
end
until queue.empty?
item = queue.pop
puts "Popped #{item.inspect} off the queue, size now is #{queue.size}"
end
Copying to a file, queue_test.rb, and running it, you should now see the following output:
引用
Pushed :a onto the queue - It now has 1 items
Pushed :b onto the queue - It now has 2 items
Pushed :c onto the queue - It now has 3 items
Pushed :d onto the queue - It now has 4 items
Popped :a off the queue, size now is 3
Popped :b off the queue, size now is 2
Popped :c off the queue, size now is 1
Popped :d off the queue, size now is 0
Making it Distributed
The next step is to use DRb to make a simple distributed queue. The first thing we need to do, is to set DRb to make it available. For the moment, we’re going to run it on port 3491. Inside a new file, queue_server.rb, write the following:
require 'thread'
require 'drb'
queue = Queue.new
DRb.start_service("druby://:3491", queue)
DRb.thread.join
The DRb.start_service takes a uri which tells it what address and port to bind to (if the port is nil / absent, it will randomly choose a port) and then DRb will make the object available to other processes at said location. We need to call DRb.thread.join so that the process exits when the DRb thread does - otherwise, as soon as lines above had run, the program would exit.
Now, we’ll create two more files - queue_consumer.rb and queue_provider.rb First, opening up queue_consumer.rb we’ll write the following:
require 'thread'
require 'drb'
DRb.start_service
queue = DRbObject.new(nil, "druby://localhost:3491")
loop do
queue.push "Hello from #{Process.pid} at #{Time.now}"
sleep 2
end
Which will connect to the remote queue (that we wrote above) and every 5 seconds will push a string onto it. Now, inside queue_consumer.rb we’re simply going to do the following:
require 'thread'
require 'drb'
DRb.start_service
queue = DRbObject.new(nil, "druby://localhost:3491")
loop do
unless queue.empty?
puts "#{Process.pid} got #{queue.pop.inspect} from the remote queue"
end
sleep 1
end
All this does is poll the queue every second, checking if it has items. If it does, it will print out the processes pid and the message it got. Starting up queue_server.rb, a couple of queue_provider.rb's (I did 2) and a queue_consumer.rb in seperate terminals you should get output something like:
引用
22703 got "Hello from 22690 at Sun Dec 07 22:01:27 +0900 2008" from the remote queue
22703 got "Hello from 22691 at Sun Dec 07 22:01:28 +0900 2008" from the remote queue
22703 got "Hello from 22690 at Sun Dec 07 22:01:29 +0900 2008" from the remote queue
22703 got "Hello from 22691 at Sun Dec 07 22:01:30 +0900 2008" from the remote queue
22703 got "Hello from 22690 at Sun Dec 07 22:01:31 +0900 2008" from the remote queue
Of course, your pid’s are going to be different.
Conclusion
In just a few minutes of work you can see we’ve written a basic distributed ruby queue in very few lines of code (5 lines of code + 4 requires) and whilst it may not be the best performing implementation out their, it’s incredibly simple and it hopefully introduced the power of DRb to you.
Lastly, there is a wealth of other stuff you can do with DRb - Ruby’s Rinda classes provide automatic discovery of services and tuplespaces whilst DRb provides simple access control and a bunch of other nifty stuff. If you want to learn more, I suggest checking out Mark Bates’ rubyconf talk which shows off a bunch of cool stuff that can be done and Eric Hodel’s fantastic set of articles.
分享到:
相关推荐
Hadoop是MapReduce的一个流行实现,它提供了一个强大的分布式计算环境。 **3.1 streaming介绍** Hadoop Streaming是一种机制,允许使用任何可执行脚本语言(如Python、Perl或Ruby)编写MapReduce程序。这为那些不...
RabbitMQ作为一款流行的开源消息队列系统,是许多分布式架构中的关键组件。在这个"分布式中间件.zip"压缩包中,我们可以预见到关于RabbitMQ以及分布式技术的相关资料。 RabbitMQ是一种基于AMQP(Advanced Message ...
Go 中使用 Redis 实现分布式互斥锁Redsync Redsync 为 Go 提供了一个基于 Redis 的分布式互斥锁实现,详见本文。Ruby的参考库(由antirez开发)可在github.com/antirez/redlock-rb上找到。安装使用 go get 命令安装 ...
Ruby-OurPC是一个针对gRPC框架的实验性实现,它为开发者提供了在Ruby环境中构建gRPC客户端和服务器的能力。gRPC是一个高性能、开源和通用的RPC(远程过程调用)框架,它基于HTTP/2协议设计,支持多种编程语言,包括...
这个gem提供了一个Raft::Node类,该类处理跨对等节点群集的日志复制。 有关RPC协议,并发机制,错误处理和数据持久性的设计决策留给客户端。 为了方便和测试,提供了基于Goliath和EventMachine的示例实现,并具有...
以下是一个简单的示例,展示了如何使用Ruby的Redis客户端连接到服务器并执行基本操作: ```ruby require 'redis' redis = Redis.new # 设置键值对 redis.set('key', 'value') # 获取键的值 value = redis.get('...
安装完成后,可以创建一个简单的服务端示例: ```ruby require 'hprose' service = Hprose::Server.new service.add('add', ->(x, y) { x + y }) service.bind 'http://localhost:8000' service.start ``` 接着,...
- **Puma的线程池实现**:Puma是一个高性能的Ruby Web服务器,它使用了一个简单的线程池实现来处理并发请求。通过分析其实现细节,可以更好地理解如何在实际项目中应用线程池。 #### 结语 通过深入学习《Working ...
Ruby-ActiveRecord Turntable 是一个针对 ActiveRecord 的扩展,专门用于实现数据库分片功能。 **一、ActiveRecord简介** ActiveRecord是Ruby on Rails框架的一部分,它是一种对象关系映射(ORM)工具,用于简化...
例如,通过模拟一个频繁访问数据库的场景,我们可以看到引入Memcached后,查询速度的显著提升。此外,学习如何处理分布式环境中的一致性问题,如使用一致性哈希算法来分配和查找键值,也是重要的一部分。 在缓存...
这主要依赖于一个名为`redis`的Rubygem,它为Ruby提供了完整的Redis命令集,使得操作Redis变得非常便捷。这个压缩包“ruby和redis接口.zip”很可能包含了关于如何在Ruby项目中配置和使用Redis的相关示例或教程。 ...
Elasticsearch是一个基于Lucene的分布式、RESTful风格的搜索和数据分析引擎。它提供实时的数据存储、搜索、分析和可视化功能,广泛应用于日志分析、实时监控、全文搜索等领域。Ruby的Elasticsearch客户端库使得在...
Selenium 是一个广泛用于 Web 应用程序自动化测试的开源工具。它支持多种编程语言,包括 Java、Python、C#、Ruby 和 JavaScript 等,适用于多种浏览器,如 Chrome、Firefox、Safari 和 Internet Explorer。Selenium ...
同理,Reduce函数的功能是将一个列表缩减为一个标量值,它也通常依赖于递归实现。值得注意的是,尽管递归在命令式编程语言中看起来是一个时间复杂度较高的操作,但实际上,由于编译器或解释器的优化,递归往往在执行...
MongoDB的Ruby驱动程序,即`mongo-ruby-driver`,是用于与MongoDB数据库进行交互的一个关键组件。这个驱动程序允许Ruby开发者利用MongoDB的强大功能,包括文档存储、分布式数据处理以及灵活的数据模型。在本文中,...
MongoDB为开发人员提供了一个灵活、高效且易于扩展的数据库解决方案。通过Ruby与MongoDB的集成,可以构建出强大且灵活的应用程序。无论是简单的CRUD操作还是复杂的关联查询,MongoDB都能很好地满足需求。此外,Ruby...
Elasticsearch 是一个流行的开源全文搜索引擎,它提供了分布式、实时、弹性搜索和分析的能力。在Ruby开发中,`elasticsearch-ruby` 是官方提供的客户端库,使得开发者能够方便地在Ruby应用程序中与Elasticsearch进行...
MongoDB 是一个基于分布式文件存储的开源数据库系统。它旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。MongoDB 将数据存储为一个文档,数据结构由键值(key=>value)对组成。MongoDB 文档类似于 JSON 对象。 ...