官方给的那个pipeline例子很直观,当我们有些一大堆任务(如分析日志)需要用多个worker来工作时,可以不用hadoop这么重量级的产品,使用pipeline完全可以做到。这里做点点修改如下:
pipeline模型图如下:
1,首先需要个任务生成器:taskcreate.php
任务生成器负责PUSH任务内容到套接口,需要指定类型为SOCKET_PUSH。生成一堆随机数模拟每个任务所需时间
如,这里运行的结果为需要花费5s
<?php
$context = new ZMQContext();
//push message mode
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender->bind("tcp://*:5557");
echo "Press Enter when the workers are ready: ";
$fp = fopen('php://stdin', 'r');
$line = fgets($fp, 512);
fclose($fp);
echo "Sending tasks to workers...", PHP_EOL;
// The first message is "0" and signals start of batch
$sender->send(0);
// Send 100 tasks
$total_msec = 0; // Total expected cost in msecs
for ($task_nbr = 0; $task_nbr < 100; $task_nbr++) {
// Random workload from 1 to 100msecs
$workload = mt_rand(1, 100);
$total_msec += $workload;
$sender->send($workload);
}
printf ("Total expected cost: %d msec\n", $total_msec);
sleep (1); // Give 0MQ time to deliver
?>
2,需要worker.php来具体处理任务
worker负责处理任务,启动多个worker能增强处理能力,缩短处理时间。这里多开启了个套接口订阅广播是为了全部任务处理完毕后汇总结果的程序通知worker.php关闭时用。
同时使用poll封装实现读写监听,区分不同的套接口。
<?php
$context = new ZMQContext();
// pull message from taskcreate.php
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->connect("tcp://localhost:5557");
// push message to result.php
$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender->connect("tcp://localhost:5558");
// Socket for control input
$controller = new ZMQSocket($context, ZMQ::SOCKET_SUB);
$controller->connect("tcp://localhost:5559");
$controller->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
// Process messages from receiver and controller
$poll = new ZMQPoll();
$poll->add($receiver, ZMQ::POLL_IN);
$poll->add($controller, ZMQ::POLL_IN);
$readable = $writeable = array();
// Process messages from both sockets
while (true) {
$events = $poll->poll($readable, $writeable);
if ($events > 0) {
foreach ($readable as $socket) {
if ($socket === $receiver) {
$message = $socket->recv();
// Simple progress indicator for the viewer
echo $message, PHP_EOL;
// Do the work : times $message by 1000
usleep($message * 1000);
// Send results to sink : result=$message*1000
$sender->send($message * 1000);
}
// Any waiting controller command acts as 'KILL'
else if ($socket === $controller) {
exit();
}
}
}
}
?>
3,最后汇总结果result.php
完毕后套接口发布消息,订阅了此消息的worker都会关闭。
<?php
$context = new ZMQContext();
// Socket to receive messages on
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->bind("tcp://*:5558");
// Socket for worker control
$controller = new ZMQSocket($context, ZMQ::SOCKET_PUB);
$controller->bind("tcp://*:5559");
// Wait for start of batch
$string = $receiver->recv();
// Process 100 confirmations
$tstart = microtime(true);
$total_msec = 0; // Total calculated cost in msecs
$sum = 0;
for ($task_nbr = 0; $task_nbr < 100; $task_nbr++) {
$string = $receiver->recv();
echo $string."\n";
}
$tend = microtime(true);
$total_msec = ($tend - $tstart) * 1000;
echo PHP_EOL;
printf ("Total elapsed time: %d msec", $total_msec);
echo PHP_EOL;
// Send kill signal to workers
$controller->send("KILL");
// Finished
sleep (1);
?>
运行方式如下:
1,开启多个worker.php(如开启2个)
2,开启result.php
3,运行taskcreate.php
可以看到原本需要花费5s的任务在3s就处理完毕了,如何将任务均匀地分配给多个worker官方文档中有详细建议。
- 大小: 15.1 KB
分享到:
相关推荐
消息队列zeromq的go语言测试实例包,为学习安装部署zeromq的同学提供,注意该安装包中的测试实例为go语言版本,要根据各位安装的zeromq版本下载。
消息队列模型是一种在分布式系统和并发编程中广泛使用的架构模式,它主要用于处理异步通信和解耦系统组件。在“消息队列模型”中,应用程序通过将消息发送到一个中间的消息队列,而不是直接调用其他服务或进程,从而...
zeromq是一个强大的开源消息队列系统,它提供了一种高效、灵活且可扩展的通信框架,用于构建分布式应用程序。消息队列在软件开发中扮演着重要的角色,它允许不同进程或服务之间异步传递消息,提高了系统的并行性和...
zeromq库是一个高效的消息队列系统,常用于构建分布式应用程序。它的设计目标是提供一个简单易用、高性能且可移植的接口,使得开发者能够轻松地实现进程间通信(IPC)和网络通信。zeromq提供了多种编程语言的API,...
2. **消息队列的类型**:常见的消息队列有RabbitMQ、Kafka、ActiveMQ、ZeroMQ等。这些消息队列各有特点,例如RabbitMQ支持多种协议,Kafka擅长大数据流处理,而ZeroMQ则提供了轻量级的解决方案。 3. **消息模型**:...
zeromq(ZeroMQ)是一个开源的消息中间件,它提供了高级消息队列的功能,可以实现进程间通信(IPC)和网络间通信(IPC),支持多种协议,如TCP、UDP、PGM以及PUB/SUB、REQ/REP、DEALER/ROUTER等多种消息模式。zeromq...
主流的分布式消息队列如ZeroMQ、RabbitMQ等,实现了对AMQP协议的支持。分布式消息队列不仅需要满足基本的分布式和队列特性,还需关注基本消息模型、功能和性能、接口封装度、可靠性和扩展性等关键需求指标。 综上所...
零MQ(ZeroMQ)是一种高性能、轻量级的消息队列系统,它被广泛应用于分布式计算环境中,用于在不同进程间高效地传输数据。ZeroMQ不仅仅是一个消息队列,它更像一个网络通信框架,提供了多种高级通信模式,如发布/...
此外,还可以使用ZeroMQ,这是一个轻量级的库,提供了一套消息队列的接口,可以在C和C++中直接使用。 在压缩包中的“C,C++源码”可能包含了实现消息队列的示例代码或者使用这些库的教程。通过这些源码,你可以学习...
ZeroMQ,又称为0MQ或ØMQ,是一个高度可扩展的、高性能的开源消息队列系统,它在设计上借鉴了传统的消息中间件概念,但更注重轻量级和灵活性。这个库允许开发者构建分布式应用,通过在进程间传递消息来实现异步通信...
在我们的系统中,ZeroMQ可以作为消息队列内部通信的一种补充,例如,当消息需要跨进程传输或者与其他系统交互时,zmq可以提供低延迟、高效率的解决方案。 五、配置文件(para.ini) para.ini文件通常用于存储系统...
### 消息队列的理解与应用 #### 一、什么是消息队列? 消息队列是一种在消息的传输过程中用于保存消息的容器。这里的“消息”指的是在两台计算机之间传送的数据单元,它可以是非常简单的文本字符串,也可以是包含...
消息队列在IT行业中是一种非常重要的中间件技术,主要用于解耦系统组件,提供异步通信的能力,以及提高系统的处理能力和可扩展性。在本话题中,我们将深入探讨消息队列的实现,特别是与VC(Visual C++)相关的实现...
在实际生产环境中,常见的消息队列中间件有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ和RocketMQ等。 **消息队列工作原理** 消息队列通常包含三个角色:队列服务端、消息生产者和消息消费者。服务端负责接收和...
ZeroMQ,又称ØMQ或0MQ,是一个轻量级的、高性能的消息队列系统,广泛应用于分布式计算和微服务架构中。这本书的高清PDF版本带有书签和完整目录,方便读者快速定位和查阅相关内容。 ZeroMQ的核心理念是提供一种简单...
ZeroMQ,又称为0MQ或ØMQ,是一个开源的消息队列系统,被广泛应用于云环境中的高速消息通信。它提供了一种轻量级、高性能、模式化的通信机制,使得应用程序可以像发送和接收数据到内存一样简单高效地进行跨网络通信...
在Linux环境中,ZeroMQ通过消息队列机制实现了进程间的通信,使得数据可以在不同程序之间安全、有序地传输。这个版本的压缩包文件名后缀是 ".tar.gz",表明这是一个经过GNU tar工具压缩的归档文件,通常用于在Unix-...
**零MQ(ZeroMQ)**,也常写作0MQ,是一种高效、轻量级的消息队列中间件,它被广泛赞誉为史上最强大的消息中间件。它的核心特性在于能够在极低的延迟下——如30微秒内——完成消息的传递,这种极致的性能使其在实时...
ZeroMQ(有时也被称作0MQ或 ØMQ)是一种高级的分布式应用程序框架,它为开发人员提供了高效、灵活的消息队列机制,旨在解决分布式计算环境下的消息传递问题。ZeroMQ的设计理念是简单易用,并且支持多种编程语言,...