- 浏览: 2551758 次
- 性别:
- 来自: 成都
文章分类
最新评论
-
nation:
你好,在部署Mesos+Spark的运行环境时,出现一个现象, ...
Spark(4)Deal with Mesos -
sillycat:
AMAZON Relatedhttps://www.godad ...
AMAZON API Gateway(2)Client Side SSL with NGINX -
sillycat:
sudo usermod -aG docker ec2-use ...
Docker and VirtualBox(1)Set up Shared Disk for Virtual Box -
sillycat:
Every Half an Hour30 * * * * /u ...
Build Home NAS(3)Data Redundancy -
sillycat:
3 List the Cron Job I Have>c ...
Build Home NAS(3)Data Redundancy
AMAZON SQS(1)PHP Producer
1. Some Basic Information
Price: 1 million 2$
Send Throughput:
Receive Throughput:
Message Latency: 500 ms
Message Size: 256 KB
Batch Size: 10 messages
2. Env Set Up
Install composer
> curl -sS https://getcomposer.org/installer | php
Install the latest Package
> php composer.phar require aws/aws-sdk-php
this will install all the dependencies
> php composer.phar install
Here is my composer.json which will identify the dependencies.
{
"require": {
"aws/aws-sdk-php": "3.0.6",
"predis/predis": "1.0.1"
}
}
The command below will do the magic, similar to Maven or Ivy:
>php composer.phar install
Some important classes are as follows:
SQSConnector.inc
<?php
require 'vendor/autoload.php';
use Aws\Sqs\SqsClient;
/**
 * Base class that owns the Amazon SQS client used by its subclasses.
 *
 * The client is built for the us-east-1 region with the 'latest' API version.
 * Construction errors are reported on standard output and NOT rethrown, so
 * $client remains null after a failed construction.
 */
class SQSConnector {
    /** URL of the queue that messages are sent to. */
    const queueURL = "https://sqs.us-east-1.amazonaws.com/xx_test";

    /** @var SqsClient|null SQS client, or null if it could not be created. */
    protected $client = null;

    /**
     * Creates the SQS client and prints whether creation succeeded.
     */
    function __construct(){
        try {
            // Direct construction replaces the deprecated SqsClient::factory().
            $this->client = new SqsClient(array(
                'region' => 'us-east-1',
                'version' => 'latest'
            ));
            // NOTE(review): building the client presumably does not contact SQS yet,
            // so this only confirms the client object was created - confirm.
            echo "Successfully connected to SQS\n";
        } catch (Exception $e) {
            // Separate the prefix from the exception text and end the line so the
            // error is readable in console output.
            echo "Couldn't connect to SQS: ", $e->getMessage(), "\n";
        }
    }
}
A time-tracking class that gives us more detail about the time consumed during the process, TimeTracker.inc
<?php
/**
 * Accumulates elapsed time and a message count over repeated start()/stop()
 * cycles and renders the totals plus the resulting throughput as a string.
 *
 * count($tracker) returns the accumulated message count.
 */
class TimeTracker implements Countable {
    /** @var int Sum of the $count values passed to stop(). */
    protected $count = 0;
    /** @var array Rounded interval => number of stop() calls that produced it. */
    protected $distribution;
    /** @var int Number of decimal places used when rounding intervals into keys. */
    protected $distributionPrecision = 0;
    /** @var float Timestamp recorded by the most recent start(). */
    protected $startTime;
    /** @var float Sum of all measured intervals; starts at 0.0 so it is always numeric. */
    protected $totalTime = 0.0;
    /** @var string Label printed at the start of the summary line. */
    protected $name;

    /**
     * @param string $name                  Label used in the summary line.
     * @param int    $distributionPrecision Decimal places for the interval histogram keys.
     */
    public function __construct($name, $distributionPrecision = 2) {
        $this->name = $name;
        $this->distribution = array();
        $this->distributionPrecision = $distributionPrecision;
        // The clock starts immediately, so stop() can be called without an explicit start().
        $this->start();
    }

    /**
     * Countable implementation.
     *
     * The attribute line below is parsed as a plain comment before PHP 8 and
     * silences the return-type deprecation notice on PHP 8.1+.
     *
     * @return int Accumulated message count.
     */
    #[\ReturnTypeWillChange]
    public function count() {
        return $this->count;
    }

    /**
     * Begins a new measurement.
     *
     * @param float|null $time Explicit start timestamp; null uses microtime(true).
     * @return float The start timestamp that was stored.
     */
    public function start($time = null) {
        return $this->startTime = ($time === null) ? microtime(true) : $time;
    }

    /**
     * Ends the current measurement and adds it to the totals.
     *
     * @param int        $count Number of messages handled during this interval.
     * @param float|null $time  Explicit end timestamp; null uses microtime(true).
     * @return float Length of the interval that just ended.
     */
    public function stop($count = 1, $time = null) {
        $interval = (($time === null) ? microtime(true) : $time) - $this->startTime;
        $this->totalTime += $interval;
        $key = (string)round($interval, $this->distributionPrecision);
        if (!array_key_exists($key, $this->distribution)) $this->distribution[$key] = 0;
        $this->distribution[$key]++;
        $this->count += $count;
        return $interval;
    }

    /**
     * Renders "<name>: <count>m <seconds>s <rate>m/s" followed by a newline.
     *
     * @return string
     */
    public function __toString() {
        ksort($this->distribution);
        // With no accumulated time the rate is undefined; report 0 instead of
        // dividing by zero (a DivisionByZeroError on PHP 8).
        $rate = ($this->totalTime == 0) ? 0 : $this->count / $this->totalTime;
        return "{$this->name}: {$this->count}m " . round($this->totalTime, 2) . 's ' .
            round($rate, 1) . "m/s\n";
    }
}
Send the messages asynchronously via PHP in SQSSender.php
#!/usr/bin/php
<?php
require_once 'TimeTracker.inc';
require_once 'SQSConnector.inc';
use GuzzleHttp\Promise;
define('MSGNUM_REQUEST', 10);
define('MAXSIZE_REQUEST', 262144);
/**
 * Sends jobs to the SQS queue, either one message at a time or packed into
 * batch requests of up to MSGNUM_REQUEST entries.
 */
class SQSSender extends SQSConnector {
    /**
     * Sends one job as a single SQS message and waits for it to complete.
     *
     * Jobs of MAXSIZE_REQUEST bytes or more are not sent; they are dumped to
     * standard output instead. On success a timing summary is printed.
     *
     * @param string $job Message body to send.
     */
    function sendMessage($job) {
        $timer = new TimeTracker('sendMessage');
        if (strlen($job) >= MAXSIZE_REQUEST) {
            echo "The size of message is larger than 256KB\n";
            var_dump($job);
            return;
        }
        $promise = $this->client->sendMessageAsync(array(
            'QueueUrl' => self::queueURL,
            'MessageBody' => $job,
        ));
        $promise->wait();
        $timer->stop();
        print $timer;
    }

    /**
     * Packs jobs into batch entries and sends them with asynchronous batch requests.
     *
     * Consecutive jobs are concatenated (with no delimiter) into one entry body
     * until adding the next job would reach the per-entry size budget
     * (MAXSIZE_REQUEST / MSGNUM_REQUEST bytes). A job that alone reaches that
     * budget is sent on its own through sendMessage(). A batch request is issued
     * once MSGNUM_REQUEST entries are collected, and whatever is left is sent
     * after the loop. All outstanding batch requests are awaited before returning.
     *
     * @param array|Traversable $jobs  Job strings to send.
     * @param TimeTracker       $timer Tracker credited with the jobs per batch; its
     *                                 summary is printed and it is restarted at the end.
     */
    function sendMessageBatch($jobs, $timer) {
        $promise_array = array();
        $entries = array();   // initialised so the final flush never reads an undefined variable
        $id = 1;
        $message = '';
        $msg_num = 0;
        // Per-entry byte budget; constant for the whole call, so computed once.
        $maxsize_msg = (int)(MAXSIZE_REQUEST / MSGNUM_REQUEST);

        foreach ($jobs as $job) {
            $tmp_msg = $message . $job;
            if (strlen($job) >= $maxsize_msg) {
                // Too large to share an entry: send it individually.
                $this->sendMessage($job);
            } else if (strlen($tmp_msg) >= $maxsize_msg) {
                // Current body is full: close it and start the next body with this job.
                $entries[] = array(
                    'Id' => $id,
                    'MessageBody' => $message
                );
                $id++;
                $message = $job;
                if ($id > MSGNUM_REQUEST) {
                    // Append with [] so every promise is kept; keys derived from
                    // count($timer) could repeat and overwrite an earlier promise.
                    $promise_array[] = $this->client->sendMessageBatchAsync(array(
                        'QueueUrl' => self::queueURL,
                        'Entries' => $entries,
                    ));
                    $entries = array();
                    $id = 1;
                    $timer->stop($msg_num);
                    $msg_num = 0;
                    $timer->start();
                }
                // Counts the job that just became the start of the new body.
                $msg_num++;
            } else {
                $message = $tmp_msg;
                $msg_num++;
            }
        }

        // Flush the remainder. An empty body or an empty entry list would be an
        // invalid request, so both are skipped.
        if ($message !== '') {
            $entries[] = array(
                'Id' => $id,
                'MessageBody' => $message
            );
        }
        if (count($entries) > 0) {
            $promise_array[] = $this->client->sendMessageBatchAsync(array(
                'QueueUrl' => self::queueURL,
                'Entries' => $entries,
            ));
        }
        // Wait for all batch requests to complete.
        Promise\unwrap($promise_array);
        $timer->stop($msg_num);
        print $timer;
        $timer->start();
    }
}
/*$sender = new SQSSender;
$promise = $sender->send(500);
if(count($promise) > 0){
Promise\unwrap($promises);
}*/
?>
Before we run the test, Amazon requires us to configure the access keys here:
carl-mac:.aws carl$ pwd
/Users/carl/.aws
carl-mac:.aws carl$ ls -l
total 8
-rw-r--r-- 1 carl staff 116 Jul 6 17:00 credentials
carl-mac:.aws carl$
The content will be as follows:
[default]
aws_access_key_id = xxxxxxxx
aws_secret_access_key = xxxxxxxxxxxxx
References:
http://colby.id.au/benchmarking-sqs/
http://www.warski.org/blog/2014/06/benchmarking-sqs/
http://nsono.net/amazon-sqs-vs-rabbitmq/
kafka
http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
SQS
http://docs.aws.amazon.com/aws-sdk-php/guide/latest/service-sqs.html
http://aws.amazon.com/documentation/sqs/
http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/promises.html
http://docs.aws.amazon.com/aws-sdk-php/v3/api/index.html
scala queue
https://github.com/adamw/mqperf/tree/master/src/main/scala/com/softwaremill/mqperf/mq
SQS auto scaling
https://aws.amazon.com/articles/Amazon-SQS/1464
1. Some Basic Information
Price: 1 million 2$
Send Throughput:
Receive Throughput:
Message Latency: 500 ms
Message Size: 256 KB
Batch Size: 10 messages
2. Env Set Up
Install composer
> curl -sS https://getcomposer.org/installer | php
Install the latest Package
> php composer.phar require aws/aws-sdk-php
this will install all the dependencies
> php composer.phar install
Here is my composer.json which will identify the dependencies.
{
"require": {
"aws/aws-sdk-php": "3.0.6",
"predis/predis": "1.0.1"
}
}
The command below will do the magic, similar to Maven or Ivy:
>php composer.phar install
Some important classes are as follows:
SQSConnector.inc
<?php
require 'vendor/autoload.php';
use Aws\Sqs\SqsClient;
/**
 * Base class that owns the Amazon SQS client used by its subclasses.
 *
 * The client is built for the us-east-1 region with the 'latest' API version.
 * Construction errors are reported on standard output and NOT rethrown, so
 * $client remains null after a failed construction.
 */
class SQSConnector {
    /** URL of the queue that messages are sent to. */
    const queueURL = "https://sqs.us-east-1.amazonaws.com/xx_test";

    /** @var SqsClient|null SQS client, or null if it could not be created. */
    protected $client = null;

    /**
     * Creates the SQS client and prints whether creation succeeded.
     */
    function __construct(){
        try {
            // Direct construction replaces the deprecated SqsClient::factory().
            $this->client = new SqsClient(array(
                'region' => 'us-east-1',
                'version' => 'latest'
            ));
            // NOTE(review): building the client presumably does not contact SQS yet,
            // so this only confirms the client object was created - confirm.
            echo "Successfully connected to SQS\n";
        } catch (Exception $e) {
            // Separate the prefix from the exception text and end the line so the
            // error is readable in console output.
            echo "Couldn't connect to SQS: ", $e->getMessage(), "\n";
        }
    }
}
A time-tracking class that gives us more detail about the time consumed during the process, TimeTracker.inc
<?php
/**
 * Accumulates elapsed time and a message count over repeated start()/stop()
 * cycles and renders the totals plus the resulting throughput as a string.
 *
 * count($tracker) returns the accumulated message count.
 */
class TimeTracker implements Countable {
    /** @var int Sum of the $count values passed to stop(). */
    protected $count = 0;
    /** @var array Rounded interval => number of stop() calls that produced it. */
    protected $distribution;
    /** @var int Number of decimal places used when rounding intervals into keys. */
    protected $distributionPrecision = 0;
    /** @var float Timestamp recorded by the most recent start(). */
    protected $startTime;
    /** @var float Sum of all measured intervals; starts at 0.0 so it is always numeric. */
    protected $totalTime = 0.0;
    /** @var string Label printed at the start of the summary line. */
    protected $name;

    /**
     * @param string $name                  Label used in the summary line.
     * @param int    $distributionPrecision Decimal places for the interval histogram keys.
     */
    public function __construct($name, $distributionPrecision = 2) {
        $this->name = $name;
        $this->distribution = array();
        $this->distributionPrecision = $distributionPrecision;
        // The clock starts immediately, so stop() can be called without an explicit start().
        $this->start();
    }

    /**
     * Countable implementation.
     *
     * The attribute line below is parsed as a plain comment before PHP 8 and
     * silences the return-type deprecation notice on PHP 8.1+.
     *
     * @return int Accumulated message count.
     */
    #[\ReturnTypeWillChange]
    public function count() {
        return $this->count;
    }

    /**
     * Begins a new measurement.
     *
     * @param float|null $time Explicit start timestamp; null uses microtime(true).
     * @return float The start timestamp that was stored.
     */
    public function start($time = null) {
        return $this->startTime = ($time === null) ? microtime(true) : $time;
    }

    /**
     * Ends the current measurement and adds it to the totals.
     *
     * @param int        $count Number of messages handled during this interval.
     * @param float|null $time  Explicit end timestamp; null uses microtime(true).
     * @return float Length of the interval that just ended.
     */
    public function stop($count = 1, $time = null) {
        $interval = (($time === null) ? microtime(true) : $time) - $this->startTime;
        $this->totalTime += $interval;
        $key = (string)round($interval, $this->distributionPrecision);
        if (!array_key_exists($key, $this->distribution)) $this->distribution[$key] = 0;
        $this->distribution[$key]++;
        $this->count += $count;
        return $interval;
    }

    /**
     * Renders "<name>: <count>m <seconds>s <rate>m/s" followed by a newline.
     *
     * @return string
     */
    public function __toString() {
        ksort($this->distribution);
        // With no accumulated time the rate is undefined; report 0 instead of
        // dividing by zero (a DivisionByZeroError on PHP 8).
        $rate = ($this->totalTime == 0) ? 0 : $this->count / $this->totalTime;
        return "{$this->name}: {$this->count}m " . round($this->totalTime, 2) . 's ' .
            round($rate, 1) . "m/s\n";
    }
}
Send the messages asynchronously via PHP in SQSSender.php
#!/usr/bin/php
<?php
require_once 'TimeTracker.inc';
require_once 'SQSConnector.inc';
use GuzzleHttp\Promise;
define('MSGNUM_REQUEST', 10);
define('MAXSIZE_REQUEST', 262144);
/**
 * Sends jobs to the SQS queue, either one message at a time or packed into
 * batch requests of up to MSGNUM_REQUEST entries.
 */
class SQSSender extends SQSConnector {
    /**
     * Sends one job as a single SQS message and waits for it to complete.
     *
     * Jobs of MAXSIZE_REQUEST bytes or more are not sent; they are dumped to
     * standard output instead. On success a timing summary is printed.
     *
     * @param string $job Message body to send.
     */
    function sendMessage($job) {
        $timer = new TimeTracker('sendMessage');
        if (strlen($job) >= MAXSIZE_REQUEST) {
            echo "The size of message is larger than 256KB\n";
            var_dump($job);
            return;
        }
        $promise = $this->client->sendMessageAsync(array(
            'QueueUrl' => self::queueURL,
            'MessageBody' => $job,
        ));
        $promise->wait();
        $timer->stop();
        print $timer;
    }

    /**
     * Packs jobs into batch entries and sends them with asynchronous batch requests.
     *
     * Consecutive jobs are concatenated (with no delimiter) into one entry body
     * until adding the next job would reach the per-entry size budget
     * (MAXSIZE_REQUEST / MSGNUM_REQUEST bytes). A job that alone reaches that
     * budget is sent on its own through sendMessage(). A batch request is issued
     * once MSGNUM_REQUEST entries are collected, and whatever is left is sent
     * after the loop. All outstanding batch requests are awaited before returning.
     *
     * @param array|Traversable $jobs  Job strings to send.
     * @param TimeTracker       $timer Tracker credited with the jobs per batch; its
     *                                 summary is printed and it is restarted at the end.
     */
    function sendMessageBatch($jobs, $timer) {
        $promise_array = array();
        $entries = array();   // initialised so the final flush never reads an undefined variable
        $id = 1;
        $message = '';
        $msg_num = 0;
        // Per-entry byte budget; constant for the whole call, so computed once.
        $maxsize_msg = (int)(MAXSIZE_REQUEST / MSGNUM_REQUEST);

        foreach ($jobs as $job) {
            $tmp_msg = $message . $job;
            if (strlen($job) >= $maxsize_msg) {
                // Too large to share an entry: send it individually.
                $this->sendMessage($job);
            } else if (strlen($tmp_msg) >= $maxsize_msg) {
                // Current body is full: close it and start the next body with this job.
                $entries[] = array(
                    'Id' => $id,
                    'MessageBody' => $message
                );
                $id++;
                $message = $job;
                if ($id > MSGNUM_REQUEST) {
                    // Append with [] so every promise is kept; keys derived from
                    // count($timer) could repeat and overwrite an earlier promise.
                    $promise_array[] = $this->client->sendMessageBatchAsync(array(
                        'QueueUrl' => self::queueURL,
                        'Entries' => $entries,
                    ));
                    $entries = array();
                    $id = 1;
                    $timer->stop($msg_num);
                    $msg_num = 0;
                    $timer->start();
                }
                // Counts the job that just became the start of the new body.
                $msg_num++;
            } else {
                $message = $tmp_msg;
                $msg_num++;
            }
        }

        // Flush the remainder. An empty body or an empty entry list would be an
        // invalid request, so both are skipped.
        if ($message !== '') {
            $entries[] = array(
                'Id' => $id,
                'MessageBody' => $message
            );
        }
        if (count($entries) > 0) {
            $promise_array[] = $this->client->sendMessageBatchAsync(array(
                'QueueUrl' => self::queueURL,
                'Entries' => $entries,
            ));
        }
        // Wait for all batch requests to complete.
        Promise\unwrap($promise_array);
        $timer->stop($msg_num);
        print $timer;
        $timer->start();
    }
}
/*$sender = new SQSSender;
$promise = $sender->send(500);
if(count($promise) > 0){
Promise\unwrap($promises);
}*/
?>
Before we run the test, Amazon requires us to configure the access keys here:
carl-mac:.aws carl$ pwd
/Users/carl/.aws
carl-mac:.aws carl$ ls -l
total 8
-rw-r--r-- 1 carl staff 116 Jul 6 17:00 credentials
carl-mac:.aws carl$
The content will be as follows:
[default]
aws_access_key_id = xxxxxxxx
aws_secret_access_key = xxxxxxxxxxxxx
References:
http://colby.id.au/benchmarking-sqs/
http://www.warski.org/blog/2014/06/benchmarking-sqs/
http://nsono.net/amazon-sqs-vs-rabbitmq/
kafka
http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
SQS
http://docs.aws.amazon.com/aws-sdk-php/guide/latest/service-sqs.html
http://aws.amazon.com/documentation/sqs/
http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/promises.html
http://docs.aws.amazon.com/aws-sdk-php/v3/api/index.html
scala queue
https://github.com/adamw/mqperf/tree/master/src/main/scala/com/softwaremill/mqperf/mq
SQS auto scaling
https://aws.amazon.com/articles/Amazon-SQS/1464
发表评论
-
Stop Update Here
2020-04-28 09:00 316I will stop update here, and mo ... -
NodeJS12 and Zlib
2020-04-01 07:44 475NodeJS12 and Zlib It works as ... -
Docker Swarm 2020(2)Docker Swarm and Portainer
2020-03-31 23:18 368Docker Swarm 2020(2)Docker Swar ... -
Docker Swarm 2020(1)Simply Install and Use Swarm
2020-03-31 07:58 369Docker Swarm 2020(1)Simply Inst ... -
Traefik 2020(1)Introduction and Installation
2020-03-29 13:52 336Traefik 2020(1)Introduction and ... -
Portainer 2020(4)Deploy Nginx and Others
2020-03-20 12:06 431Portainer 2020(4)Deploy Nginx a ... -
Private Registry 2020(1)No auth in registry Nginx AUTH for UI
2020-03-18 00:56 436Private Registry 2020(1)No auth ... -
Docker Compose 2020(1)Installation and Basic
2020-03-15 08:10 374Docker Compose 2020(1)Installat ... -
VPN Server 2020(2)Docker on CentOS in Ubuntu
2020-03-02 08:04 455VPN Server 2020(2)Docker on Cen ... -
Buffer in NodeJS 12 and NodeJS 8
2020-02-25 06:43 385Buffer in NodeJS 12 and NodeJS ... -
NodeJS ENV Similar to JENV and PyENV
2020-02-25 05:14 478NodeJS ENV Similar to JENV and ... -
Prometheus HA 2020(3)AlertManager Cluster
2020-02-24 01:47 423Prometheus HA 2020(3)AlertManag ... -
Serverless with NodeJS and TencentCloud 2020(5)CRON and Settings
2020-02-24 01:46 337Serverless with NodeJS and Tenc ... -
GraphQL 2019(3)Connect to MySQL
2020-02-24 01:48 247GraphQL 2019(3)Connect to MySQL ... -
GraphQL 2019(2)GraphQL and Deploy to Tencent Cloud
2020-02-24 01:48 451GraphQL 2019(2)GraphQL and Depl ... -
GraphQL 2019(1)Apollo Basic
2020-02-19 01:36 328GraphQL 2019(1)Apollo Basic Cl ... -
Serverless with NodeJS and TencentCloud 2020(4)Multiple Handlers and Running wit
2020-02-19 01:19 314Serverless with NodeJS and Tenc ... -
Serverless with NodeJS and TencentCloud 2020(3)Build Tree and Traverse Tree
2020-02-19 01:19 318Serverless with NodeJS and Tenc ... -
Serverless with NodeJS and TencentCloud 2020(2)Trigger SCF in SCF
2020-02-19 01:18 294Serverless with NodeJS and Tenc ... -
Serverless with NodeJS and TencentCloud 2020(1)Running with Component
2020-02-19 01:17 312Serverless with NodeJS and Tenc ...
相关推荐
// create custom producer (supporting all opts as per the API docs: http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#constructor-property) const producer = Producer . create ( { ...
Squiss, 用于 node.js的Amazon SQS轮询器 Squiss node.js 4和更高版本的Amazon SQS轮询器和单个队列客户端const poller = new Squiss({ queueName: 'my-sqs-queue', bodyForma
亚马逊SQS大数据云架构
亚马逊简单队列服务(Amazon Simple Queue Service,简称SQS)是亚马逊云服务(AWS)提供的一种高度可扩展、可靠且完全托管的消息队列服务。它允许应用程序之间通过消息进行通信,而无需直接调用彼此,从而降低了...
这是一个PHP类,旨在为Amazon的Simple Queue Service(SQS)提供基于REST的简单接口。
官方版本,亲测可用
**Laravel 开发与 Amazon SQS (Simple Queue Service) 集成详解** 在 Laravel 开发中,我们经常需要处理异步任务,以提高应用程序的响应速度和性能。Amazon SQS(Simple Queue Service)是一种完全托管的消息队列...
**Python-SimpleQ:亚马逊SQS的简单可伸缩队列实现** 在现代软件开发中,事件驱动和异步处理是提升系统性能和可扩展性的关键。Python SimpleQ 是一个针对亚马逊 Simple Queue Service (SQS) 的轻量级库,它提供了一...
Amazon SQS Java 消息传递库 Amazon SQS Java Messaging Library包含 Java Message Service 兼容类,用于与 Amazon Simple Queue Service 进行通信。 该项目构建在适用于 Java 的 AWS 开发工具包之上,以使用 Amazon...
SQS ( { region : '' , accessKeyId : '' , secretAccessKey : ''} )var SQSWorker = require ( 'sqs-taskqueue' ) , queue = new SQSWorker ( sqs , 'https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXX/' , '
SQS 是亚马逊云服务(AWS)提供的一种完全托管的消息队列服务。它允许应用程序组件之间进行异步通信,解耦处理任务,提高系统的可扩展性和可靠性。SQS 提供两种队列类型:标准队列和 FIFO (First-In-First-Out) 队列...
有关在Amazon SQS上使用Broadway的更多详细信息,请参阅。安装将:broadway_sqs与您选择的HTTP客户端(默认为:hackney mix.exs一起添加到mix.exs中的依赖项列表中: def deps do [ { :broadway_sqs , " ~> 0.6.0 " }...
带有Spring Cloud AWS和JMS的Amazon SQS示例该存储库是一个示例项目,如何设置Spring Boot + Spring Cloud AWS + SQS JMS客户端以读取消息并将消息发送到SQS。 用途: 用于SQS和JMS集成用于区域和AWS凭证配置的
1. **Amazon Simple Queue Service (SQS)**:AWS的SQS是一种完全托管的消息队列服务,允许应用程序之间异步通信,处理大量数据,确保消息传递的可靠性和持久性。 2. **Symfony Messenger**:Symfony的组件,负责...
3. **命令与消息队列**:Producer可能是一个命令类,通过Artisan命令行工具运行,或者它可能涉及消息队列(如RabbitMQ或SQS)。消息队列允许Producer发送消息,其他进程(消费者)可以接收并处理这些消息。 4. **...
支持队列Enqueue是麻省理工学院许可的开源项目,其持续发展完全取决于社区和我们客户的... 如果您想加入他们,请考虑:亚马逊SQS运输 这是队列互操作规范的实现。 它允许您使用服务发送和使用消息。资源执照它是根据。
适用于Vert.x的Amazon SQS客户端 此Vert.x客户端以两种方式允许Amazon SQS访问: 作为@VertxGen服务到Amazon SQS异步客户端方法的桥梁 作为Amazon SQS队列消耗Verticle Gradle/ Maven 添加添加以下依赖项: uy....
docker-SQS-local:在本地运行Amazon Simple Queue Service(Amazon SQS)的Docker映像
Amazon SQS Java临时队列客户端使用临时队列客户端,您可以创建轻型临时队列,这些临时队列在不再使用时会自动删除。 您可以将“临时队列客户端”用于常见的消息传递模式,例如“请求-响应”。 该库提供了两个互补的...
一种用于探索来自 Amazon SQS 的重复消息事件的工具,这是一种有利于可用性和至少一次交付的分布式消息队列。 dupeScore 旨在跨多个服务器操作队列工作者池,从单个 SQS 队列进行长轮询。 消息体经过 SHA1 哈希处理...