语言是自己组织的。可能不专业,请读者见谅
有不对的地方,也欢迎大家指正
JMS有两种方式,一种是pub/sub,一种是(后续补充)
主要讨论pub/sub方式
pub/sub方式的工作流程:
首先订阅者(subscriber)向JMS容器订阅需要订阅的主题(topic),消息发布者发布消息,订阅了该主题的订阅者都能收到这个消息。(这是一对多的方式,类似于广播,个人理解),默认情况下,该消息不是持久的。即消息一发出,不管有没有人接受,都不会保存下来,即订阅者只能接收到自己订阅之后发布者发出的消息。
如果出现意外情况,订阅者突然断线了,发布者发布了一条消息,订阅者再连接上是不会收到消息的。解决这种情况的办法是订阅者中加个clientID。
JMS提供者(JMS Provider)也叫JMS服务器/容器
连接工厂Connection Factory:
是用来创建客户端到JMS容器之间JMS连接的工厂
context ctx = new InitialContext();
TopicConnectionFactory tcf = ctx.lookup("topic主题");
目的地:destinations
消息发布者发布消息的地方,即订阅者接收消息的来源地。因为目的地之注册在jms服务器的,所以我们只能通过JNDI查找。
Topic topic = (Topic)ctx.lookup("topic主题");
连接:connection
客户端与JMS提供者(容器)之间的连接
TopicConnection tc = tcf.createTopicConnection();
javax.jms.ExceptionListener
(请忽略:根据某些参数,生成clientID,向服务器注册JMS客户端)
需要导入activemq的jar包
第一种:最简单的activemq 默认端口:61616 发布者,订阅者示例
开启apache-activemq-5.11.1\bin下的activemq.bat
发布者:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicPublisher {
public static void main(String[] args) {
try {
//用户名,密码连接。连接方式:tcp。连接ip:port
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://127.0.0.1:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建主题。连接时,发布者发布某个主题名,订阅者订阅想要订阅的主题名。
Topic topic = session.createTopic("TopicName");
//发布者 发布这个主题
MessageProducer producer = session.createProducer(topic);
//发送模式:非持久化 DeliveryMode.PERSISTENT:持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
while(true){
TextMessage message = session.createTextMessage();
String text = "abcdefg";
message.setText(text);
producer.send(message);
System.out.println("Sent Message:" + message.getText());
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
订阅者:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicSubscriber implements MessageListener {
private static ConnectionFactory connectionFactory = null;
private static Connection connection = null;
private static Session session = null;
private static Topic topic = null;
private static MessageConsumer consumer = null;
public TopicSubscriber(){
try {
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://127.0.0.1:61616");
connection = connectionFactory.createConnection();
// connection.setClientID("aaa");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//主题名:一定想要订阅的发布者的主题名
topic = session.createTopic("TopicName");
consumer = session.createConsumer(topic);
// consumer = session.createDurableSubscriber(topic, "aaa");
consumer.setMessageListener(this);
//开始监听
connection.start();
} catch (JMSException e) {
e.printStackTrace();
close();
}
}
private void close() {
try {
if(consumer != null){
consumer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
finally{
try {
if(consumer != null){
consumer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}
consumer = null;
session = null;
connection = null;
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new TopicSubscriber();
}
}
第二种:SSL 安全通道加密的
发布者:
import java.io.IOException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import org.apache.activemq.ActiveMQSslConnectionFactory;
public class TopicSSLPublisher {
ActiveMQSslConnectionFactory factory;
Connection connection;
Session session;
String trustStore = "证书名";
String url = "ssl://IP:port";
Topic topic = null;
MessageProducer producer = null;
public TopicSSLPublisher(){
try {
factory = new ActiveMQSslConnectionFactory();
factory.setBrokerURL(url);
factory.setUserName("admin");
factory.setPassword("admin");
factory.setKeyAndTrustManagers(null, getTrustManagers(trustStore), new SecureRandom());
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic("topicName");
producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
connection.start();
while(true){
TextMessage message = session.createTextMessage();
String text = "sljdjk";
message.setText(text);
producer.send(message);
System.out.println("Sent Message:" + message.getText());
Thread.sleep(30000);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private TrustManager[] getTrustManagers(String trustStore){
try {
//KeyStore.getDefaultType();
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(ClassLoader.getSystemResourceAsStream(trustStore),"证书密码".toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
return tmf.getTrustManagers();
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
new TopicSSLPublisher();
}
}
订阅者:
import java.io.IOException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import org.apache.activemq.ActiveMQSslConnectionFactory;
public class TopicSSLSubscriber implements MessageListener{
ActiveMQSslConnectionFactory factory;
Connection connection;
Session session;
private String trustStore = "证书名";
private String pwd = "pwd";
private String url = "ssl://ip:port";
Topic topic = null;
MessageConsumer consumer = null;
public TopicSSLSubscriber(){
try {
factory = new ActiveMQSslConnectionFactory();
factory.setBrokerURL(url);
factory.setUserName("admin");
factory.setPassword("admin");
factory.setKeyAndTrustManagers(null, getTrustManagers(trustStore), new SecureRandom());
connection = factory.createConnection();
// connection.setClientID("id");//保证唯一性就行
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic("topicName");
consumer = session.createConsumer(topic);
// consumer = session.createDurableSubscriber(topic, "id");
//开始监听
connection.start();
} catch (JMSException e) {
e.printStackTrace();
close();
}
}
private TrustManager[] getTrustManagers(String trustStore){
try {
//KeyStore.getDefaultType();
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(ClassLoader.getSystemResourceAsStream(trustStore),pwd.toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
return tmf.getTrustManagers();
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
new TopicSSLSubscriber();
}
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
private void close() {
try {
if(consumer != null){
consumer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
finally{
try {
if(consumer != null){
consumer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}
consumer = null;
session = null;
connection = null;
} catch (JMSException e) {
e.printStackTrace();
}
}
}
private KeyManager[] getKetManagers(String trustStore){
try {
//KeyStore.getDefaultType();
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(ClassLoader.getSystemResourceAsStream(trustStore),pwd.toCharArray());
KeyManagerFactory tmf = KeyManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore,pwd.toCharArray());
return tmf.getKeyManagers();
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (UnrecoverableKeyException e) {
e.printStackTrace();
}
return null;
}
}
在这个过程中订阅者中添加clientID的就是解决突发情况的。clientID要保证唯一性。
自己下个activemq的包,activemq的下载连接
自己打开activemq,运行网页,观察网页中发布者,订阅者之间接收消息的情况。
和加了clientID时的情况。
知识点:加了clientID的情况要分为,运行这个主题之前,和运行这个主题之后两种情况。
代码中牵扯到的证书,怎样查看证书的相关知识,将在以后简单介绍
分享到:
相关推荐
【JMS ActiveMQ】知识点详解 一、MQ简介 消息队列(MQ)是一种用于分布式系统之间交换信息的技术。它允许应用程序将消息存储在内存或磁盘上,直到被其他应用程序消费。MQ使得应用程序可以独立运行,无需关心彼此的...
一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...
总之,这个"JMS ActiveMQ演示代码"提供了学习和实践JMS与ActiveMQ集成的起点,通过分析和运行这个示例,你可以深入理解消息中间件在分布式系统中的作用,以及如何利用JMS和ActiveMQ实现高效、可靠的异步通信。
在“sprin jms activemq 测试”这个主题中,我们将深入探讨Spring如何与JMS和ActiveMQ集成,以及如何进行相关的测试。 首先,Spring框架通过`spring-jms`模块提供了对JMS的支持。它提供了一组抽象,使得开发者能够...
标题中的“jms-test.zip_jms activemq_jms test”揭示了这是一个关于Java消息服务(JMS)的测试项目,特别提到了ActiveMQ作为消息代理。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它实现了JMS规范,允许...
Apache ActiveMQ 是一个开源的JMS提供商,它是Apache软件基金会的一部分,也是最广泛使用的JMS实现之一。ActiveMQ提供了多种协议支持,包括开放消息传递协议(STOMP)、AMQP、MQTT等,使得它能够与各种不同类型的...
ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它是Java消息服务(Java Message Service,简称JMS)的一个实现。JMS是一种为分布式应用提供异步消息传递的API,它定义了一种标准接口,使得不同的消息系统...
ActiveMQ 是Apache出品,最流行的,能力强劲的开源jms服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的...
activemq-all-5.9.0.jar jms的一个很好实现
**JMS与ActiveMQ入门实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和阅读消息的应用程序接口。它为应用程序提供了标准的接口,可以跨越多种消息中间件产品进行通信。...
基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:Spring 3.2.0,ActiveMQ 5.4.3,Tomcat 6.0.43。本例通过详细的说明和注释,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...
写spring配置文件的时候, 要把MessageProducer, MessageConsumer,MessageListener,MessageListenerContainer几个地方弄清楚: 1.可以有一个或者多个消息生产者向同一个destination发送消息. 2.queue类型的只能有一个...
ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它实现了Java消息服务(JMS)标准,为分布式系统提供高效、可靠的消息传递。在ActiveMQ中,有三种主要的方式来创建消息队列(QUEUE)和主题(TOPIC),这些...
**标题:“JMS 使用 ActiveMQ 传送文件”** 在IT领域,Java消息服务(Java Message Service,简称JMS)是一种标准API,它允许应用程序创建、发送、接收和读取消息。这种技术常用于异步通信,尤其是在分布式系统中,...
ActiveMQ 是一个流行的开源JMS提供者,它支持多种协议,如AMQP、STOMP和OpenWire等。 在"Apache Camel JMS ActiveMQ"的使用样例中,我们有两个主要的场景: 1. **从本地读取信息推送到MQ中**:这一部分涉及到了...
详细内容: SpringJMS整合ActiveMQ.doc 详细说明文档 apache-activemq-5.8.0-bin.zip ActiveMQ安装包 JMSTest.rar MyEclipse8.5下web工程
1. **PPTX文件(activemq.pptx)** - 这通常是一个演示文稿,详细介绍了JMS和ActiveMQ的基础知识、工作原理以及如何使用它们。它可能包含概念解释、架构图、配置示例和使用步骤等内容,帮助学习者理解ActiveMQ的核心...
在这个“一头扎进JMS之ActiveMQ系列”中,我们将深入探讨这个流行的开源消息代理——ActiveMQ,它是Apache软件基金会的项目,广泛应用于各种企业级应用。 ActiveMQ作为JMS的实现,提供了多种协议支持,包括开放标准...
ActivemQ是Apache软件基金会的一个项目,它实现了JMS规范,提供了一个高效、可靠的中间件服务,用于处理消息队列。本文将深入探讨如何在Spring 3.0中整合JMS与ActivemQ,以及它们在实际应用中的关键知识点。 首先,...
本教程将围绕"使用JMS操作ActiveMQ"这一主题,详细阐述如何通过JMS与ActiveMQ进行交互,包括创建生产者、消费者以及消息的发送和接收。 首先,我们需要理解JMS的基本概念。在JMS中,消息生产者(Producer)负责创建...