`
pjwqq
  • 浏览: 81567 次
社区版块
存档分类
最新评论

pyzmq的Polling and Sockets

阅读更多

  Polling and Sockets

  一个线程中有多个sokect,同时需要收发数据,zmq提供polling sockets实现,不用在recv()时阻塞。

  下面这个例程中创建一个command server来告诉worker何时退出,worker从Publisher获得订阅并打印,('exit'时退出)。

    1.PUSH server ,命令服务

import zmq
import time
import sys
import random
from  multiprocessing import Process

def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            break
        time.sleep (1) 

   2.PUB server,发布消息

def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(1) 

   3.客户端

def client(port_push, port_sub):
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://localhost:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://localhost:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # 初始化Poller
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll())
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

 运行

if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    Process(target=client, args=(server_push_port,server_pub_port,)).start()

看一下api

poll(timeout=None)

Poll the registered 0MQ or native fds for I/O.

Parameters: Returns: Return type:
timeout (float, int) – The timeout in milliseconds. If None, no timeout (infinite). This is in milliseconds to be compatible with select.poll().
events – The list of events that are ready to be processed. This is a list of tuples of the form (socket, event), where the 0MQ Socket or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second. It is common to callevents = dict(poller.poll()), which turns the list of tuples into a mapping ofsocket : event.
list of tuples

  至于POLLIN,POLLOUT:

  • flag (int, default=POLLIN|POLLOUT) – 0MQ poll flags. If flag|POLLIN, recv events will be flushed. If flag|POLLOUT, send events will be flushed. Both flags can be set at once, which is the default.

  如此这般,不停的轮询注册在poller中的sockter状态,类似与java nio中将channel注册到selector。发现某个socket数据接收就绪(POLLIN),执行业务代码。 

 

  但是,用'if '的处理方式有点丑,所以pyzmq提供实现tornador ioloop的IOStream 的类:ZMQStream 来处理polling event,并且这样就可以使用回调。

  首先,安装tornador : pip install tornado

 然后,改造上面的代码:

import zmq
import time
import sys
import random
from  multiprocessing import Process

from zmq.eventloop import ioloop, zmqstream
ioloop.install()
    ioloop.install()用来告诉tornador.ioloop.IOLoop使用zmq的poller。
    PUSH server和PUB server的代码不用改

    把2个处理业务的函数拎出来做回调

def getcommand(msg):
	print "Received control command: %s" % msg
	if msg[0] == "Exit":
		print "Received exit command, client will stop receiving messages"
		should_continue = False
		ioloop.IOLoop.instance().stop()#退出请停止
        
def process_message(msg):
	print "Processing ... %s" % msg

    客户端改成这样:

def client(port_push, port_sub):    
	context = zmq.Context()
	socket_pull = context.socket(zmq.PULL)
	socket_pull.connect ("tcp://localhost:%s" % port_push)
	stream_pull = zmqstream.ZMQStream(socket_pull)
	stream_pull.on_recv(getcommand)
	print "Connected to server with port %s" % port_push
	
	socket_sub = context.socket(zmq.SUB)
	socket_sub.connect ("tcp://localhost:%s" % port_sub)
	socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
	stream_sub = zmqstream.ZMQStream(socket_sub)
	stream_sub.on_recv(process_message)
	print "Connected to publisher with port %s" % port_sub
	ioloop.IOLoop.instance().start()
	print "Worker has stopped processing messages."


if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    Process(target=client, args=(server_push_port,server_pub_port,)).start()

    将原来的socket装饰成zmqstream,然后将ioloop实例run起来,其它就不需要我操心了,妥妥的傻瓜式。

 

 

 

3
2
分享到:
评论

相关推荐

    GolangHTTP的longpolling库Golongpoll.zip

    Golongpoll 是 golang HTTP 的 longpolling 库,可以使构建 web pub-sub 更加容易。基本用法:import "github.com/jcuga/golongpoll" // This launches a goroutine and creates channels for all the ...

    SDIO-Polling mode.rar

    在本项目"SDIO-Polling mode.rar"中,我们关注的是如何在STM32F103ZETX微控制器上使用轮询模式(Polling mode)来实现SDIO的数据读写。 STM32F103ZETX是意法半导体(STMicroelectronics)生产的基于ARM Cortex-M3...

    Python库 | polling2-0.4.7.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:polling2-0.4.7.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    Vacation and polling models with retrials

    轮询系统(Polling Systems)** 轮询系统是一种单服务器交替访问多个队列的排队模型,按照预定的顺序进行。这类系统也被广泛研究,并且可以应用于各种不同的服务策略和服务纪律规则。关于轮询系统的文献综述可见于...

    polling system simulation

    这个名为"polling system simulation"的程序正是为研究轮询算法提供了一个模拟平台,有助于深入理解和优化这类算法。 轮询系统的工作原理是,中央处理器(CPU)周期性地检查各个设备,看它们是否有数据需要传输或...

    Polling_dma_

    lpc17xx_libcfg.h: Library configuration file - include needed driver library for this example makefile: Example's makefile (to build with GNU toolchain)adc_burst_test.c: Main program

    Peter's Polling Package

    Peter's Polling Package Highly customizable Polling/Voting controls for asp.net web sites

    STM32实现button polling & interrupt方式点亮LED

    本文将详细讨论如何通过两种不同的方法——轮询(polling)和中断(interrupt)来实现STM32控制LED的亮灭。 首先,我们来看轮询方式。轮询是程序不断检查某个条件是否满足的过程。在STM32中,如果我们要用一个按钮...

    2-M3-USART1(polling)_uart_

    两个12位带缓冲的DAC通道可以用于转换2路数字信号成为2路模拟电压信号并输出。这项功能内部是通过集成的电阻串和反向的放大器实现。这个双数字接口支持下述功能:参照2009年3月 STM32F103xCDE数据手册 英文第5版

    CF Polling v0.94

    在不依赖MySQL的情况下,"CF Polling v0.94"可能会使用文件存储或者替代的数据库系统(如SQLite)来存储投票数据。这样做的优点包括简化部署流程,减少服务器资源需求,同时对初学者或小型项目来说更易于管理和维护...

    UNIX Network Programming Volume 1, Third Edition (Unix网络编程卷1第3版英文版)

    Source Code and Errata Availability Acknowledgments Part 1: Introduction and TCP/IP Chapter 1. Introduction Section 1.1. Introduction Section 1.2. A Simple Daytime Client Section 1.3. ...

    WinHTTP WebSocket 代码

    HTML 5 Web Sockets is a powerful and effective technique for real-time information processing. There exists many techniques such as Poling, Long Poling...Here goes a comparison of polling vs Web Sockets.

    CF Polling v0.94.zip

    由于CF Polling v0.94.zip的源代码是开放的,这意味着任何用户都可以阅读和修改程序代码。开源社区的力量可以帮助这个程序持续进化,加入新功能,如提供多语言支持、集成社交媒体分享功能、增加数据分析和报告等。这...

    ADC_RegularConversion_Polling_adc_

    在嵌入式系统中,模拟数字转换器(ADC)是一个至关重要的组件,它允许设备将来自传感器或其他模拟信号源的数据转换为数字值,以便微控制器能够处理这些信息。本教程将详细阐述如何使用HAL(硬件抽象层)API,在轮询...

    The Signal and the Noise: Why So Many Predictions Fail — but Some Don&quot;t

    s brilliant and elegant tour of the modern science slash art of forecasting shows what happens when Big Data meets human nature Baseball weather forecasting earthquake prediction economics and polling...

    L475+SDMMC+polling SD卡

    SDMMC驱动SD卡程序,见附件,

    libgdx PollingTest Polling

    libgdx PollingTest Polling

    modbus_master_polling.zip

    在本压缩包“modbus_master_polling.zip”中,包含了一个西门子主站(Master)使用MODBUS RTU协议进行轮询(Polling)的示例程序,名为“modbus master polling.mwp”。这个程序可能是用特定的编程语言或工具编写的...

    lemoal-nvme-polling-vault-2017-final_0.pdf

    https://events.static.linuxfound.org/sites/events/files/slides/lemoal-nvme-polling-vault-2017-final_0.pdf

Global site tag (gtag.js) - Google Analytics