欢迎继续收看我的文章。
本节主要内容就是讲解消息的传递方式,上一节已经讲解完客户端和broker端连接的建立方式,在Connection、Session、Producer类对象建立的同时,客户端和broker端会进行一些消息交互,ActiveMQ中把所有的消息交互的内容都叫做Command,每条消息对应一个Command,例如客户端刚连接到broker,broker会发送一个BrokerInfo信息到客户端,接着客户端会发送ConnectionInfo连接信息、ProducerInfo生产者信息等等到服务端,如下图所示:
这些命令都继承于BaseCommand抽象类并实现于Command接口,类图如下,这里使用了访问者设计模式以及适配器设计模式。
适配器模式分为三种实现,第一种是通过继承实现,第二种是通过组合实现,第三种也就是类图中画的叫做默认适配器,CommandVisiter接口中定义了处理所有消息的方法,是broker和客户端api公用的一个接口,但是客户端用不到全部接口,如果实现这个接口那又必须实现全部的方法,所以此处在接口和具体类中间新增了一个CommandVisiterAdapter抽象类实现了全部的处理消息的方法并且全部都是空实现,这样在新建DefaultVisiter的时候就可以根据自己的需要来选择相应的方法进行实现了,访问者设计模式的体现请看下面的代码:
/**
* reads packets from a Socket
*/
publicvoid run() {
LOG.trace("TCP consumer thread for " + this + " starting");
this.runnerThread=Thread.currentThread();
try {
while (!isStopped()) {
doRun();
}
} catch (IOException e) {
stoppedLatch.get().countDown();
onException(e);
} catch (Throwable e){
stoppedLatch.get().countDown();
IOException ioe=new IOException("Unexpected error occured: " + e);
ioe.initCause(e);
onException(ioe);
}finally {
stoppedLatch.get().countDown();
}
}
protectedvoid doRun() throws IOException {
try {
Object command = readCommand();
doConsume(command);
} catch (SocketTimeoutException e) {
} catch (InterruptedIOException e) {
}
}
protected Object readCommand() throws IOException {
return wireFormat.unmarshal(dataIn);
}
这段代码是来自TcpTransport.java中,上一节已经讲解完TcpTransport的建立所以此处不在熬述,可以看到线程启动后一直在调用doRun方法,doRun方法则调用readCommand来读取客户端发送过来的信息,读到之后就会调用ActiveMQConnection.java的onCommand方法,如下代码:
@Override
publicvoid onCommand(final Object o) {
final Command command = (Command)o;
if (!closed.get() && command != null) {
try {
command.visit(new CommandVisitorAdapter() {
@Override
public Response processMessageDispatch(MessageDispatch md) throws Exception {
waitForTransportInterruptionProcessingToComplete();
ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
// Copy in case a embedded broker is dispatching via
// vm://
// md.getMessage() == null to signal end of queue
// browse.
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage(null);
md.setMessage(msg);
}
dispatcher.dispatch(md);
}
returnnull;
}
@Override
public Response processProducerAck(ProducerAck pa) throws Exception {
if (pa != null && pa.getProducerId() != null) {
ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
if (producer != null) {
producer.onProducerAck(pa);
}
}
returnnull;
}
@Override
public Response processBrokerInfo(BrokerInfo info) throws Exception {
brokerInfo = info;
brokerInfoReceived.countDown();
optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
returnnull;
}
@Override
public Response processConnectionError(final ConnectionError error) throws Exception {
executor.execute(new Runnable() {
@Override
publicvoid run() {
onAsyncException(error.getException());
}
});
returnnull;
}
@Override
public Response processControlCommand(ControlCommand command) throws Exception {
onControlCommand(command);
returnnull;
}
@Override
public Response processConnectionControl(ConnectionControl control) throws Exception {
onConnectionControl((ConnectionControl)command);
returnnull;
}
@Override
public Response processConsumerControl(ConsumerControl control) throws Exception {
onConsumerControl((ConsumerControl)command);
returnnull;
}
@Override
public Response processWireFormat(WireFormatInfo info) throws Exception {
onWireFormatInfo((WireFormatInfo)command);
returnnull;
}
});
} catch (Exception e) {
onClientInternalException(e);
}
}
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onCommand(command);
}
}
可以看到Command作为入参传进onCommand方法,然后方法中调用command.visit(new CommandVisitorAdapter() {…}),而new CommandVisitorAdapter就是上面介绍过的适配器设计模式,这里可以看到这个匿名类只需实现可以用到的方法,而不是实现所有的接口方法。这里command.visit的好处是传入进来的command程序不需要判断他是什么类型的command然后在决定调用这个command的某个方法,而是直接调用visit方法即可,而所有的业务逻辑也是统一的在CommandVisitorAdaptor中实现,这也算是java中动态多分派的实现。
知道了消息接收处理以及broker和客户端的信息交互之后,我们在来看下消息是如何从Command类序列化到字节写入以及字节如何反序列化转成Command类的,在ActiveMQ中,每一个Command消息都对应一个CommandMarshal类,例如ConnectionInfo使用ConnectionInfoMarshal来序列化和反序列化,BrokerInfo使用BrokerInfoMarshal来序列化和反序列化,ConnectionInfoMarshal等marshal的实现就是把字符串或者字节等信息写入到socket的输出流中没有什么可说的,序列化和反序列化又分为两种,一种是tight一种是loose,tight会针对cpu来进行优化,先写入大小,在写入具体数据,而loose方式则直接写入数据,两种方式都会使用缓存功能,客户端和服务端都分别存在marshal[]和unMarshal[]数组,例如客户端给服务端发送ProducerInfo信息,第一次发送后会把ProducerInfo存放在marshal的第0位,然后服务端接收后会把这个producerInfo放在unMarshal[0]中,如果客户端在次发送ProducerInfo则从缓存中取,找到ProducerInfo在marshal[]的第0个,则直接发送0,服务端则从unMarshal[]第0个取出使用,这节就到这里吧,想到什么我在补充,欢迎跟帖讨论。
相关推荐
二:服务介绍: 1) 服务的注册与发现 Spring Cloud是一个基于Spring Boot实现的云应用开发工具,它为基于JVM的云应用开发中涉及的配置管理、服务发现、断路器、智能路由、微代理、控制总线、全局锁、决策竞选、...
这篇文档将深入解析《Android应用源码之简易微信客户端和服务器源码》这一毕业设计项目,旨在帮助读者理解和学习Android应用程序开发,特别是针对即时通讯应用的实现。此项目包括了客户端与服务器端的源代码,提供了...
【标题解析】 "微信小程序大学生闲置物品交易平台设计+ssm后端源码案例设计" 这个标题表明,这是一个关于微信小程序开发的项目,主要针对大学生群体,目的是搭建一个平台,让学生们可以交易自己不再需要的物品。这里...
《家校通管理系统——深入解析Java源码》 家校通管理系统是一款专为教育机构设计的信息化管理平台,旨在加强家庭与学校之间的沟通,提高教育管理效率。此系统基于Java编程语言开发,体现了Java在企业级应用中的强大...
#### 二、系统架构与演进 - **传统架构**:单体应用模式,所有功能模块紧密耦合在一起,不利于扩展和维护。 - **分布式架构**: - **定义**:将系统按业务需求拆分成多个独立的服务,每个服务都可独立部署和扩展。 ...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
Highthon应用程序服务器是一款专为处理高并发、高性能需求设计的应用服务框架,主要针对第三和第四高音应用程序的接收和处理。在Java开发领域,这样的服务器架构对于构建可扩展且稳定的后端服务至关重要。本文将深入...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。提供了一个基于对象模型的 ActionScript 字节码,并提供了 ActionScript 字节码统计工具。 Java类重加载工具 JReloader JReloader 是一个用来重新...
8. **中间件与第三方库**:接触并使用过如Nginx(反向代理服务器)、ActiveMQ(消息中间件)、ThymeLeaf(模板引擎)、Freemarker(模板引擎)、Shiro(安全框架)、Kafka(消息队列)、FastJson(JSON解析)、Solr...