The following article shows how to useredisto build a simple multi-producer, multi-consumer Queue with an interface similar to thepython
standardlib queue. With this queue you can easily share data between multiple processes or offload time consumig calculations to multiple worker processes.
To store the data we use theredis list data type. Redis Lists stores simple strings sorted by insertion order.
The following redis commands are used:
-
rpushInsert an element at the tail of the list
-
blpopGet an element from the head of the list, block if list is empty
-
lpopGet an element from the head of the list, return nothing list is empty
-
llenReturn the length of the list
The implementation uses theredis-pylibrary to talk to the server.
import redis
class RedisQueue(object):
"""Simple Queue with Redis Backend"""
def __init__(self, name, namespace='queue', **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db= redis.Redis(**redis_kwargs)
self.key = '%s:%s' %(namespace, name)
def qsize(self):
"""Return the approximate size of the queue."""
return self.__db.llen(self.key)
def empty(self):
"""Return True if the queue is empty, False otherwise."""
return self.qsize() == 0
def put(self, item):
"""Put item into the queue."""
self.__db.rpush(self.key, item)
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
If optional args block is true and timeout is None (the default), block
if necessary until an item is available."""
if block:
item = self.__db.blpop(self.key, timeout=timeout)
else:
item = self.__db.lpop(self.key)
if item:
item = item[1]
return item
def get_nowait(self):
"""Equivalent to get(False)."""
return self.get(False)
使用方法:
>>> from RedisQueue import RedisQueue
>>> q = RedisQueue('test')
>>> q.put('hello world')
Now if we have a look at the redis database with theredis-cli
client
it shows the expected results:
redis 127.0.0.1:6379> keys *
1) "queue:test"
redis 127.0.0.1:6379> type queue:test
list
redis 127.0.0.1:6379> llen queue:test
(integer) 1
redis 127.0.0.1:6379> lrange queue:test 0 1
1) "hello world"
We can get the item from a different script with:
>>> from RedisQueue import RedisQueue
>>> q = RedisQueue('test')
>>> q.get()
'hello world'
A subsequent call ofq.get()
will block until anotherone puts a new item into the Queue.
The next step would be to an endoder/decoder (e.gpython-json) to the Queue so that you are not limited to send strings.
There alredy exists the nice and simplehotqueuelibrary which has the same interface as the above example and provides encoding/decoding.
Other mentionable queue implementations with a redis backend are:
-
flask-redisA basic Message Queue with Redis for flask.
-
celeryAn asynchronous task queue/job queue based on distributed message passing. Much more advanced. Can be used with different storage backends.
-
rqSimple python library for queueing jobs and processing them in the background with workers.
-
resqueis a Redis-backed Ruby library for creating background jobs, placing them on multiple queues, and processing them later. Used at github. Includes a nice monitoring
web interface.
-
pyresA resque clone in python.
FRom:
http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html
分享到:
相关推荐
在Python中,Redis是一个常用的键值存储系统,用于数据缓存、消息队列等功能。本文将详细介绍如何使用Python的redis库来操作Redis以及处理消息队列。 首先,我们需要导入`redis`库,然后创建一个连接池`redisPool`...
本文将详细探讨PHP中的队列技术,包括如何使用PHP-Redis扩展来实现高效的数据处理。 首先,PHP队列是PHP应用程序中用于处理大量任务的一种机制。它遵循先进先出(FIFO)原则,即最早进入队列的任务最先被处理。队列...
1. `sqlite_queue.py`:这个可能是实现队列操作的主要模块,包括入队、出队、查看队列状态等函数。 2. `sql_wrapper.py`:这个可能是一个SQL语句包装器,提供更高级别的API来执行SQL查询和更新。 3. `tests`目录:...
基于python实现的sqlite队列,方便的处理sqlite并发。SqliteQueue是继承了threading.Thread的线程,并且维护了一个向sqlite请求的队列。支持peewee请求。SqlQuery简单的封装了SQL语句
本项目是基于Go语言和Redis实现的延迟队列,借鉴了有赞(Zan)的设计思路,旨在提供一种高效、可靠的延迟服务。 首先,我们要理解Go语言的特点。Go,也称为Golang,是由Google开发的一种静态类型的编译型编程语言,...
本篇文章将详细讲解如何在Qt应用程序中利用Redis来实现一个高效的消息队列,以实现点对点的生产者-消费者模式。 首先,我们需要了解Qt和Redis的基础知识。Qt是一个跨平台的C++图形用户界面库,它提供了丰富的API...
整个延迟队列由4个部分组成: 1. JobPool用来存放所有Job的元信息。 2. DelayBucket是一组以时间为维度的有序队列,用来存放所有需要延迟的Job(这里只存放Job Id)。 3. Timer负责实时扫描各个Bucket,并将delay...
在IT行业中,构建高效、实时的在线应用是关键任务之一,而`Thinkphp6`、`Redis`、`GatewayWorker`以及`queue队列`这些技术的结合则为实现这一目标提供了强大的支持。本文将深入探讨如何利用这些技术构建一个聊天系统...
RQ(Redis 队列)是一个简单的 Python 库,用于将作业排队并在后台使用工作程序进行处理。它由 Redis 支持,并且设计为具有较低的进入门槛。它应该很容易集成到您的 Web 堆栈中。RQ 需要 Redis >= 3.0.0。 完整文档...
Laravel/Lumen 使用 Redis队列 Laravel/Lumen 框架提供了一个队列系统,允许开发者将耗时操作或高并发操作异步执行,以缓解系统压力、提高系统响应速度和负载能力。在这个系统中,Redis 可以作为队列驱动,负责存储...
标题中的“基于redis实现的消息队列”指的是使用Redis这一开源数据结构存储系统来构建消息队列(Message Queue, MQ)的解决方案。Redis以其高性能、丰富的数据结构和内存存储特性,常被用作构建消息队列的底层技术。...
数据结构学习-Java使用数组实现简单的队列操作SimpleQueue,简单易懂,适合初学者。
为了处理高并发请求并防止超卖现象,可以通过Redis的队列操作,如LPUSH(左入队列)、RPOP(右出队列)等命令实现用户的排队购买。当用户加入队列时,可以通过减少库存队列的长度来控制库存数量。对于已经在队列中的...
你也需要在`.env`文件中将`QUEUE_DRIVER`环境变量设置为`redis`,以确保应用使用Redis作为默认队列驱动。 2. **安装Redis扩展** 在使用Redis之前,确保已安装并配置了PHP的Redis扩展。你可以使用Composer来安装`...
考虑到“简单”和“易用”的特点,一个可能的实现是使用Python的内置`queue`模块,它提供了线程安全的队列操作。 2. **生产者(Producer)**:生产者是将消息放入队列的组件。你可以编写一个函数或类,接收用户输入...
任务队列 环境准备 Python Redis # pip install -i http://pypi.douban.com/simple redis RQ # pip install -i http://pypi.douban.com/simple rq Redis Server # docker pull redis:latest # docker run -d -p 6379...
以下是一个简单的使用Python和Redis实现的生产者-消费者模型: ```python import redis import time import uuid # 创建Redis连接 r = redis.Redis(host='localhost', port=6379, db=0) class Producer: def ...
3. **任务队列(Task Queue)**:存储待处理的任务,如Redis队列。 4. **通信机制**:确保生产者和消费者之间的协调,如发布/订阅模型或工作队列模型。 **使用Flask和Redis实现任务分发** 1. **任务生产者**:在...
本 Demo 演示了如何使用 Python 的标准库 queue 和 tkinter 来创建一个简单的图形用户界面(GUI)。此 Demo 的目的是展示如何通过队列实现 GUI 的即时刷新,尤其是在进行耗时操作时保持界面的响应性。 系统要求: ...
本文将深入探讨如何使用Go语言实现基于Redis的消息队列,并着重讲解其特性,包括并发队列处理、Topic与Group的注册、异常监听与重启恢复。 首先,Go语言因其轻量级线程(Goroutine)和通道(Channel)机制,非常...