`

7.3实现request/reply with JMS

阅读更多
如前面章节中描述的,messaging完全是关于从receiver的senders的decoupling。消息从一个进程发送到一个代理,并且消息以异步的方式又从代理接收到另外一个不同的进程。使用JMS实现的系统结构风格被称为request/reply。From a hight level,一个request/reply方案包含了发送消息(request)和等待一个消息的返回(reply)的应用。传统的,这样的系统设计通过使用一个client-server结构实现,服务器和客户端以一个同步地模式在网络(TCP,UDP等等)上交互。这种模式当然有可伸缩性的限制,并且很难将它进一步分散。That’s where messaging enters the picture—提供设计一个能通过基于消息的request/reply设计简单扩展的系统的能力。一些世界上扩展性最好的系统通过使用类似这个示例中演示的异步的进程来实现。
在图形7.2中显示的图标描述了一个request/reply的概览。注意客户端同时包含了producer和consumer,并且worker也同时包含了producer和consumer。这两个实体都将在下面被解释。
首先,producer建立一个request,这个request是JMS消息的格式,并设置了一系列重要的属性--correlation ID(通过JMSCorrelationID消息属性设置)和reply destination(通过JMSReplyTo消息属性设置)。correlation ID很重要,因为如果有多个未处理的request,它允许requests和replies关联起来。reply的destination是reply预计要被发送到的地址(通常是一个临时的JMS destination因为它要资源友好的多)。接着客户端配置了一个consumer去监听reply destination。
第二步,一个worker接收了request,处理它,并且使用在request message中的JMSReplyTo属性设置的destination来发送一个reply消息。这个reply消息必须使用原request的correlation ID也设置JMSCorrelationID。当客户端接收这个reply message,它能正确的将消息和原始的request相关联起来。
现在有意思的部分来了--演示这个结构如何能变得高度可扩展。想象一下一个单一的worker不足够处理来到的request的加载。没问题:添加额外的workers来处理加载。这些workers甚至能分散到不同的主机上--这是扩展这个设计的最重要的方面。因为workers对相同主机上的相同资源不是contentding的,唯一的限制是通过代理最大的消息流量,它远高于你可以实现的任何经典client-server设置的量。此外,ActiveMQ能被垂直和水平地扩展,如第四部分讨论的那样,让我们现在看一下一个简单的request/reply实现。
7.3.1实现server和worker
这个系统中首先要关注的是消息代理。让代理运行这样就能准备好两端启动的时候的连接。一个嵌入代理会在这个示例中使用到因为它很容易演示。系统中下一个运行的部分是worker。worker由一个consume消息和发送response的消息监听器组成。即使是一个简单的实现,它将提供你在系统中使用它的足够的信息。那么来看一看server的实现。
Listing 7.14 Create a broker, a consumer, and a producer for the request/reply example

...
public void start() throws Exception {
	createBroker();
	setupConsumer();
}
private void createBroker() throws Exception {
	broker = new BrokerService();
	broker.setPersistent(false);
	broker.setUseJmx(false);
	broker.addConnector(brokerUrl);
	broker.start();
}
private void setupConsumer() throws JMSException {
	ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
	Connection connection;
	connection = connectionFactory.createConnection();
	connection.start();
	session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	Destination adminQueue = session.createQueue(requestQueue);
	producer = session.createProducer(null);
	producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	consumer = session.createConsumer(adminQueue);
	consumer.setMessageListener(this);
}
public void stop() throws Exception {
	producer.close();
	consumer.close();
	session.close();
	broker.stop();
}
...

如你所见的,这个start()方法调用了一个方法来创建和启动一个嵌入式代理,并且另外一个方法建立和启动worker。createBroker()方法使用BrokerService类建立一个嵌入式代理。setupConsumer()方法建立用户接收和发送消息的所有JMS对象,包括一个connection,一个session,一个destination,一个consumer和一个producer。producer被建立时没有默认的destination,因为将在每个消息的JMSReplyTo属性中指定destination,消息将发送到那个指定的destination。
进一步看看这个listener,注意下面展示的它是如何处理每个request的consumption的。
Listing 7.15 The message listener for the request/reply example
...
public void onMessage(Message message) {
	try {
		TextMessage response = this.session.createTextMessage();
		if (message instanceof TextMessage) {
			TextMessage txtMsg = (TextMessage) message;
			String messageText = txtMsg.getText();
			response.setText(handleRequest(messageText));
		}
		response.setJMSCorrelationID(message.getJMSCorrelationID());
		producer.send(message.getJMSReplyTo(), response);
	} catch (JMSException e) {
		e.printStackTrace();
	}
}
public String handleRequest(String messageText) {
	return "Response to '" + messageText + "'";
}
...

这个listener新建一个新的消息,分配正确的correlation ID,并发送消息到reply-to队列。简单的东西,但还是很重要。虽然这个消息监听器在它的实现并不是earth shattering,它展示了完成worker的任务的基本必要的步骤。根据需求任何数量的额外操作或数据库存取能被添加到你的系统的监听器。
启动这个服务是比较明显的:建立一个它的实例并调用start()方法。所有的server功能被封装在main方法中,如下表所示。
Listing 7.16 Starting the server for the request-reply example
...
public static void main(String[] args) throws Exception {
	Server server = new Server();
	server.start();
	System.out.println();
	System.out.println("Press any key to stop the server");
	System.out.println();
	System.in.read();
	server.stop();
}
...

一旦服务被启动并且worker运行,那么一切就绪等待接收从client来的requests。
7.3.2实现client
客户端的工作是初始化到代理的requests。这是整个request/reply的处理开始的地方,并且一般由你的一个业务进程来调用。这个进程可能要接受一个命令,履行这个命令,整合许多业务系统,或者购买或销售一个financial position。无论是什么实例,request-reply以发送一个消息开始。
向代理发送一个消息需要基本的connection,session,destination和producer,这些都是通过start()方法在客户端建立。这些在下面的列表中都被列举了:
Listing 7.17 Methods for starting and stopping the request/reply client
...
public void start() throws JMSException {
	ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
	connection = connectionFactory.createConnection();
	connection.start();
	session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	Destination adminQueue = session.createQueue(requestQueue);
	producer = session.createProducer(adminQueue);
	producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	tempDest = session.createTemporaryQueue();
	consumer = session.createConsumer(tempDest);
	consumer.setMessageListener(this);
}
public void stop() throws JMSException {
	producer.close();
	consumer.close();
	session.close();
	connection.close();
}
...

producer发送一个消息到request queue接着consumer在新建立的临时queue进行监听。现在该是为客户端实现真实逻辑的时候了,如下所示。
Listing 7.18 Implementation of logic for request/reply client
...
public void request(String request) throws JMSException {
	System.out.println("Requesting: " + request);
	TextMessage txtMessage = session.createTextMessage();
	txtMessage.setText(request);
	txtMessage.setJMSReplyTo(tempDest);
	String correlationId = UUID.randomUUID().toString();
	txtMessage.setJMSCorrelationID(correlationId);
	this.producer.send(txtMessage);
}
public void onMessage(Message message) {
	try {
		System.out.println("Received response for: " + ((TextMessage) message).getText());
	} catch (JMSException e) {
		e.printStackTrace();
	}
}
...

表7.18中显示的这个request()方法根据request内容建立了一个message,设置了JMSReplyTo到临时queue,并且设置了correlation ID--这三个东西很重要。虽然这个示例中的correlation ID使用随机的UUID,也就是基本任何ID generator都可以做到。现在women准备好发送一个request了。
就想启动server是一个简单的main方法,客户端也是如此,如下所示。

Listing 7.19 Starting the request/reply client

...
public static void main(String[] args) throws Exception {
	Client client = new Client();
	client.start();
	int i = 0;
	while (i++ < 10) {
		client.request("REQUEST-" + i);
	}
	Thread.sleep(3000); //wait for replies
	client.stop();
}
...

如前面所解释的,这是一个简单的实现。所以在启动client后,10个requests被发送到代理上。现在该是实际地运行这个示例的时候了。
7.3.3运行request/reply示例
运行这个示例需要两个terminal:一个是server的一个是client的。server的需要首先启动。server在一个名为Server的类中被实现而client在一个名为Client的类中被实现。因为这些类中的每一个都是通过main方法被初始化,启动每一个都很简单。下面的类表显示了如何启动server类。
Listing 7.20 Start up the server for the request/reply example
$ mvn exec:java -Dexec.mainClass=org.apache.activemq.book.ch7.sync.Server
...
INFO | Using Persistence Adapter: MemoryPersistenceAdapter
INFO | ActiveMQ 5.4.1 JMS Message Broker (localhost) is starting
INFO | For help or more information please see:
http://activemq.apache.org/
INFO | Listening for connections at:
tcp://dejan-bosanacs-macbook-pro.local:61616
INFO | Connector tcp://dejan-bosanacs-macbook-pro.local:61616 Started
INFO | ActiveMQ JMS Message Broker
(localhost, ID:dejanb-57522-1271170284460-0:0) started
Press any key to stop the server
INFO | ActiveMQ Message Broker
(localhost, ID:dejanb-57522-1271170284460-0:0) is shutting down
INFO | Connector tcp://dejan-bosanacs-macbook-pro.local:61616 Stopped
INFO | ActiveMQ JMS Message Broker
(localhost, ID:dejanb-57522-1271170284460-0:0) stopped
...

当server被启动,接着该是启动client和开始发送requests的时候了。下面的列表显示了如何启动client。
Listing 7.21 Start up the client for the request/reply example
$ mvn exec:java -Dexec.mainClass=org.apache.activemq.book.ch7.sync.Client
...
Requesting: REQUEST-1
Requesting: REQUEST-2
Requesting: REQUEST-3
Requesting: REQUEST-4
Requesting: REQUEST-5
Requesting: REQUEST-6
Requesting: REQUEST-7
Requesting: REQUEST-8
Requesting: REQUEST-9
Requesting: REQUEST-10
Received response for: Response to 'REQUEST-1'
Received response for: Response to 'REQUEST-2'
Received response for: Response to 'REQUEST-3'
Received response for: Response to 'REQUEST-4'
Received response for: Response to 'REQUEST-5'
Received response for: Response to 'REQUEST-6'
Received response for: Response to 'REQUEST-7'
Received response for: Response to 'REQUEST-8'
Received response for: Response to 'REQUEST-9'
Received response for: Response to 'REQUEST-10'
...

注意当客户端被启动,10个requests被发送去初始化request/reply过程并且10个reply从worker那里接收回来。虽然它并非是极好的,但当你要将它应用到你自己的业务流程中的时候,在这个简单request/reply示例的能量是很明显的。
使用request/reply模式,想象一下每一秒中有成千上万的requests从许许多多的clients发送到代理,它们分别到许许多多的主机。在一个生产系统中,会有多余1个的代理实例会被用于redundancy,failover和load balencing。这些broker也会distributed到许许多多的主机上。处理这么多requests的唯一方法是使用许多的workers。Producers总是能迅速的发送消息,而比consumer接受和处理它们快得多,所以会需要许许多多的workers,它们也都将被扩展到许多主机上。使用许多主机的好处是每一个都能根据需要启动和关闭,并且整个系统本身不会被影响到。producers和workers将继续处理消息,并且即使它们中的一个挂了,它将不会影响到系统。准确来说这是许多large-scale系统如何处理这样tremendous复杂的方式--通过对异步messaging的使用,就像request/reply模式展示的那样。
JMS API是tedious的,因为它需要你写很多代码来初始化所有必要的JMS对象,例如connections,sessions,producers,consumers等等。这个Spring框架提供的很大的好处的地方。它通过supply一个更cogent的API和简化整个配置来帮助你删除这样的boilerplate code。
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    轻触开关 12*12*7.3方头插件/四脚插件 TS-KG03T 汉博HANBO 3D

    轻触开关 12*12*7.3方头插件/四脚插件 TS-KG03T 汉博HANBO 3D

    每日签到 v3.00 FOR phpwind7.3.2/7.3/7.0/6.32

    # 插件简介 ... 2.可以后台进行设置是否允许每小时签到,如果不允许依然是一个完美的每日签到插件。前台无需改动,我已经做好调整,关闭每时签到则签到次数相关信息不显示; 3.增强验证功能,避免出现挂机刷签到的情况...

    boost1.71_MinGW7.3(32/64位动态库、静态库)

    标题中的"boost1.71_MinGW7.3"指的是Boost库的1.71版本,这是专门为使用MinGW7.3编译器的C++开发环境编译和优化的。 MinGW (Minimalist GNU for Windows) 是一个可移植的C/C++开发环境,它将GCC(GNU Compiler ...

    iNodeSetup7.3 (E0538).rar

    iNodeSetup7.3 (E0538).rar 是一个专为Mac系统设计的新版本iNode客户端安装包。iNode作为一个网络应用,通常用于文件传输、远程访问或网络管理等场景,尤其在教育和企业环境中较为常见。这个E0538版本可能是其软件...

    centos7.3/联想打印机/兄弟打印机/驱动安装

    centos7.3/联想打印机/兄弟打印机/驱动安装/底层驱动安装 联想很多型号打印机没有Linux驱动,如果不知道那款驱动通用,可在日本兄弟官网上找:看哪一款和联想外形一样或者非常接近的,这款打印机的Linux驱动基本都会...

    SIMATIC HMI WinCC V7.3 WinCC/IndustrialDataBridge 入门指南[手册].pdf

    根据所提供的文件内容,以下是关于SIMATIC HMI WinCC V7.3以及WinCC/IndustrialDataBridge (IDB) 入门指南的相关知识点。 首先,手册的主要目的是介绍用户如何使用WinCC V7.3和WinCC/IndustrialDataBridge产品。...

    SecureCRT7.3 34/64位,带注册机

    SecureCRT7.3 34/64位,带注册机。完美破解版。。。。。。。。。。。。。。。。。。。。。。。。。。

    linux java jdk1.7 centos

    oracle官网已不开放下载,亲测可用,jdk1.7 linux centos,提供多版本

    centos7.3配置zabbix3.2/3.4

    ### CentOS 7.3 配置 Zabbix 3.2/3.4 #### 一、系统环境准备 为了在 CentOS 7.3 上成功安装和配置 Zabbix 3.2/3.4,首先需要确认系统环境是否满足要求。通过执行 `cat /etc/redhat-release` 命令来验证当前系统...

    linux搭建C开发环境

    `rpm -ivh ftp://216.254.0.38/linux/redhat/7.3/en/os/i386/RedHat/RPMS/glibc-kernheaders-2.4-7.14.i386.rpm` - glibc-common `rpm -ivh ftp://216.254.0.38/linux/redhat/7.3/en/os/i386/RedHat/RPMS/glibc-...

    Foxit PhantomPDF Business 7.3 Patch

    许多企业不仅需要创建和编辑PDF文档,更加关注PDF的安全性。他们需要保证重要文件档案符合法规和企业管理的同时,确保这些文档审批后不被修改。因而,对于大型企业来说,拥有一套实用、小巧、易于使用...注册7.3x的版本

    MagickWand For PHP 5.5/5.6/7.0/7.1/7.2/7.3 (Win32/x64)

    MagickWand For PHP。支持PHP版本5.5.x / 5.6.x / 7.0.x / 7.1.x / 7.2.x / 7.3.x Thread Safe, x86 / x64 添加 extension=php_magickwand.dll。采用 ImageMagick 版本:6.9.10-59 Q32。暂不支持 PHP 7.4

    Linux系统下C开发环境的构成和安装

    rpm -ivh ftp://216.254.0.38/linux/redhat/7.3/en/os/i386/RedHat/RPMS/glibc-2.2.5-34.i386.rpm ``` 此外,还需要安装glibc的开发库和相关工具: - `glibc-devel`: 包含了用于开发的头文件和库文件。 - `glibc-...

    php-7.3.13.tar.gz

    --with-config-file-scan-dir=/etc/php7.3/conf.d \ --enable-fpm \ --enable-mbstring \ --with-curl \ --with-zlib \ --with-jpeg \ --with-zip \ --with-openssl \ --enable-bcmath \ --enable-intl \ --with-pdo...

    gradle-7.3-all.zip 极速下载

    Gradle 7.3 版本为 JVM 项目引入了声明性测试套件 API,添加了对使用 Java 17 构建项目的支持,并更新了 Scala 插件以支持 Scala 3。 Gradle 7.3 具体更新内容如下:  通过更新你的 Wrapper,将你的构建转换为...

    社区论坛任务 For PHPWind 6.3.2/7.0/7.3/7.3.2(gbk/big5)最后更新2009/5/7

    该插件适用于PHPWind 6.3.2、7.0、7.3及7.3.2等多个版本,并且支持GBK和Big5两种编码方式,为多语言环境的论坛提供了便利。本文将深入探讨该插件的功能、特点及其对论坛运营的影响。 首先,我们来看一下这个插件的...

    【独家分享】电竞比分源码/免买分源码/可二开/支持最新PHP7.3/LOL,王者,吃鸡等等电竞比分源码

    【独家分享】电竞比分源码/免买分源码/可二开/支持最新PHP7.3 OS6.5 到7.5 推荐7.4 Nginx -Tengine2.2.3 MySQL 8.0.16 PHP 7.3 Redis 5.0.8 php7.3 安装扩展fileinfo redis Swoole4 伪静态使用 think PHP 修改LOGO ...

    H3C inode 7.3 linux

    《深入理解H3C iNode 7.3 Linux系统》 在信息技术领域,H3C iNode是一款由H3C公司推出的嵌入式Linux操作系统,主要用于网络设备和服务器平台。iNode 7.3作为其重要的版本,为用户提供了一套稳定、高效的运行环境。...

    SecureCRT 7.3&SecureFX 7.3

    SecureCRT 7.3&SecureFX 7.3 2015-10-26

    MySQL Cluster 7.3实现互联网级性能和运营商级可用性的新特性

    ### MySQL Cluster 7.3 实现互联网级性能与运营商级可用性的新特性解析 #### 简介 MySQL Cluster 7.3 是一款高度可扩展、实时、事务型数据库系统,设计之初即考虑到了电信级别的高可用性和互联网级别的高性能需求...

Global site tag (gtag.js) - Google Analytics