AMAZON SQS(2)Java Client Consumer and Producer
The configuration file, build.sbt
"com.amazonaws" % "aws-java-sdk" % "1.10.6",
The first class is the SQS client builder, SQSQueue.scala
package com.sillycat.jobsconsumer.messagequeue

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.sillycat.jobsconsumer.utilities.{IncludeLogger, IncludeConfig}

object SQSQueue extends IncludeLogger with IncludeConfig {

  // Build the credentials from the environment-specific configuration keys
  private def getCredential = {
    new BasicAWSCredentials(
      config.getString(envStr("sqs.keyId")),
      config.getString(envStr("sqs.accessKey")))
  }

  // The client is thread safe and owns its own thread pool,
  // so it is created once and shared instead of rebuilt on every call
  private lazy val asyncClient = {
    val client = new AmazonSQSAsyncClient(getCredential)
    client.setRegion(Region.getRegion(Regions.fromName(config.getString(envStr("sqs.region")))))
    client
  }

  def getAsyncClient: AmazonSQSAsyncClient = asyncClient
}
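The IncludeConfig trait and its envStr helper are not shown in this post. The sketch below is only one plausible shape for them, assuming that envStr prefixes a key with the active environment name taken from build.env. The names IncludeConfigSketch and ConfigDemo, the Map-backed settings, and the sample keys and values are all hypothetical stand-ins, so the block runs without any configuration library.

```scala
// Hypothetical stand-in for the IncludeConfig trait, which the post does not show.
// A plain Map replaces the real configuration library so the sketch runs on its own.
trait IncludeConfigSketch {
  // Assumed layout: a global "build.env" key plus environment-prefixed settings
  protected def settings: Map[String, String]

  // Assumed behavior of envStr: prefix the key with the active environment
  def envStr(key: String): String = settings("build.env") + "." + key

  def getString(key: String): String = settings(key)
}

object ConfigDemo extends IncludeConfigSketch {
  // Sample values for illustration only
  protected val settings = Map(
    "build.env" -> "local",
    "local.sqs.region" -> "us-east-1",
    "local.sqs.queueName.rawjobs" -> "rawjobs-local"
  )
}
```

With a layout like this, the same code can read sqs.region for the local, test, or production environment by changing only build.env.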
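At first, I thought I could use an embedded SQS server for the tests. I actually found one, but it caused an Akka actor system conflict in my project, so I did not want to spend more time on that. Instead, the test class SQSQueueSpec.scala talks to the real queue and skips itself when build.env is set to test.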
package com.sillycat.jobsconsumer.messagequeue

import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import com.sillycat.jobsconsumer.utilities.{IncludeLogger, IncludeConfig}
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}

class SQSQueueSpec extends FunSpec with Matchers with BeforeAndAfterAll with IncludeConfig with IncludeLogger {

  val queueName = config.getString(envStr("sqs.queueName.rawjobs"))
  var client: AmazonSQSAsyncClient = _
  var queueUrl: String = _

  override def beforeAll(): Unit = {
    // build the client only when a real SQS queue is available
    if (config.getString("build.env").equals("test")) {
      logger.info("No embedded SQS, ignore the test")
    } else {
      client = SQSQueue.getAsyncClient
      queueUrl = client.getQueueUrl(queueName).getQueueUrl
    }
  }

  override def afterAll(): Unit = {
  }

  describe("IncludeSQSProducer") {
    describe("sendMessage") {
      it("Directly Send String") {
        if (config.getString("build.env").equals("test")) {
          logger.info("No embedded SQS, ignore the test")
        } else {
          val expect = "134343143"
          client.sendMessage(queueUrl, expect)
          val msgs = client
            .receiveMessage(new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10))
            .getMessages
          // assumes the queue holds no other messages while the test runs
          msgs.size() should be (1)
          msgs.get(0).getBody should be (expect)
          // clean up so the message is not delivered again
          client.deleteMessage(queueUrl, msgs.get(0).getReceiptHandle)
        }
      }
    }
  }
}
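One caveat with the test above: a single receiveMessage call with short polling can come back empty even though the message was just sent, which would make the size assertion fail now and then. Long polling through ReceiveMessageRequest.withWaitTimeSeconds is one way to reduce that. Another is a small retry helper, sketched below. PollingSketch and pollUntilNonEmpty are hypothetical names, and the receive function is a stand-in for client.receiveMessage(...).getMessages.asScala, so the block runs without the AWS SDK.

```scala
import scala.annotation.tailrec

object PollingSketch {
  // Calls `receive` up to `maxAttempts` times and returns the first non-empty batch.
  // Returns an empty batch if every attempt came back empty.
  @tailrec
  def pollUntilNonEmpty[A](maxAttempts: Int)(receive: () => Seq[A]): Seq[A] = {
    val batch = if (maxAttempts > 0) receive() else Seq.empty[A]
    if (batch.nonEmpty || maxAttempts <= 1) batch
    else pollUntilNonEmpty(maxAttempts - 1)(receive)
  }
}
```

In the spec, the receive call could be wrapped in pollUntilNonEmpty(5) { () => ... } before the assertions run.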
Next, I put the shared logic into traits to make the consumers and producers easier to write.
The consumer trait, IncludeSQSConsumer.scala
package com.sillycat.jobsconsumer.messagequeue.consumer

import com.amazonaws.services.sqs.model.{DeleteMessageRequest, Message, ReceiveMessageRequest}
import com.sillycat.jobsconsumer.messagequeue.SQSQueue
import com.sillycat.jobsconsumer.utilities.{IncludeConfig, IncludeLogger}
import scala.collection.JavaConverters._

trait IncludeSQSConsumer extends IncludeLogger with IncludeConfig {

  protected def queueName = "default"

  // This loop is part of the trait body, so polling starts as soon as
  // the object that mixes in the trait is initialized, and it never returns
  while (true) {
    SQSQueue.getAsyncClient
      .receiveMessage(new ReceiveMessageRequest(getQueueUrl).withMaxNumberOfMessages(10))
      .getMessages
      .asScala
      .map(handleMessage)     // process the whole batch first
      .foreach(deleteMessage) // then delete every processed message
  }

  protected def getQueueUrl: String = {
    SQSQueue.getAsyncClient.getQueueUrl(queueName).getQueueUrl
  }

  // Implemented by the concrete consumer; returns the message so it can be deleted
  protected def handleMessage(msg: Message): Message

  protected def deleteMessage(msg: Message) = {
    logger.debug("Deleting Message after operation " + msg.getReceiptHandle)
    SQSQueue.getAsyncClient.deleteMessageAsync(
      new DeleteMessageRequest(getQueueUrl, msg.getReceiptHandle))
  }
}
A simple consumer, RawJobConsumer.scala
package com.sillycat.jobsconsumer.messagequeue.consumer

import com.amazonaws.services.sqs.model.Message
import com.sillycat.jobsconsumer.utilities.{IncludeConfig, IncludeLogger}

object RawJobConsumer extends App with IncludeSQSConsumer with IncludeLogger with IncludeConfig {

  // Read the queue name from the environment-specific configuration
  override protected def queueName = {
    val queueName = config.getString(envStr("sqs.queueName.rawjobs"))
    logger.debug("Polling the Raw Job Message from Queue: " + queueName)
    queueName
  }

  // Only log the body here; real processing would go in this method
  protected def handleMessage(msg: Message): Message = {
    logger.debug("Receiving Message from SQS " + msg.getBody)
    msg
  }
}
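In the consumer trait above, an exception thrown by handleMessage stops the whole polling loop. A possible variation, sketched here under my own assumptions, deletes a message only when its handler succeeded and leaves the failed ones on the queue, where SQS makes them visible again after the visibility timeout. ConsumeSketch and processBatch are hypothetical names, and the handle and delete functions are stand-ins for handleMessage and deleteMessage, so the block runs without the AWS SDK.

```scala
import scala.util.{Failure, Success, Try}

object ConsumeSketch {
  // Processes one batch: a message is deleted only if its handler succeeded.
  // Returns the messages whose handler failed, so the caller can log or count them.
  def processBatch[A](batch: Seq[A])(handle: A => Unit)(delete: A => Unit): Seq[A] =
    batch.flatMap { msg =>
      Try(handle(msg)) match {
        case Success(_) =>
          delete(msg)
          None
        case Failure(_) =>
          Some(msg) // not deleted, so it stays on the queue for another attempt
      }
    }
}
```

The polling loop would then call processBatch once per received batch and keep running even when a single message cannot be handled.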
The producer trait, IncludeSQSProducer.scala
package com.sillycat.jobsconsumer.messagequeue.producer

import com.amazonaws.services.sqs.model.SendMessageRequest
import com.sillycat.jobsconsumer.messagequeue.SQSQueue
import com.sillycat.jobsconsumer.utilities.{IncludeConfig, IncludeLogger}

trait IncludeSQSProducer extends IncludeLogger with IncludeConfig {

  protected def queueName = "default"

  // Convert the source object to a string body and send it to the queue
  def sendMessage(sourceMsg: Any): Unit = {
    logger.debug("Send the Message to Queue " + getQueueUrl)
    val msg = new SendMessageRequest(getQueueUrl, prepareMessage(sourceMsg))
    SQSQueue.getAsyncClient.sendMessage(msg)
  }

  // Implemented by the concrete producer to serialize the message
  protected def prepareMessage(sourceMsg: Any): String

  private def getQueueUrl: String = {
    SQSQueue.getAsyncClient.getQueueUrl(queueName).getQueueUrl
  }
}
A simple producer, ClassifiedJobProducer.scala
package com.sillycat.jobsconsumer.messagequeue.producer

object ClassifiedJobProducer extends IncludeSQSProducer {

  // Read the queue name from the environment-specific configuration
  override protected def queueName = {
    val queueName = config.getString(envStr("sqs.queueName.classifiedjobs"))
    logger.debug("Send the classified Job Message to Queue: " + queueName)
    queueName
  }

  // The simplest serialization: rely on toString
  def prepareMessage(sourceMsg: Any): String = {
    sourceMsg.toString
  }
}
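The producer above sends one message per request. If many messages need to go out at once, SQS also offers a batch call that takes at most 10 entries per request, each with an id that is unique within the batch. The sketch below only shows the grouping step and is not part of the original project. BatchSketch, toBatches, and the msg-N id format are hypothetical, and each (id, body) pair is meant to become one SendMessageBatchRequestEntry(id, body).

```scala
object BatchSketch {
  // SQS accepts at most 10 entries in a single SendMessageBatch request
  val MaxBatchSize = 10

  // Splits the bodies into batches and pairs each body with an id
  // that is unique within its own batch
  def toBatches(bodies: Seq[String]): Seq[Seq[(String, String)]] =
    bodies.grouped(MaxBatchSize).map { group =>
      group.zipWithIndex.map { case (body, i) => (s"msg-$i", body) }
    }.toList
}
```

Each inner sequence can then be turned into one batch request, so 23 messages need three requests instead of 23.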
References:
JAVA or SCALA
http://aws.amazon.com/java/
https://github.com/adamw/mqperf/tree/master/src/main/scala/com/softwaremill/mqperf/mq
old blog
https://github.com/luohuazju/sillycat-analyzer-java
https://github.com/adamw/mqperf
Message Compress
kryo
http://blog.csdn.net/rocklee/article/details/26706145
messagepack
http://web2.0coder.com/archives/347
http://www.cnblogs.com/peiandsky/archive/2012/04/24/2467766.html
protobuf
https://github.com/google/protobuf
https://github.com/drslump/Protobuf-PHP
serializing and deserializing XML
http://alvinalexander.com/scala/serializing-deserializing-xml-scala-classes
Testing SQS Server
http://maciejb.me/2012/10/17/testing-your-java-amazon-sqs-code-with-elasticmq/
https://github.com/adamw/elasticmq