We've been using Twitter's kestrel queue server at work for a while now, but only from our service layer, which is written in Python. Now that our application layer, written in PHP, has queueing needs of its own, I spent a few days this week adding queue support to our web application. I thought I'd share what I learned and how I implemented it.
Goals
The kestrel server itself was pretty straightforward to get up and running. The only thing I would point out is that I recommend sticking to release branches, as master was fairly unstable when I tried to use it. Regarding implementing the client, there were a few goals I had in mind when I started:
Since kestrel is built on the memcache protocol, try to leverage an existing memcache client rather than build one from scratch
Utilize our existing batch job infrastructure, which I covered previously here, and make sure our multi-tenant needs are met
Keep the queue interface generic in case we change queue servers later
Utilize existing kestrel management tools, and only build out the functionality we need
With these goals in mind, I ended up with 4 components: a kestrel client, a producer, a consumer, and a very small CLI harness for running the consumer. But before I even coded anything, I set up kestrel web, a web UI for kestrel written by my co-worker Matt Erkkila. Kestrel web allows you to view statistics on kestrel, manage queues, as well as sort and filter queues based on manual inputs. Having this tool up and running from the get go made it easy to watch jobs get added and consumed from my test queue, and also easily flush out the queues as needed.
The Kestrel Client
I couldn't find any existing kestrel clients for PHP, so I started looking at the two memcache extensions: the older memcache, and Andrei Zmievski's memcached, the latter of which is based on the libmemcached library. I started with memcache, and while it worked fine initially, I quickly found that I could not modify timeouts. This interfered with the way kestrel recommends polling for new jobs: I would see timeout errors from the memcache extension whenever I set the poll timeout to 1 second (the memcache default) or higher. The memcached extension does not have these issues, so I went with it.
The first gotcha I ran into was serialization. You can use memcached's serializer for writing to kestrel, but when it reads the data back, it doesn't recognize that it is serialized. So I just serialize the data manually in my client, and things work fine. One other thing to note is that you'll want to disable compression, or do it manually, as the memcached extension will automatically compress anything over 100 bytes by default, and will not decompress it when reading from kestrel.
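The manual serialization described above can be sketched as a pair of helpers. This is only a minimal illustration, assuming JSON as the wire format (which is what the client class in this post uses); the function names `encodeJob` and `decodeJob` are hypothetical and do not appear in the original code.

```php
<?php
// Hypothetical helpers: serialize job data ourselves instead of relying on
// the memcached extension's serializer, whose flags kestrel does not return.

function encodeJob($data)
{
    $json = json_encode($data);
    if ($json === false) {
        throw new InvalidArgumentException('Job data could not be JSON-encoded');
    }
    return $json;
}

function decodeJob($payload)
{
    // A false payload means the read returned nothing (empty queue or timeout)
    if ($payload === false) {
        return false;
    }
    // true => decode JSON objects as associative arrays
    return json_decode($payload, true);
}

$payload = encodeJob(array('jobName' => 'HelloWorld', 'data' => array('foo' => 'bar')));
$job     = decodeJob($payload);
echo $job['jobName'], "\n";
```

Because the payload is a plain JSON string, a consumer written in another language (such as a Python service layer) could read the same jobs without knowing anything about PHP's serializer.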
The other issue is that the memcached extension cannot send custom kestrel commands. Since the application layer doesn't need anything fancy, the memcached extension will work fine for it. Once we need support for the upcoming monitor (batching) in kestrel 2, we may need to implement a kestrel client from scratch. Kestrel web supplies everything else we need right now.
Once the decision was made to use memcached, I wrote a light decorator for it, EC_KestrelClient. This handles instantiation of the memcached client, serialization, and helpers for some kestrel-specific options to the GET command. It also supports passing memcached-specific options through to the underlying client. The class ended up looking like this:
<?php
/**
 * A thin kestrel client that wraps Memcached (libmemcached extension)
 *
 * @author    Bill Shupp <hostmaster@shupp.org>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_KestrelClient
{
    /**
     * The Memcached instance
     *
     * @var Memcached
     */
    protected $_memcached = null;

    /**
     * The Kestrel server IP
     *
     * @var string
     */
    protected $_host = '127.0.0.1';

    /**
     * The Kestrel server port
     *
     * @var string
     */
    protected $_port = 22133;

    /**
     * Optional options, not currently used
     *
     * @var array
     */
    protected $_options = array();

    /**
     * Sets the host, port, and options to be used
     *
     * @param string $host    The host to use, defaults to 127.0.0.1
     * @param int    $port    The port to use, defaults to 22133
     * @param array  $options Memcached options, not currently used
     *
     * @return void
     */
    public function __construct(
        $host = '127.0.0.1', $port = 22133, array $options = array()
    )
    {
        $this->_host = $host;
        $this->_port = $port;
        $this->setOptions($options);
    }

    /**
     * Sets job data on the queue, json_encoding the value to avoid problematic
     * serialization.
     *
     * @param string $queue The queue name
     * @param mixed  $data  The data to store
     *
     * @return bool
     */
    public function set($queue, $data)
    {
        // Local json serialization, as kestrel doesn't send serialization flags
        return $this->getMemcached()->set($queue, json_encode($data));
    }

    /**
     * Reliably read an item off of the queue. Meant to be run in a loop, and
     * call closeReliableRead() when done to make sure the final job is not left
     * on the queue.
     *
     * @param mixed $queue   The queue name to read from
     * @param int   $timeout The timeout to wait for a job to appear
     *
     * @return array|false
     * @see closeReliableRead()
     */
    public function reliableRead($queue, $timeout = 1000)
    {
        $queue  = $queue . '/close/open/t=' . $timeout;
        $result = $this->getMemcached()->get($queue);
        if ($result === false) {
            return $result;
        }
        // Local json serialization, as kestrel doesn't send serialization flags
        return json_decode($result, true);
    }

    /**
     * Closes any existing open read
     *
     * @param string $queue The queue name
     *
     * @return false
     */
    public function closeReliableRead($queue)
    {
        $queue = $queue . '/close';
        return $this->getMemcached()->get($queue);
    }

    /**
     * Aborts an existing reliable read
     *
     * @param string $queue The queue name
     *
     * @return false
     */
    public function abortReliableRead($queue)
    {
        $queue = $queue . '/abort';
        return $this->getMemcached()->get($queue);
    }

    /**
     * Set an option to be used with the Memcached client. Not used.
     *
     * @param string $name  The option name
     * @param value  $value The option value
     *
     * @return void
     */
    public function setOption($name, $value)
    {
        $this->_options[$name] = $value;
    }

    /**
     * Sets multiple options
     *
     * @param array $options Array of key/values to set
     *
     * @return void
     */
    public function setOptions(array $options)
    {
        foreach ($options as $name => $value) {
            $this->setOption($name, $value);
        }
    }

    /**
     * Gets a current option's value
     *
     * @param string $name The option name
     *
     * @return mixed
     */
    public function getOption($name)
    {
        if (isset($this->_options[$name])) {
            return $this->_options[$name];
        }
        return null;
    }

    /**
     * Gets all current options
     *
     * @return array
     */
    public function getOptions()
    {
        return $this->_options;
    }

    /**
     * Gets a singleton instance of the Memcached client
     *
     * @return Memcached
     */
    public function getMemcached()
    {
        if ($this->_memcached === null) {
            $this->_initMemcached();
        }
        return $this->_memcached;
    }

    /**
     * Initialized the Memcached client instance
     *
     * @return void
     */
    protected function _initMemcached()
    {
        $this->_memcached = $this->_getMemcachedInstance();
        foreach ($this->_options as $option => $value) {
            $this->_memcached->setOption($option, $value);
        }
        $this->_memcached->addServer($this->_host, $this->_port);
        $this->_memcached->setOption(Memcached::OPT_COMPRESSION, false);
    }

    // @codeCoverageIgnoreStart
    /**
     * Returns a new instance of Memcached. Abstracted for testing.
     *
     * @return Memcached
     */
    protected function _getMemcachedInstance()
    {
        return new Memcached();
    }
    // @codeCoverageIgnoreEnd
}
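The read helpers in EC_KestrelClient all work the same way: kestrel takes its GET options from the key itself, separated by slashes. A small sketch of that key construction follows. The helper name `kestrelReadKey` is hypothetical and only illustrates the string format that `reliableRead()` and `abortReliableRead()` build by hand.

```php
<?php
// Hypothetical helper: build the key passed to Memcached::get() for a
// kestrel read. Options are appended to the queue name, slash-separated.

function kestrelReadKey($queue, array $flags = array(), $timeoutMs = null)
{
    $key = $queue;
    foreach ($flags as $flag) {
        $key .= '/' . $flag;
    }
    if ($timeoutMs !== null) {
        // t= is the time, in milliseconds, to wait for an item to arrive
        $key .= '/t=' . (int) $timeoutMs;
    }
    return $key;
}

// Confirm the previous job, open the next one, wait up to 500 ms for it
echo kestrelReadKey('enterprise_hello_world', array('close', 'open'), 500), "\n";
// Give up on the currently open job so it can be handed out again
echo kestrelReadKey('enterprise_hello_world', array('abort')), "\n";
```

Combining `close` and `open` in one key is what lets a polling loop acknowledge the last job and fetch the next one in a single round trip.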
The Producer
The producer is very simple. It just formats the data into a standard structure, including current tenant information, namespaces the queue so it doesn't collide with other projects, and adds it to the queue. The producer looks like this:
<?php
/**
 * Interface for adding jobs to a queue server
 *
 * @author    Bill Shupp <hostmaster@shupp.org>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_Producer
{
    /**
     * Adds a job onto a queue
     *
     * @param string $queue   The queue name to add a job to
     * @param string $jobName The job name for the consumer to run
     * @param mixed  $data    Optional additional data to pass to the job
     *
     * @return bool
     */
    public function addJob($queue, $jobName, $data = null)
    {
        $item = array(
            'instance' => EC::getCurrentInstanceName(),
            'jobName'  => $jobName
        );
        if ($data !== null) {
            $item['data'] = $data;
        }

        // Namespace queue with project
        $queue  = 'enterprise_' . $queue;
        $client = $this->_getKestrelClient();
        return $client->set($queue, $item);
    }

    // @codeCoverageIgnoreStart
    /**
     * Gets a single instance of EC_KestrelClient. Abstracted for testing.
     *
     * @return void
     */
    protected function _getKestrelClient()
    {
        if (APPLICATION_ENV === 'testing') {
            throw new Exception(__METHOD__ . ' was not mocked when testing');
        }

        static $client = null;
        if ($client === null) {
            $host   = EC::getConfigOption('kestrel.host');
            $port   = EC::getConfigOption('kestrel.port');
            $client = new EC_KestrelClient($host, $port);
        }
        return $client;
    }
    // @codeCoverageIgnoreEnd
}
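To make the message format concrete, here is a stand-alone sketch of the envelope that `addJob()` puts on the queue. The functions `buildJobEnvelope` and `namespaceQueue` are hypothetical extractions for illustration, and the instance name `dev` is just a sample value standing in for `EC::getCurrentInstanceName()`.

```php
<?php
// Hypothetical stand-alone version of the envelope EC_Producer::addJob() builds.

function buildJobEnvelope($instance, $jobName, $data = null)
{
    $item = array('instance' => $instance, 'jobName' => $jobName);
    // The data key is only present when the caller supplied something
    if ($data !== null) {
        $item['data'] = $data;
    }
    return $item;
}

// Prefix the queue with the project name so projects sharing a kestrel
// server do not collide
function namespaceQueue($queue, $project = 'enterprise')
{
    return $project . '_' . $queue;
}

$item = buildJobEnvelope('dev', 'HelloWorld', array('foo' => 'bar'));
echo namespaceQueue('hello_world'), ' <= ', json_encode($item), "\n";
```

Carrying the tenant (`instance`) inside every message is what lets a single consumer process serve jobs for many tenants.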
The Consumer
The consumer has a bit more to it, though it is still pretty straightforward. It's intended to be run from a monitoring tool like daemontools or supervisord, so there is a very small CLI harness that just passes the CLI arguments into EC_Consumer and runs it. After parsing the CLI arguments, EC_Consumer polls kestrel for new jobs, and runs them through our standard batch job infrastructure. Until we have more confidence in PHP's long running process ability, I added an optional maximum jobs argument, which makes the consumer terminate once it has processed X jobs. The monitoring service (supervisord) will then just restart it in a matter of seconds. I also added an optional debug argument for testing, so you can see every action as it happens. The CLI harness looks like this:
#!/bin/env php
<?php

// External application bootstrapping
require_once __DIR__ . '/cli_init.php';

// Instantiate and run the consumer
$consumer = new EC_Consumer($argv);
$consumer->run();
And the main consumer class, EC_Consumer, looks something like this:
<?php
/**
 * Enterprise queue consumer interface, called by bin/consumer_cli.php
 *
 * @author    Bill Shupp <hostmaster@shupp.org>
 * @copyright 2010-2011 Empower Campaigns
 */
class EC_Consumer
{
    /**
     * Instance of {@link Zend_Console_Getopt}
     *
     * @var Zend_Console_Getopt
     */
    protected $_opt = null;

    /**
     * Which APPLICATION_ENV to run under (see -e)
     *
     * @var string
     */
    protected $_environment = null;

    /**
     * The kestrel server IP
     *
     * @var string
     */
    protected $_host = null;

    /**
     * The kestrel server port
     *
     * @var int
     */
    protected $_port = null;

    /**
     * The kestrel queue name to connect to
     *
     * @var string
     */
    protected $_queue = null;

    /**
     * Whether we should show debug output
     *
     * @var bool
     */
    protected $_debug = false;

    /**
     * Maximum # of jobs for this process to perform (for memory fail safe)
     *
     * @var int
     */
    protected $_maxJobs = null;

    /**
     * Current job count
     *
     * @var int
     */
    protected $_jobCount = 0;

    /**
     * Parses arguments from the command line and does error handling
     *
     * @param array $argv The $argv from bin/ecli.php
     *
     * @throw Zend_Console_Getopt_Exception on failure
     * @return void
     */
    public function __construct(array $argv)
    {
        try {
            $opt = new Zend_Console_Getopt(
                array(
                    'environment|e=s' => 'environment name (e.g. development)'
                                         . ', required',
                    'server|s=s'      => 'kestrel server, format of host:port'
                                         . ', required',
                    'queue|q=s'       => 'queue name (e.g. crawler_campaign)'
                                         . ', required',
                    'max-jobs|m=s'    => 'max jobs to run before exiting'
                                         . ', optional',
                    'debug|d'         => 'show debug output'
                                         . ', optional',
                )
            );
            $opt->setArguments($argv);
            $opt->parse();

            // Set environment
            if ($opt->e === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing environment'
                );
            }
            $this->_environment = $opt->e;

            // @codeCoverageIgnoreStart
            if (!defined('APPLICATION_ENV')) {
                define('APPLICATION_ENV', $this->_environment);
            }
            // @codeCoverageIgnoreEnd

            // Set server
            if ($opt->s === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing server'
                );
            }
            $parts = explode(':', $opt->s);
            if (count($parts) !== 2) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: invalid server: ' . $opt->s
                );
            }
            $this->_host = $parts[0];
            $this->_port = $parts[1];

            // Set queue
            if ($opt->q === null) {
                throw new Zend_Console_Getopt_Exception(
                    'Error: missing queue'
                );
            }
            $this->_queue = $opt->q;

            // Set max-jobs
            if ($opt->m !== null) {
                $this->_maxJobs = $opt->m;
            }

            // Set debug
            if ($opt->d !== null) {
                $this->_debug = true;
            }
        } catch (Zend_Console_Getopt_Exception $e) {
            echo "\n" . $e->getMessage() . "\n\n";
            echo $opt->getUsageMessage();
            // @codeCoverageIgnoreStart
            if (!defined('APPLICATION_ENV') || APPLICATION_ENV !== 'testing') {
                exit(1);
            }
            // @codeCoverageIgnoreEnd
        }
        $this->_opt = $opt;
    }

    /**
     * Polls the queue server for jobs and runs them as they come in
     *
     * @return void
     */
    public function run()
    {
        $client = $this->_getKestrelClient();
        $queue  = 'enterprise_' . $this->_queue;

        while ($this->_keepRunning()) {
            // Pull job from queue
            $job = $client->reliableRead($queue, 500);
            if ($job === false) {
                $this->_debug('Nothing on queue ' . $queue);
                continue;
            }

            if (!isset($job['instance'])) {
                echo 'Instance not set in queue job: ' . print_r($job, true);
                continue;
            }
            $instance = $job['instance'];

            if (!isset($job['jobName'])) {
                echo 'Job name not set in queue job: ' . print_r($job, true);
                continue;
            }
            $jobName = $job['jobName'];

            $data = null;
            if (isset($job['data'])) {
                $data = $job['data'];
            }

            // Run the job
            $returnCode = $this->runJob($instance, $jobName, $data);
            if ($returnCode !== 0) {
                $client->abortReliableRead($queue);
                continue;
            }
        }
        $client->closeReliableRead($queue);
    }

    /**
     * Runs the job via bin/ecli.php
     *
     * @param string $instance The instance name to run the job under
     * @param string $jobName  The job name
     * @param string $data     Optional extra data
     *
     * @return int
     */
    public function runJob($instance, $jobName, $data)
    {
        $cmd = BASE_PATH . '/bin/ecli.php '
            . '-e ' . $this->_environment
            . ' -i ' . $instance
            . ' -j ' . $jobName;
        if ($data) {
            $cmd .= " '" . base64_encode(json_encode($data)) . "'";
        }
        $returnCode = $this->_passthru($cmd);
        $this->_jobCount++;
        $this->_debug('Job count: ' . $this->_jobCount);
        return $returnCode;
    }

    /**
     * Check to see if the job limit has been reached
     *
     * @return bool
     */
    protected function _keepRunning()
    {
        return ($this->_maxJobs === null)
            ? true
            : ($this->_jobCount < $this->_maxJobs);
    }

    /**
     * Show debug messages
     *
     * @param mixed $message
     *
     * @return void
     */
    protected function _debug($message)
    {
        if (!$this->_debug) {
            return;
        }
        echo $message . "\n";
    }

    // @codeCoverageIgnoreStart
    /**
     * Calls the passthru() function and returns the exit code. Abstracted
     * for testing.
     *
     * @param string $cmd The command to execute
     *
     * @return int
     */
    protected function _passthru($cmd)
    {
        passthru($cmd, $returnCode);
        return $returnCode;
    }

    /**
     * Gets a single instance of EC_KestrelClient. Abstracted for testing.
     *
     * @return void
     */
    protected function _getKestrelClient()
    {
        if (APPLICATION_ENV === 'testing') {
            throw new Exception(__METHOD__ . ' was not mocked when testing');
        }
        return new EC_KestrelClient($this->_host, $this->_port);
    }
    // @codeCoverageIgnoreEnd
}
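The interplay of reliable reads, aborts, and the max-jobs limit in `run()` can be tried without a kestrel server. The sketch below uses an in-memory stand-in: `FakeReliableQueue` and `consume` are hypothetical names, and the model is a simplification for a single consumer (it assumes an aborted item goes back to the head of the queue, and it stops when the queue is empty instead of continuing to poll).

```php
<?php
// In-memory stand-in for a kestrel queue. Assumption: a simplified model of
// open/close/abort for one consumer; this is not a real client.
class FakeReliableQueue
{
    private $items;
    private $open = null;

    public function __construct(array $items)
    {
        $this->items = $items;
    }

    // Models GET queue/close/open: confirm the open item, then open the next
    public function reliableRead()
    {
        $this->open = null;
        if (count($this->items) === 0) {
            return false;
        }
        $this->open = array_shift($this->items);
        return $this->open;
    }

    // Models GET queue/abort: put the open item back at the head
    public function abortReliableRead()
    {
        if ($this->open !== null) {
            array_unshift($this->items, $this->open);
            $this->open = null;
        }
    }

    // Models GET queue/close: confirm the open item without opening another
    public function closeReliableRead()
    {
        $this->open = null;
    }
}

// Run jobs until the queue is empty or $maxJobs attempts have been made
function consume(FakeReliableQueue $queue, $runJob, $maxJobs)
{
    $attempts = 0;
    $log      = array();
    while ($attempts < $maxJobs) {
        $job = $queue->reliableRead();
        if ($job === false) {
            break; // the real consumer keeps polling instead of stopping
        }
        $attempts++; // counted even on failure, as in EC_Consumer::runJob()
        $code  = call_user_func($runJob, $job);
        $log[] = $job . ':' . $code;
        if ($code !== 0) {
            $queue->abortReliableRead();
        }
    }
    // Confirm the last job so it is not left open when the process exits
    $queue->closeReliableRead();
    return $log;
}

$failedOnce = false;
$runJob = function ($job) use (&$failedOnce) {
    if ($job === 'b' && !$failedOnce) {
        $failedOnce = true;
        return 1; // simulate a job that fails on its first attempt
    }
    return 0;
};

$log = consume(new FakeReliableQueue(array('a', 'b', 'c')), $runJob, 10);
echo implode(' ', $log), "\n";
```

In this model job `b` fails once, is aborted, and is picked up again on the next read, which is the retry behaviour the abort call in `run()` is after.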
Putting it together
Now that all the pieces are put together, let's take a look at it in action. Adding example job "HelloWorld" to the queue "hello_world" from within our application looks something like this:
<?php
$producer = new EC_Producer();
$producer->addJob('hello_world', 'HelloWorld', array('foo' => 'bar'));
?>
And finally, here's an example of running the consumer from the CLI harness, along with some example debug output of processing the job:
./bin/consumer_cli.php -e development -s 127.0.0.1:22133 -q hello_world -d -m 2
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
[foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 1
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Nothing on queue enterprise_hello_world
Running EC_Job_HelloWorld on instance dev under environment development
Hello, world! Here is my data array:
stdClass Object
(
[foo] => bar
)
And here are my args: ./bin/ecli.php eyJmb28iOiJiYXIifQ==
Completed job in 0 seconds.
Job count: 2
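The argument `eyJmb28iOiJiYXIifQ==` in the output above is the job data after `runJob()` has JSON-encoded and base64-encoded it. The source of bin/ecli.php is not shown in this post, so the following is only a guess at the receiving side: `decodeJobArgument` is a hypothetical name, and the sketch assumes the payload arrives as a single CLI argument.

```php
<?php
// Assumption: the job runner receives the payload as one CLI argument,
// encoded the way EC_Consumer::runJob() encodes it (JSON, then base64).

function decodeJobArgument($arg)
{
    $json = base64_decode($arg, true); // strict mode: false on invalid input
    if ($json === false) {
        throw new InvalidArgumentException('Argument is not valid base64');
    }
    // Without the second argument, JSON objects decode to stdClass instances
    return json_decode($json);
}

$data = decodeJobArgument('eyJmb28iOiJiYXIifQ==');
print_r($data);
```

Decoding to an object rather than an associative array would be consistent with the `stdClass Object` shown in the debug output. Base64 also keeps quotes and spaces in the data from interfering with the shell command that `runJob()` assembles.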
That's it! I'd be interested to hear how other folks are interfacing with kestrel from PHP.