`
pjwqq
  • 浏览: 81182 次
社区版块
存档分类
最新评论

pyzmq的Monitor Queue

阅读更多

    前面讲过zmq的device,用来充当客户端与服务端的中间件,以增加灵活性,让服务端也变成可插拔。然而device是zmq封装好的,怎样才能一窥内部的数据流呢?看图

    一看这图就明白了,MonitoredQueue在创建Queue同时,还提供第3个PUB socket来发布途经这个Queue的进出信息。

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from zmq.devices.monitoredqueuedevice import MonitoredQueue
from zmq.utils.strtypes import asbytes
from multiprocessing import Process
import random

frontend_port = 5559
backend_port = 5560
monitor_port = 5562
number_of_workers = 2

   创建这个MonitorQueue

def monitordevice():
    in_prefix=asbytes('in')
    out_prefix=asbytes('out')
    monitoringdevice = MonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, in_prefix, out_prefix)
    
    monitoringdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
    monitoringdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
    monitoringdevice.bind_mon("tcp://127.0.0.1:%d" % monitor_port)
    
    monitoringdevice.setsockopt_in(zmq.HWM, 1)
    monitoringdevice.setsockopt_out(zmq.HWM, 1)
    monitoringdevice.start()  
    print "Program: Monitoring device has started"

   REP server

def server(backend_port):
    print "Program: Server connecting to device"
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://127.0.0.1:%s" % backend_port)
    server_id = random.randrange(1,10005)
    while True:
        message = socket.recv()
        print "Server: Received - %s" % message  
        socket.send("Response from server #%s" % server_id)

   REQ client

def client(frontend_port, client_id):
    print "Program: Worker #%s connecting to device" % client_id
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://127.0.0.1:%s" % frontend_port)
    request_num = 1
    socket.send ("Request #%s from client#%s" % (request_num, client_id))
    #  Get the reply.
    message = socket.recv_multipart()
    print "Client: Received - %s" % message

   最后来一个SUB客户端接收从MonitorQueue的PUB端发布的消息

def monitor():
    print "Starting monitoring process"
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    print "Collecting updates from server..."
    socket.connect ("tcp://127.0.0.1:%s" % monitor_port)
    socket.setsockopt(zmq.SUBSCRIBE, "")
    while True:
        string = socket.recv_multipart()
        print "Monitoring Client: %s" % string

   然后在不同进程中分别启动以上各项

monitoring_p = Process(target=monitordevice)
monitoring_p.start()  
server_p = Process(target=server, args=(backend_port,))
server_p.start()  
monitorclient_p = Process(target=monitor)
monitorclient_p.start()  
time.sleep(2)   

for client_id in range(number_of_workers):
    Process(target=client, args=(frontend_port, client_id,)).start()

time.sleep(10)
server_p.terminate()
monitorclient_p.terminate()
monitoring_p.terminate()

    如此即可观察到路过Queue的数据流。

    整个过程简单明了,没有什么弯弯绕,感觉zmq上手还是很容易的。

 

 

0
0
分享到:
评论

相关推荐

    Laravel开发-laravel-queue-monitor

    在本文中,我们将深入探讨Laravel开发中的一个关键组件——`laravel-queue-monitor`,这是一个专门用于监视队列作业的Laravel包。队列在Web应用开发中扮演着重要角色,尤其在处理耗时任务时,它们可以提高应用程序的...

    C语言头文件 QUEUE.H

    C语言头文件 QUEUE.HC语言头文件 QUEUE.HC语言头文件 QUEUE.HC语言头文件 QUEUE.HC语言头文件 QUEUE.HC语言头文件 QUEUE.HC语言头文件 QUEUE.HC语言头文件 QUEUE.HC语言头文件 QUEUE.HC语言头文件 QUEUE.HC语言...

    Laravel-Queue-Monitor:使用数据库监视Laravel作业

    composer require romanzipp/laravel-queue-monitor 配置 将配置和迁移复制到您的项目: php artisan vendor:publish --provider="romanzipp\QueueMonitor\Providers\QueueMonitorProvider" 迁移队列监视表。 可以...

    C# MessageQueue示例

    MessageQueue,又称消息队列,是C#中处理异步通信和解耦组件的重要技术。它允许应用程序之间通过消息传递数据,而无需彼此直接交互。下面将详细介绍C#中的MessageQueue以及如何使用它来发送和接收消息。 1. **...

    Laravel开发-laravel-queue-monitor .zip

    在本文中,我们将深入探讨Laravel框架中的队列监控,主要基于提供的压缩包"laravel-queue-monitor .zip"。Laravel是一个优雅的PHP web应用框架,它提供了一系列强大的工具来帮助开发者更高效地构建高质量的web应用。...

    thinkphp5.0.24+queue 队列信息完整源码

    《ThinkPHP5.0.24与Queue队列技术详解》 在PHP开发领域,ThinkPHP框架因其简洁高效的特性而广受欢迎,特别是在企业级应用中,其提供的队列功能能够帮助开发者实现异步任务处理,提高系统性能。本文将详细探讨在...

    Queue与Topic的比较

    Queue 与 Topic 的比较 Queue 和 Topic 是 JMS(Java Message Service)中两种基本的消息模式,分别对应 Point-to-Point 和 Publish/Subscribe 模式。 Queue 模式 在 Queue 模式中,一条消息仅能被一个消费者...

    活用Android的Message Queue

    在Android开发中,Message Queue是一种重要的机制,用于在不同线程间进行异步通信和任务调度。理解并熟练运用Message Queue、Looper和Handler是构建高效、响应性良好的Android应用的关键。 1. **Message Queue...

    前端开源库-promise-queue

    Promise Queue 是一个专门用于解决此类问题的开源库,它允许我们以有序、控制流的方式执行基于Promise的异步任务。这个库的核心理念是通过队列机制限制同时运行的任务数量,从而避免系统资源过度消耗,提高应用性能...

    前端开源库-promise-queue-plus

    "Promise Queue Plus" 就是一个专门为解决此类问题设计的开源库,它基于Promise实现,提供了超时、重试等高级特性,极大地增强了异步任务的处理能力。 Promise 是JavaScript中的一个关键特性,用于处理异步操作,...

    C++ Queue(带上限的)

    在C++编程语言中,`Queue`是一种常用的数据结构,它遵循“先进先出”(First In First Out, FIFO)的原则。通常,C++标准库提供了`<queue>`头文件来实现基本的队列操作,但这个标准队列并没有设置上限。在某些特定...

    tp5.1消息队列 think-queue

    标题 "tp5.1消息队列 think-queue" 指的是使用ThinkPHP5.1框架集成的消息队列组件——think-queue。消息队列在软件开发中扮演着重要角色,它允许应用程序异步处理耗时任务,提高系统响应速度和整体性能。think-queue...

    Laravel开发-laravel-queue-monitor .zip.zip

    在本文中,我们将深入探讨Laravel框架中的队列监控,主要基于提供的压缩包"laravel-queue-monitor .zip"。Laravel是一个优雅的PHP web应用框架,它提供了一系列强大的工具来帮助开发者更高效地构建高质量的web应用。...

    Unity3d 队列 方法 Queue

    ### Unity3D中的队列(Queue)方法解析与应用实例 #### 一、概述 在Unity3D开发中,队列是一种非常实用的数据结构,它遵循先进先出(First In First Out, FIFO)的原则,即最先加入队列的元素会最先被移除。队列在...

    解决Can't locate ThreadQueue.pm

    标题 "解决Can't locate ThreadQueue.pm" 指出的问题是,在尝试运行一个Perl脚本时,系统无法找到模块“ThreadQueue”。这个问题通常出现在你试图使用一个依赖于ThreadQueue模块的Perl程序,但该模块尚未在你的Perl...

    Queue-Queue-Queue

    Queue-Queue-Queue

    消息队列 Queue与Topic区别.docx

    ### 消息队列Queue与Topic的区别 #### 一、概念概述 消息队列(Message Queue)是一种应用程序间通信机制,允许程序之间通过发送和接收消息进行通信,而不必直接建立连接。它提供了异步处理机制,使得消息的发送者...

    STL中priority_queue

    STL 中的 priority_queue priority_queue 是 STL 中的一种容器,可以实现优先级队列的功能。下面,我们将详细介绍 priority_queue 的使用方法和实现原理。 priority_queue 的基本概念 priority_queue 是一种特殊...

    kodi-sync-queue_6.0.0.0.zip

    《Kodi-Sync-Queue:Jellyfin的增强利器》 Kodi-Sync-Queue,版本6.0.0.0,是一个专为Jellyfin设计的官方插件,其核心功能是为用户提供一个同步播放队列的工具,极大地提升了Jellyfin媒体中心的用户体验。在了解这...

Global site tag (gtag.js) - Google Analytics