zhou.xingbo

Using pyzmq


1. The Socket API

  • Creating and destroying sockets, which go together to form a karmic circle of socket life (see zmq_socket, zmq_close).
  • Configuring sockets by setting options on them and checking them if necessary (see zmq_setsockopt, zmq_getsockopt).
  • Plugging sockets onto the network topology by creating ØMQ connections to and from them (see zmq_bind, zmq_connect).
  • Using the sockets to carry data by writing and receiving messages on them (see zmq_send, zmq_recv).
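
The four steps above map onto pyzmq calls roughly as follows. This is a minimal sketch assuming Python 3 and a recent pyzmq; the inproc endpoint name "lifecycle-demo" is made up for illustration.

```python
import zmq

context = zmq.Context()

# Create the sockets (zmq_socket)
server = context.socket(zmq.PAIR)
client = context.socket(zmq.PAIR)

# Configure them and read an option back (zmq_setsockopt / zmq_getsockopt)
server.setsockopt(zmq.LINGER, 0)
client.setsockopt(zmq.LINGER, 0)
linger = server.getsockopt(zmq.LINGER)

# Plug them into the topology (zmq_bind / zmq_connect)
server.bind("inproc://lifecycle-demo")      # hypothetical endpoint name
client.connect("inproc://lifecycle-demo")

# Carry data (zmq_send / zmq_recv)
client.send(b"hello")
received = server.recv()

# Destroy the sockets (zmq_close), then terminate the context
client.close()
server.close()
context.term()

print(linger, received)
```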

2. Plugging Sockets Into the Topology

  • ØMQ connections go across an arbitrary transport (inproc, ipc, tcp, pgm, or epgm). See zmq_inproc, zmq_ipc, zmq_tcp, zmq_pgm, and zmq_epgm.
  • They exist when a client does zmq_connect to an endpoint, whether or not a server has already done zmq_bind to that endpoint.
  • They are asynchronous, and have queues that magically exist where and when needed.
  • They may express a certain "messaging pattern", according to the type of socket used at each end.
  • One socket may have many outgoing and many incoming connections.
  • There is no zmq_accept() method. When a socket is bound to an endpoint it automatically starts accepting connections.
  • Your application code cannot work with these connections directly; they are encapsulated under the socket.
A server node can bind to many endpoints and it can do this using a single socket. This means it will accept connections across different transports.
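
To illustrate one socket accepting connections across two transports, here is a small sketch (Python 3 and a recent pyzmq assumed). The endpoint name "endpoint-a" is invented, and the TCP port is chosen at random by pyzmq's bind_to_random_port so the example does not depend on a free fixed port.

```python
import zmq

context = zmq.Context()

# One PULL socket bound to two endpoints on different transports
sink = context.socket(zmq.PULL)
sink.setsockopt(zmq.RCVTIMEO, 5000)          # fail instead of hanging forever
sink.bind("inproc://endpoint-a")             # hypothetical endpoint name
port = sink.bind_to_random_port("tcp://127.0.0.1")

# Two independent senders, one per transport
via_inproc = context.socket(zmq.PUSH)
via_inproc.connect("inproc://endpoint-a")
via_tcp = context.socket(zmq.PUSH)
via_tcp.connect("tcp://127.0.0.1:%d" % port)

via_inproc.send(b"from inproc")
via_tcp.send(b"from tcp")

# Both messages arrive on the same socket; arrival order is not guaranteed
received = sorted(sink.recv() for _ in range(2))

for sock in (via_inproc, via_tcp, sink):
    sock.close(linger=0)
context.term()
```

Note that there is no accept step anywhere: binding is enough for the sink to start taking connections.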


3. Using Sockets to Carry Data
  • ØMQ sockets carry messages, rather than bytes (as in TCP) or frames (as in UDP). A message is a length-specified blob of binary data. We'll come to messages shortly; their design is optimized for performance and thus somewhat tricky to understand.
  • ØMQ sockets do their I/O in a background thread. This means that messages arrive in a local input queue, and are sent from a local output queue, no matter what your application is busy doing. These are configurable memory queues, by the way.
  • ØMQ sockets can, depending on the socket type, be connected to (or from, it's the same) many other sockets. Where TCP emulates a one-to-one phone call, ØMQ implements one-to-many (like a radio broadcast), many-to-many (like a post office), many-to-one (like a mail box), and even one-to-one.
  • ØMQ sockets can send to many endpoints (creating a fan-out model), or receive from many endpoints (creating a fan-in model).
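
A quick way to see "messages, rather than bytes" in practice is to send payloads of different sizes and check that each one comes out whole, with its boundaries intact. The sketch below assumes Python 3 and a recent pyzmq; the endpoint name is made up.

```python
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.setsockopt(zmq.RCVTIMEO, 5000)      # fail instead of hanging forever
receiver.bind("inproc://framing-demo")       # hypothetical endpoint name

sender = context.socket(zmq.PUSH)
sender.connect("inproc://framing-demo")

# Three messages of different lengths, including an empty one
sent = [b"a", b"", b"a much longer message"]
for message in sent:
    sender.send(message)

# Each recv() returns exactly one message, never a partial or merged one
received = [receiver.recv() for _ in sent]

sender.close(linger=0)
receiver.close(linger=0)
context.term()
```

The sends return before anything is read, because the messages sit in the socket's queues until the receiver asks for them.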

 

4. Core Messaging Patterns

 

  • Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
  • Publish-subscribe, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
  • Pipeline, which connects nodes in a fan-out / fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.

We looked at each of these in the first chapter. There's one more pattern that people tend to try to use when they still think of ØMQ in terms of traditional TCP sockets:

  • Exclusive pair, which connects two sockets in an exclusive pair. This is a low-level pattern for specific, advanced use-cases. We'll see an example at the end of this chapter.
These are the socket combinations that are valid for a connect-bind pair:
  • PUB and SUB
  • REQ and REP
  • REQ and XREP
  • XREQ and REP
  • XREQ and XREP
  • XREQ and XREQ
  • XREP and XREP
  • PUSH and PULL
  • PAIR and PAIR
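
As a small illustration of a valid pair and of how the socket type enforces its pattern, here is a REQ and REP round trip, followed by a REP socket trying to send without a pending request. This is a sketch under the assumption of Python 3 and a recent pyzmq; the endpoint name is invented.

```python
import zmq

context = zmq.Context()

service = context.socket(zmq.REP)
service.setsockopt(zmq.RCVTIMEO, 5000)       # fail instead of hanging forever
service.bind("inproc://reqrep-demo")         # hypothetical endpoint name

client = context.socket(zmq.REQ)
client.setsockopt(zmq.RCVTIMEO, 5000)
client.connect("inproc://reqrep-demo")

# The request-reply pattern is strictly alternating: send, recv, send, recv
client.send(b"Hello")
request = service.recv()
service.send(b"World")
reply = client.recv()

# A REP socket that replies without having received a request breaks the pattern
try:
    service.send(b"unsolicited")
    out_of_turn_error = None
except zmq.ZMQError as exc:
    out_of_turn_error = exc.errno            # EFSM: wrong state for this operation

client.close(linger=0)
service.close(linger=0)
context.term()
```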

5. Working with Messages

Note that when you have passed a message to zmq_send, ØMQ will clear the message, i.e. set the size to zero. You cannot send the same message twice, and you cannot access the message data after sending it.
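
The note above is about the C API's message structure. In pyzmq the picture is a little different: Socket.send copies the data by default (copy=True), so the Python bytes object stays intact and can be sent again. The sketch below shows this, assuming Python 3 and a recent pyzmq; the endpoint name is made up.

```python
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.setsockopt(zmq.RCVTIMEO, 5000)      # fail instead of hanging forever
receiver.bind("inproc://resend-demo")        # hypothetical endpoint name

sender = context.socket(zmq.PUSH)
sender.connect("inproc://resend-demo")

payload = b"same bytes"

# With the default copy=True, pyzmq hands ØMQ a copy of the data,
# so the Python object is untouched and can be reused.
sender.send(payload)
sender.send(payload)

received = [receiver.recv(), receiver.recv()]

sender.close(linger=0)
receiver.close(linger=0)
context.term()
```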

 

6. Handling Multiple Sockets

In most of the examples so far, the main loop has been:

1. wait for message on socket

2. process message

3. repeat

 

Let's start with a dirty hack, partly for the fun of not doing it right, but mainly because it lets me show you how to do non-blocking socket reads. Here is a simple example of reading from two sockets using non-blocking reads. This rather confused program acts both as a subscriber to weather updates, and a worker for parallel tasks:

 

# encoding: utf-8
#
#   Reading from multiple sockets
#   This version uses a simple recv loop
#
#   Author: Jeremy Avnet (brainsik) <spork(dash)zmq(at)theory(dot)org>
#

import zmq
import time

# Prepare our context and sockets
context = zmq.Context()

# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, "10001")

# Process messages from both sockets
# We prioritize traffic from the task ventilator
while True:

    # Process any waiting tasks
    while True:
        try:
            rc = receiver.recv(zmq.NOBLOCK)
        except zmq.ZMQError:
            break
        # process task

    # Process any waiting weather updates
    while True:
        try:
            rc = subscriber.recv(zmq.NOBLOCK)
        except zmq.ZMQError:
            break
        # process weather update

    # No activity, so sleep for 1 msec
    time.sleep(0.001)
 

 

Now let's see the same little senseless application done right, using zmq_poll (zmq.Poller in pyzmq):

 

# encoding: utf-8
#
#   Reading from multiple sockets
#   This version uses zmq.Poller()
#
#   Author: Jeremy Avnet (brainsik) <spork(dash)zmq(at)theory(dot)org>
#

import zmq

# Prepare our context and sockets
context = zmq.Context()

# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, "10001")

# Initialize poll set
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

# Process messages from both sockets
while True:
    socks = dict(poller.poll())

    if receiver in socks and socks[receiver] == zmq.POLLIN:
        message = receiver.recv()
        # process task

    if subscriber in socks and socks[subscriber] == zmq.POLLIN:
        message = subscriber.recv()
        # process weather update
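
The listing above needs a running ventilator and weather server. To try the polling idiom on its own, the following sketch uses an in-process PUSH/PULL pair and a poll timeout (in milliseconds), so it returns even when nothing arrives. Python 3 and a recent pyzmq are assumed; the endpoint name is invented.

```python
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.bind("inproc://poll-demo")          # hypothetical endpoint name
sender = context.socket(zmq.PUSH)
sender.connect("inproc://poll-demo")

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)

# Nothing has been sent yet: poll() times out and reports no ready sockets
idle = dict(poller.poll(100))

# After a send, the receiver shows up as readable
sender.send(b"task")
ready = dict(poller.poll(1000))

message = None
if ready.get(receiver) == zmq.POLLIN:
    message = receiver.recv()

sender.close(linger=0)
receiver.close(linger=0)
context.term()
```

Calling poll() with no argument, as in the listing above, blocks until at least one registered socket is ready.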

 

 

7. Handling Errors and ETERM

 

  • Methods that create objects will return NULL in case they fail.
  • Other methods will return 0 on success and other values (mostly -1) on an exceptional condition (usually failure).
  • The error code is provided in errno or zmq_errno.
  • A descriptive error text for logging is provided by zmq_strerror.
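
These rules describe the C API. In pyzmq, failures surface as zmq.ZMQError exceptions that carry the same error code, and zmq.strerror gives the descriptive text. A minimal sketch, assuming Python 3 and a recent pyzmq (the endpoint name is made up): a non-blocking receive on an empty socket fails with EAGAIN.

```python
import zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("inproc://error-demo")           # hypothetical endpoint name

try:
    # No message is waiting, so a non-blocking receive cannot succeed
    socket.recv(zmq.NOBLOCK)
    error_code, error_text = None, None
except zmq.ZMQError as exc:
    error_code = exc.errno                   # same value zmq_errno would report
    error_text = zmq.strerror(exc.errno)     # descriptive text for logging

socket.close(linger=0)
context.term()

print(error_code == zmq.EAGAIN, error_text)
```
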
Here is the worker process, which manages two sockets (a PULL socket getting tasks, and a SUB socket getting control commands) using the zmq_poll technique we saw earlier:
# encoding: utf-8
#
#   Task worker - design 2
#   Adds pub-sub flow to receive and respond to kill signal
#
#   Author: Jeremy Avnet (brainsik) <spork(dash)zmq(at)theory(dot)org>
#

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Socket for control input
controller = context.socket(zmq.SUB)
controller.connect("tcp://localhost:5559")
controller.setsockopt(zmq.SUBSCRIBE, "")

# Process messages from receiver and controller
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(controller, zmq.POLLIN)
# Process messages from both sockets
while True:
    socks = dict(poller.poll())

    if socks.get(receiver) == zmq.POLLIN:
        message = receiver.recv()

        # Process task
        workload = int(message)  # Workload in msecs

        # Do the work
        time.sleep(workload / 1000.0)

        # Send results to sink
        sender.send(message)

        # Simple progress indicator for the viewer
        sys.stdout.write(".")
        sys.stdout.flush()

    # Any waiting controller command acts as 'KILL'
    if socks.get(controller) == zmq.POLLIN:
        break
 
Here is the modified sink application. When it's finished collecting results it broadcasts a KILL message to all workers:
# encoding: utf-8
#
#   Task sink - design 2
#   Adds pub-sub flow to send kill signal to workers
#
#   Author: Jeremy Avnet (brainsik) <spork(dash)zmq(at)theory(dot)org>
#

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Socket for worker control
controller = context.socket(zmq.PUB)
controller.bind("tcp://*:5559")

# Wait for start of batch
receiver.recv()

# Start our clock now
tstart = time.time()

# Process 100 confirmations
for task_nbr in xrange(100):
    receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(":")
    else:
        sys.stdout.write(".")
    sys.stdout.flush()

# Calculate and report duration of batch
tend = time.time()
tdiff = tend - tstart
total_msec = tdiff * 1000
print "Total elapsed time: %d msec" % total_msec

# Send kill signal to workers
controller.send("KILL")

# Finished
time.sleep(1)  # Give 0MQ time to deliver
 
8. Handling Interrupt Signals
Realistic applications need to shut down cleanly when interrupted with Ctrl-C or another signal such as SIGTERM. By default, these simply kill the process, meaning messages won't be flushed, files won't be closed cleanly, etc.
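
One common way to handle this in Python is to let the signal handler only set a flag, poll with a timeout, and do the cleanup in the main loop. The sketch below is one possible arrangement, not the only one. It assumes Python 3.8+ (for signal.raise_signal, used here as a stand-in for a real Ctrl-C), a recent pyzmq, and that it runs in the main thread. The names request_stop and drain_until_stopped are made up for this example.

```python
import signal
import zmq

stop_requested = False

def request_stop(signum, frame):
    """Signal handler: only record the request; the main loop cleans up."""
    global stop_requested
    stop_requested = True

def drain_until_stopped(socket, poll_ms=100):
    """Receive until a stop was requested and no message is waiting."""
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    handled = []
    while True:
        if dict(poller.poll(poll_ms)).get(socket) == zmq.POLLIN:
            handled.append(socket.recv())
        elif stop_requested:
            break
    return handled

# Install the handlers, remembering the previous ones
previous_int = signal.signal(signal.SIGINT, request_stop)
previous_term = signal.signal(signal.SIGTERM, request_stop)

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.bind("inproc://signal-demo")        # hypothetical endpoint name
sender = context.socket(zmq.PUSH)
sender.connect("inproc://signal-demo")

sender.send(b"task 1")
sender.send(b"task 2")

signal.raise_signal(signal.SIGINT)           # stands in for the user pressing Ctrl-C
handled = drain_until_stopped(receiver)      # pending tasks are still processed

# Clean shutdown: close sockets, terminate the context, restore handlers
sender.close(linger=0)
receiver.close(linger=0)
context.term()
signal.signal(signal.SIGINT, previous_int)
signal.signal(signal.SIGTERM, previous_term)
```

Because the loop polls with a timeout instead of blocking in recv(), it notices the flag within poll_ms milliseconds of the signal.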

9. Detecting Memory Leaks?

10. Multipart Messages?

11. Intermediates and Devices
Then we extend the application across a wider network, placing devices in specific places and scaling up the number of nodes.

11.1 A Publish-Subscribe Proxy Server
# Weather proxy device
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import zmq

context = zmq.Context()

# This is where the weather server sits
frontend = context.socket(zmq.SUB)
frontend.connect("tcp://192.168.55.210:5556")

# This is our public endpoint for subscribers
backend = context.socket(zmq.PUB)
backend.bind("tcp://10.1.1.0:8100")

# Subscribe on everything
frontend.setsockopt(zmq.SUBSCRIBE, '')

# Shunt messages out to our own subscribers
while True:
    while True:

        # Process all parts of the message
        message = frontend.recv()
        more = frontend.getsockopt(zmq.RCVMORE)
        if more:
            backend.send(message, zmq.SNDMORE)
        else:
            backend.send(message)
            break # Last message part
 
11.2 A Request-Reply Broker

Luckily there are non-blocking versions of the REQ and REP sockets, called XREQ and XREP. These "extended request/reply" sockets let you extend request-reply across intermediate nodes, such as our message queuing broker.

When we extend request-reply, REQ talks to XREP and XREQ talks to REP. In between the XREQ and XREP we have to have code (like our broker) that pulls messages off the one socket and shoves them onto the other.

 

The request-reply broker binds to two endpoints, one for clients to connect to (the frontend socket) and one for services to connect to (the backend). To test this broker, you will want to change your services so they connect to the backend socket. Here are a client and service that show what I mean:

#
#   Request-reply client in Python
#   Connects REQ socket to tcp://localhost:5559
#   Sends "Hello" to server, expects "World" back
#
import zmq

#  Prepare our context and sockets
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5559")

#  Do 10 requests, waiting each time for a response
for request in range(1, 11):
    socket.send("Hello")
    message = socket.recv()
    print "Received reply ", request, "[", message, "]"
 

 

#
#   Request-reply service in Python
#   Connects REP socket to tcp://localhost:5560
#   Expects "Hello" from client, replies with "World"
#
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:5560")

while True:
    message = socket.recv()
    print "Received request: ", message
    socket.send("World")

 

 

And here is the broker, in Ruby (using the ffi-rzmq binding). You will see that it's multipart safe:

 

# author: Oleg Sidorov <4pcbr> i4pcbr@gmail.com
# this code is licenced under the MIT/X11 licence.

require 'rubygems'
require 'ffi-rzmq'

context = ZMQ::Context.new
frontend = context.socket(ZMQ::XREP)
backend = context.socket(ZMQ::XREQ)

frontend.bind('tcp://*:5559')
backend.bind('tcp://*:5560')

poller = ZMQ::Poller.new
poller.register(frontend, ZMQ::POLLIN)
poller.register(backend, ZMQ::POLLIN)

while true
  poller.poll(:blocking)
  poller.readables.each do |socket|
    if socket === frontend
      while true
        message = socket.recv_string
        more = socket.more_parts?
        backend.send_string(message, more ? ZMQ::SNDMORE : 0)
        break if !more
      end
    elsif socket === backend
      while true
        message = socket.recv_string
        more = socket.more_parts?
        frontend.send_string(message, more ? ZMQ::SNDMORE : 0)
        break if !more
      end
    end
  end
end
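
For readers who want to stay in Python, the same broker logic could be sketched as below. This is an illustration under assumptions, not the guide's listing: it targets Python 3 and a recent pyzmq, uses ROUTER and DEALER (the current names for XREP and XREQ), and runs over made-up inproc endpoints with a client and a service in the same process so it can be tried without any other program. The helper names relay and broker_step are invented here.

```python
import zmq

def relay(source, destination):
    """Forward one whole message, keeping all of its parts together."""
    destination.send_multipart(source.recv_multipart())

def broker_step(poller, frontend, backend, timeout_ms=1000):
    """Poll once and shuttle any waiting message to the opposite socket."""
    events = dict(poller.poll(timeout_ms))
    if events.get(frontend) == zmq.POLLIN:
        relay(frontend, backend)
    if events.get(backend) == zmq.POLLIN:
        relay(backend, frontend)

context = zmq.Context()
frontend = context.socket(zmq.ROUTER)        # clients connect here
backend = context.socket(zmq.DEALER)         # services connect here
frontend.bind("inproc://broker-frontend")    # hypothetical endpoint names
backend.bind("inproc://broker-backend")

poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

client = context.socket(zmq.REQ)
client.connect("inproc://broker-frontend")
service = context.socket(zmq.REP)
service.connect("inproc://broker-backend")
for sock in (client, service):
    sock.setsockopt(zmq.RCVTIMEO, 5000)      # fail instead of hanging forever

client.send(b"Hello")
broker_step(poller, frontend, backend)       # request: frontend -> backend
request = service.recv()
service.send(b"World")
broker_step(poller, frontend, backend)       # reply: backend -> frontend
reply = client.recv()

for sock in (client, service, frontend, backend):
    sock.close(linger=0)
context.term()
```

A long-running broker would simply call broker_step in a `while True` loop. Using recv_multipart and send_multipart keeps the routing envelope and the body together, which is what makes the relay multipart safe.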

 

 

Using a request-reply broker makes your client-server architectures easier to scale since clients don't see services, and services don't see clients. The only stable node is the device in the middle.

 

Built-in Devices

 

ØMQ provides some built-in devices, though most advanced users write their own devices. The built-in devices are:

  • QUEUE, which is like the request-reply broker.
  • FORWARDER, which is like the pub-sub proxy server.
  • STREAMER, which is like FORWARDER but for pipeline flows.
Here is the request-reply broker re-written to call QUEUE and rebadged as an expensive-sounding "message queue" (people have charged houses for code that did less):

 

 

"""

   Simple message queuing broker
   Same as request-reply broker but using QUEUE device
 
   Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>
  
"""


import zmq

def main():
    """ main method """
    
    context = zmq.Context(1)
    
    # Socket facing clients
    frontend = context.socket(zmq.XREP)
    frontend.bind("tcp://*:5559")
    
    # Socket facing services
    backend  = context.socket(zmq.XREQ)
    backend.bind("tcp://*:5560")
    
    zmq.device(zmq.QUEUE, frontend, backend)
    
    # We never get here...
    frontend.close()
    backend.close()
    context.term()
    


if __name__ == "__main__":
    main()
 

 

12. Multithreading with 0MQ

 

"""

   Multithreaded Hello World server
 
   Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>
  
"""
import time
import threading
import zmq

def worker_routine(worker_url, context):
    """ Worker routine """
    
    # Socket to talk to dispatcher
    socket = context.socket(zmq.REP)
    
    socket.connect(worker_url)
    
    while True:
        
        string  = socket.recv()
       
        print("Received request: [%s]\n" % (string))
        
        # do some 'work'
        time.sleep(1)
        
        #send reply back to client
        socket.send("World")

def main():
    """ server routine """
    
    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"
    
    # Prepare our context and sockets
    context = zmq.Context(1)
    
    # Socket to talk to clients
    clients = context.socket(zmq.XREP)
    clients.bind(url_client)
    
    # Socket to talk to workers
    workers = context.socket(zmq.XREQ)
    workers.bind(url_worker)
    
    # Launch pool of worker threads
    for i in range(5):
        thread = threading.Thread(target=worker_routine, args=(url_worker, context, ))
        thread.start()
    
    zmq.device(zmq.QUEUE, clients, workers)
    
    # We never get here but clean up anyhow
    clients.close()
    workers.close()
    context.term()


    
if __name__ == "__main__":
    main()

 

 

13. Signaling between Threads

In this example we use PAIR sockets over the inproc transport:

 

"""

   Multithreaded relay
 
   Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>
  
"""

import threading
import zmq

def step1(context):
    """ step1 """
    
    # Signal downstream to step 2
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step2")
    
    sender.send("")
    


def step2(context):
    """ step2 """
    
    # Bind to inproc: endpoint, then start upstream thread
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step2")
    
    thread = threading.Thread(target=step1, args=(context, ))
    thread.start()
    
    # Wait for signal
    string = receiver.recv()

    # Signal downstream to step 3
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step3")
    sender.send("")
    
    return

def main():
    """ server routine """ 
    # Prepare our context and sockets
    context = zmq.Context(1)
    
    # Bind to inproc: endpoint, then start upstream thread
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step3")
    
    thread = threading.Thread(target=step2, args=(context, ))
    thread.start()
    
    # Wait for signal 
    string = receiver.recv()
    
    print("Test successful!\n")
    
    receiver.close()
    context.term()
    
    return


if __name__ == "__main__":
    main()

 

 

14. Node Coordination

 

#
#  Synchronized publisher
#
import zmq

#  We wait for 2 subscribers
SUBSCRIBERS_EXPECTED = 2

def main():
    context = zmq.Context()
    
    # Socket to talk to clients
    publisher = context.socket(zmq.PUB)
    publisher.bind('tcp://*:5561')

    # Socket to receive signals
    syncservice = context.socket(zmq.REP)
    syncservice.bind('tcp://*:5562')

    # Get synchronization from subscribers
    subscribers = 0
    while subscribers < SUBSCRIBERS_EXPECTED:
        # wait for synchronization request
        msg = syncservice.recv()
        # send synchronization reply
        syncservice.send('')
        subscribers += 1
        print "+1 subscriber"
    
    # Now broadcast exactly 1M updates followed by END
    for i in range(1000000):
        publisher.send('Rhubarb')

    publisher.send('END')

if __name__ == '__main__':
    main()

 

 

 

#
#  Synchronized subscriber
#
import zmq

def main():
    context = zmq.Context()
    
    # First, connect our subscriber socket
    subscriber = context.socket(zmq.SUB)
    subscriber.connect('tcp://localhost:5561')
    subscriber.setsockopt(zmq.SUBSCRIBE, "")

    # Second, synchronize with publisher
    syncclient = context.socket(zmq.REQ)
    syncclient.connect('tcp://localhost:5562')
    
    # send a synchronization request
    syncclient.send('')
    
    # wait for synchronization reply
    syncclient.recv()

    # Third, get our updates and report how many we got
    nbr = 0
    while True:
        msg = subscriber.recv()
        if msg == 'END':
            break
        nbr += 1
    
    print 'Received %d updates' % nbr

if __name__ == '__main__':
    main()

 

 

15. Transient vs. Durable Sockets

16. Pubsub Message Envelopes

 

"""

   Pubsub envelope publisher   
 
   Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>
  
"""
import time
import zmq

def main():
    """ main method """
    
    # Prepare our context and publisher
    context   = zmq.Context(1)
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5563")
    
    while True:
        # Write two messages, each with an envelope and content
        publisher.send_multipart(["A", "We don't want to see this"])
        publisher.send_multipart(["B", "We would like to see this"])
        time.sleep(1)
    
    # We never get here but clean up anyhow
    publisher.close()
    context.term()
    

if __name__ == "__main__":
    main()

 

 

"""
   Pubsub envelope subscriber   
 
   Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>
  
"""
import zmq

def main():
    """ main method """
    
    # Prepare our context and subscriber
    context    = zmq.Context(1)
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5563")
    subscriber.setsockopt(zmq.SUBSCRIBE, "B")
    
    while True:
        # Read envelope with address
        [address, contents] = subscriber.recv_multipart()
        print("[%s] %s\n" % (address, contents))
    
    # We never get here but clean up anyhow
    subscriber.close()
    context.term()


if __name__ == "__main__":
    main()

 

17. Making a (Semi-)Durable Subscriber

 

# encoding: utf-8
#
#   Publisher for durable subscriber
#
#   Author: Jeremy Avnet (brainsik) <spork(dash)zmq(at)theory(dot)org>
#

import zmq
import time

context = zmq.Context()

# Subscriber tells us when it's ready here
sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")

# We send updates via this socket
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")

# Wait for synchronization request
sync_request = sync.recv()

# Now broadcast exactly 10 updates with pause
for n in xrange(10):
    msg = "Update %d" % n
    publisher.send(msg)
    time.sleep(1)

publisher.send("END")
time.sleep(1)  # Give 0MQ/2.0.x time to flush output

 

 

 

# encoding: utf-8
#
#   Durable subscriber
#
#   Author: Jeremy Avnet (brainsik) <spork(dash)zmq(at)theory(dot)org>
#

import zmq
import time

context = zmq.Context()

# Connect our subscriber socket
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.IDENTITY, "Hello")
subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect("tcp://localhost:5565")

# Synchronize with the publisher
sync = context.socket(zmq.PUSH)
sync.connect("tcp://localhost:5564")
sync.send("")

# Get updates, expect random Ctrl-C death
while True:
    data = subscriber.recv()
    print data
    if data == "END":
        break

 

 

We can put this together to make a cynical publisher that is immune to slow, blocked, or absent subscribers while still offering durable subscriptions to those that need it:

 

# Publisher for durable subscriber
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import zmq
import time

context = zmq.Context()

# Subscriber tells us when it's ready here
sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")

# We send updates via this socket
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")

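# Prevent publisher overflow from slow subscribers
# (zmq.HWM is the ØMQ 2.x option; 3.x and later split it into SNDHWM and RCVHWM)
publisher.setsockopt(zmq.HWM, 1)

# Specify the swap space in bytes, this covers all subscribers
# (zmq.SWAP exists only in ØMQ 2.x; later versions removed it)
publisher.setsockopt(zmq.SWAP, 25000000)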

# Wait for synchronization request
sync_request = sync.recv()

# Now broadcast exactly 10 updates with pause
for n in xrange(10):
    msg = "Update %d" % n
    publisher.send(msg)
    time.sleep(1)

publisher.send("END")
time.sleep(1)  # Give 0MQ/2.0.x time to flush output

 

 

ps:

http://zguide.zeromq.org/page:all
