`
m635674608
  • 浏览: 5003266 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

使用 ActiveMQ 和 HornetQ 在 WebSockets 上轻松实现 STOMP 消息传输

    博客分类:
  • java
 
阅读更多

对于在不同层级上构建分布式软件系统来说,消息机制是一个非常强大的工具。通常来说,至少在 Java 生态圈内,客户端 (前端) 从来不会直接和消息中间件(或交换器进行交互,而是去调用服务器端 (后端)的服务。或者说,客户端甚至都没有意识到还存在着消息解决方案。

随着 Websockets 得到了越来越多的使用,以及面向文本的协议的广泛支持,比如STOMP (用来和消息中间件或交换器进行通讯) ,情况正在有所改变。 今天的这篇博文将试图解释下面的事情是多么容易,即通过使用基于WebsocketsSTOMP 协议,把两个非常流行的 JMS 实现 Apache ActiveMQ 和 JBoss HornetQ 提供给 web 前端 (JavaScript) 来使用。

lwei
lwei
翻译于 4年前
2人顶
 翻译得不错哦!
 

在深入代码之前,也许有人会说这样做不是一个好主意。那么我们的目的是什么呢? 答案真的取决于:

  • 你在开发的是概念的原型/证明,你需要一个简易的方式来整合发布/订阅或者点到点的消息发送
  • 你不想要/需要 构建复杂精致的架构,最简单的能运行的解决方案就足够了

这里可扩展性,容错性和其它一些重要的决定因素都没有考虑在内,但是如果你想要开发具有鲁棒性和弹性的架构,这些绝对是应当考虑的。

好,让我们开始吧。从我们正在尝试解决的问题来入手总是更好一些: 我们想要开发一个简单的发布/订阅解决方案,来使那些用JavaScript 编写的 web 客户端可以发送消息并监听特定的主题。任何时候接收到任何消息时,客户端就会展示一个弹出窗口。请注意,我们需要使用现代浏览器,它要能够支持 Websockets,比如 Google Chrome 或者 Mozilla Firefox.

lwei
lwei
翻译于 4年前
1人顶
 翻译得不错哦!
 

我们的两个例子中客户端代码都是一样的,所以我们就从那儿开始。一个很棒的学习起点是一篇叫做 STOMP Over WebSocket 的文章,它介绍了 stomp.js 组件,下面是我们的 index.html:

<script src="stomp.js"></script>

<script type="text/javascript">
 var client = Stomp.client( "ws://localhost:61614/stomp", "v11.stomp" );

 client.connect( "", "",
  function() {
      client.subscribe("jms.topic.test",
       function( message ) {
           alert( message );
          }, 
    { priority: 9 } 
      );
   
   client.send("jms.topic.test", { priority: 9 }, "Pub/Sub over STOMP!");
  }
 );
 
</script>

很简单的代码,不过仍有几个细节值得解释一下。首先,我们在 ws://localhost:61614/stomp 寻找 Websockets 终端。在本机部署时这是没问题的,但是最好用真实的 IP 地址或者主机名来替代 localhost 。其次,一旦连接成功, 客户端会去订阅主题(只对优先级为 9的消息感兴趣) ,随后立即给该主题发送消息。从客户端的角度来看,我们已经完成了所有工作。

lwei
lwei
翻译于 4年前
1人顶
 翻译得不错哦!
 

让我们继续来看看消息中间件,首先是 Apache ActiveMQ。为了简化例子,我们会把 Apache ActiveMQ 中间件嵌入到一个简单的不需要XML配置文件的 Spring 应用中。因为源代码已经在 GitHub 上提供了,我将跳过 POM 文件片段,只展示代码部分:

package com.example.messaging;

import java.util.Collections;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.hooks.SpringContextHook;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();    
        broker.addConnector( "ws://localhost:61614" ); 
        broker.setPersistent( false );
        broker.setShutdownHooks( Collections.< Runnable >singletonList( new SpringContextHook() ) );
  
        final ActiveMQTopic topic = new ActiveMQTopic( "jms.topic.test" );
        broker.setDestinations( new ActiveMQDestination[] { topic }  );
  
        final ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector( true );
        broker.setManagementContext( managementContext );
  
        return broker;
    }
}

正如我们可以看到的,ActiveMQ 中间件被配置为使用 ws://localhost:61614 链接,并假定它使用 STOMP 协议。并且,我们创建了名为 jms.topic.test 的 JMS 主题,启用了 JMX 管理。要运行它,使用 Starter 类:

package com.example.messaging;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Starter  {
    public static void main( String[] args ) {
        ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );
    }
}

现在,走起,开始运行,让我们在浏览器中打开 index.html 文件,我们应当可以看见类似下面的内容:

简单吧! 有求知欲的读者们请注意,为了支持 WebsocketsActiveMQ 需要使用 Jetty 7.6.7.v20120910 ,最新的 Jetty 发布版不管用。

lwei
lwei
翻译于 4年前
1人顶
 翻译得不错哦!
 

继续,考虑到 HornetQ 的特点,下面的实现看起来有些不同,不过仍然不太复杂。由于 Starter 类跟原来一样,所以唯一改变是配置:

package com.example.hornetq;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public EmbeddedJMS broker() throws Exception {
        final ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setPersistenceEnabled( false );
        configuration.setJournalType( JournalType.NIO );
        configuration.setJMXManagementEnabled( true );
        configuration.setSecurityEnabled( false );
  
        final Map< String, Object > params = new HashMap<>();
        params.put( TransportConstants.HOST_PROP_NAME, "localhost" );
        params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws" );
        params.put( TransportConstants.PORT_PROP_NAME, "61614" );
  
        final TransportConfiguration stomp = new TransportConfiguration( NettyAcceptorFactory.class.getName(), params );
        configuration.getAcceptorConfigurations().add( stomp );
        configuration.getConnectorConfigurations().put( "stomp_ws", stomp );
  
        final ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl( "cf", true, "/cf" );
        cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws" ) );
  
        final JMSConfiguration jmsConfig = new JMSConfigurationImpl();
        jmsConfig.getConnectionFactoryConfigurations().add( cfConfig );
  
        final TopicConfiguration topicConfig = new TopicConfigurationImpl( "test", "/topic/test" );
        jmsConfig.getTopicConfigurations().add( topicConfig );
  
        final EmbeddedJMS jmsServer = new EmbeddedJMS();
        jmsServer.setConfiguration( configuration );
        jmsServer.setJmsConfiguration( jmsConfig );
  
        return jmsServer;
    }
}

完整的源代码在 GitHub 上。在启动 Starter 类之后,在浏览器中打开 index.html,你将看到的结果应该类似于:

HornetQ 的配置看起来似乎更啰嗦一些,不过除了优秀的 Netty 框架之外,没有牵涉到其他的依赖。

出于我自己的好奇,我用 Apollo 实现替换了 ActiveMQ 中间件。虽然我如期的使它正常运行,但是我发现它的 API 非常笨重,至少在当前的 1.6 版本中是这样,所以在本文中我没有叙及这部分内容。

所有源代码可以从GitHub上获取: Apache ActiveMQ 示例 和 JBoss HornetQ 示例

 

https://www.oschina.net/translate/easy-messaging-with-stomp-over-websockets-using-activemq-and-hornetq

分享到:
评论

相关推荐

    ActiveMQ和HornetQ性能对比

    本文旨在通过一系列测试数据对比分析ActiveMQ与HornetQ在不同消息大小及数量下的性能表现。测试环境为相同的硬件配置,确保了测试结果的公正性。通过对比两者的发送时间、吞吐量等指标,可以更直观地了解两者之间的...

    realtime-messaging:使用 STOMP over WebSockets 在 Spring Boot 上实时传输数据

    在 Spring Boot 上使用 STOMP Over WebSockets 的实时私人消息演示该项目演示了在 Spring Boot 上使用 over WebSockets 结合和向客户端实时发送数据。 有关更详细的解释,请访问博客页面先决条件跑步安装并运行您的 ...

    go语言实现使用activemq 收发消息

    在本文中,我们将深入探讨如何使用Go语言实现与ActiveMQ的通信,主要关注消息的收发功能。ActiveMQ是Apache软件基金会开发的一款开源消息中间件,支持多种协议,包括我们这里提到的STOMP(Simple Text Oriented ...

    php 版本 activemq+stomp.php

    这个库包含了实现STOMP协议的类和方法,使得开发者可以轻松地在PHP应用程序中集成ActiveMQ的功能。 下面是一些可能包含在压缩包内的关键知识点: 1. **STOMP协议**:讲解STOMP的基本概念,包括其命令(如CONNECT、...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    在本文中,我们将深入探讨如何使用SpringBoot、ActiveMQ和MQTT来实现消息的发送与接收。这是一个典型的分布式系统中的消息通信场景,其中SpringBoot作为应用程序框架,ActiveMQ作为消息中间件,而MQTT(Message ...

    spring使用activeMQ实现消息发送

    本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...

    ActiveMQ使用mqtt协议的实现发布消息的三种方式.txt

    java中使用消息中间件ActiveMQ的MQTT协议发布消息使用fusesource,fusesource提供三种方式实现发布消息的方式,分别是阻塞式(BlockingConnection)、回调式(CallbackConnection)和Future样式(FutureConnection)

    ActiveMQ实现Android端的消息推送,包含Android端和Server端的代码和使用说明

    压缩包中包含Client Android 端和Service 端,Service端采用apache-activemq-5.13.3最新的版本。Android端采用MQTT实现了消息的接收,接收消息的回调是messageArrived方法。 Server端的简要使用说明如下: 1、解压...

    springboot集成activemq实现消息接收demo

    本教程将详细介绍如何在Spring Boot项目中集成ActiveMQ,实现消息接收的Demo。 首先,我们需要在Spring Boot项目中引入ActiveMQ的相关依赖。在`pom.xml`文件中添加以下Maven依赖: ```xml &lt;groupId&gt;org.spring...

    JMS 使用 ActiveMQ 传送文件

    XML常用于数据交换,因为它具有自我描述性和平台无关性,与JMS结合使用,可以使得消息的结构化和解析更为简单,从而在异构环境中实现文件的高效传输。 2. **JMS 使用 ActiveMQ 传送文件.doc** - 这个文档应该直接...

    一个jms activemq Topic 消息实例

    一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...

    spring 整合activemq实现自定义动态消息队列

    本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和获取,但是没有自定义监听(当前项目不需要),本文档只有功能实现类 即业务层。若要调用和推送 则需要自己根据需求编写。...

    ActiveMQ 使用Ajax 收发消息实战

    标题中的"ActiveMQ 使用Ajax 收发消息实战"指出我们将探讨如何使用ActiveMQ消息中间件与Ajax技术结合,实现Web应用程序中的异步消息传递。ActiveMQ是Apache软件基金会的一个项目,提供了一个开源的消息代理,支持...

    SpringBoot整合ActiveMQ+websocket.docx

    在Spring Boot应用中整合ActiveMQ和WebSocket,可以创建一个实时通信系统,使后端服务能够高效地推送消息到前端客户端。以下将详细解释这个过程的关键知识点: 1. **ActiveMQ**:Apache ActiveMQ是一个开源的消息...

    自己实现的ActiveMQ连接池和新版本ActiveMQ自带的连接池,封装好的工具类,可直接使用

    ActiveMQ是Apache软件基金会的一个开源项目,提供了一个强大的、高性能的消息中间件,支持多种协议,如AMQP、STOMP、OpenWire等,并且具有高可用性和高可靠性。 本资源提供的内容是关于ActiveMQ的连接池实现,分为...

    ActiveMQ实现

    在本项目中,我们涵盖了ActiveMQ的基本安装和使用,以及如何通过代码与之交互。 首先,让我们了解什么是ActiveMQ。ActiveMQ是一个高性能、可靠的、完全支持JMS 1.1规范的消息中间件。它提供了多种协议的支持,包括...

    activeMq in action 使用activeMq开发JMS的简单讲述

    3. **多种协议**:除了JMS之外,ActiveMQ还支持STOMP、AMQP、MQTT等多种消息传输协议,增加了系统的兼容性。 4. **高可用性**:通过网络集群和故障转移,ActiveMQ可以实现高可用性和负载均衡,确保服务不间断。 5. *...

    ActiveMQ实现的消息收发案例

    在实现案例中,我们通常会创建一个ActiveMQ服务器,然后通过JMS API来编写生产者和消费者程序。 在`ActiveMQDemo`中,我们可能包含以下步骤: 1. **配置ActiveMQ服务器**:安装并启动ActiveMQ服务器,通常可以通过...

    ActiveMQ APR STOMP技术文档

    结合ActiveMQ、APR 和 STOMP,开发者可以在Linux环境下使用C语言构建高效的、跨平台的消息客户端。APR 提供了底层的系统接口,ActiveMQ 则作为消息中间件处理和转发消息,而STOMP简化了客户端与ActiveMQ的通信。这样...

    ActiveMQ消息服务器 v5.17.6.zip

    1. **多协议支持**:ActiveMQ支持多种通信协议,包括OpenWire、Stomp、AMQP、MQTT和WebSockets,这使得它能够与不同平台和语言的系统无缝集成。 2. **持久化机制**:ActiveMQ提供基于文件的持久化,确保即使在...

Global site tag (gtag.js) - Google Analytics