`
冰与火的抉择
  • 浏览: 12022 次
  • 性别: Icon_minigender_2
  • 来自: 西安
文章分类
社区版块
存档分类
最新评论

JMS activemq

 
阅读更多
语言是自己组织的。可能不专业,请读者见谅
有不对的地方,也欢迎大家指正

JMS有两种方式,一种是pub/sub,一种是(后续补充)
主要讨论pub/sub方式

pub/sub方式的工作流程:
首先订阅者(subscriber)向JMS容器订阅需要订阅的主题(topic),消息发布者发布消息,订阅了该主题的订阅者都能收到这个消息。(这是一对多的方式,类似于广播,个人理解),默认情况下,该消息不是持久的。即消息一发出,不管有没有人接受,都不会保存下来,即订阅者只能接收到自己订阅之后发布者发出的消息。
如果出现意外情况,订阅者突然断线了,发布者发布了一条消息,订阅者再连接上是不会收到消息的。解决这种情况的办法是订阅者中加个clientID。

JMS提供者(JMS Provider)也叫JMS服务器/容器

连接工厂Connection Factory:
是用来创建客户端到JMS容器之间JMS连接的工厂
context ctx = new InitialContext();
TopicConnectionFactory tcf = ctx.lookup("topic主题");

目的地:destinations
消息发布者发布消息的地方,即订阅者接收消息的来源地。因为目的地之注册在jms服务器的,所以我们只能通过JNDI查找。
Topic topic = (Topic)ctx.lookup("topic主题");

连接:connection
客户端与JMS提供者(容器)之间的连接
TopicConnection tc = tcf.createTopicConnection();

javax.jms.ExceptionListener

(请忽略:根据某些参数,生成clientID,向服务器注册JMS客户端)
需要导入activemq的jar包
第一种:最简单的activemq  默认端口:61616    发布者,订阅者示例
开启apache-activemq-5.11.1\bin下的activemq.bat
发布者:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicPublisher {

public static void main(String[] args) {
try {
//用户名,密码连接。连接方式:tcp。连接ip:port
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://127.0.0.1:61616");
Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建主题。连接时,发布者发布某个主题名,订阅者订阅想要订阅的主题名。
Topic topic = session.createTopic("TopicName");

//发布者  发布这个主题
MessageProducer producer = session.createProducer(topic);
//发送模式:非持久化         DeliveryMode.PERSISTENT:持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

while(true){
TextMessage message = session.createTextMessage();
String text = "abcdefg";
message.setText(text);
producer.send(message);
System.out.println("Sent Message:" + message.getText());
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

订阅者:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicSubscriber implements MessageListener {

private static ConnectionFactory connectionFactory = null;
private static Connection connection = null;
private static Session session = null;
private static Topic topic = null;
private static MessageConsumer consumer = null;

public TopicSubscriber(){
try {
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://127.0.0.1:61616");
connection = connectionFactory.createConnection();
// connection.setClientID("aaa");

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//主题名:一定想要订阅的发布者的主题名
topic = session.createTopic("TopicName");
consumer = session.createConsumer(topic);
// consumer = session.createDurableSubscriber(topic, "aaa");

consumer.setMessageListener(this);
//开始监听
connection.start();
} catch (JMSException e) {
e.printStackTrace();
close();
}
}

private void close() {
try {
if(consumer != null){
consumer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}

} catch (JMSException e) {
e.printStackTrace();
}
finally{
try {
if(consumer != null){
consumer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}
consumer = null;
session = null;
connection = null;
} catch (JMSException e) {
e.printStackTrace();
}
}
}

@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
new TopicSubscriber();
}

}



第二种:SSL 安全通道加密的
发布者:

import java.io.IOException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class TopicSSLPublisher {

ActiveMQSslConnectionFactory factory;
Connection connection;
Session session;
String trustStore = "证书名";
String url = "ssl://IP:port";
Topic topic = null;
MessageProducer producer = null;

public TopicSSLPublisher(){
try {
factory = new ActiveMQSslConnectionFactory();
factory.setBrokerURL(url);
factory.setUserName("admin");
factory.setPassword("admin");
factory.setKeyAndTrustManagers(null, getTrustManagers(trustStore), new SecureRandom());
connection = factory.createConnection();

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic("topicName");
producer = session.createProducer(topic);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
connection.start();

while(true){
TextMessage message = session.createTextMessage();
String text = "sljdjk";
message.setText(text);
producer.send(message);
System.out.println("Sent Message:" + message.getText());

Thread.sleep(30000);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}

}

private TrustManager[] getTrustManagers(String trustStore){
try {
//KeyStore.getDefaultType();
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(ClassLoader.getSystemResourceAsStream(trustStore),"证书密码".toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
return tmf.getTrustManagers();
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

public static void main(String[] args) {
new TopicSSLPublisher();
}
}

订阅者:

import java.io.IOException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class TopicSSLSubscriber implements MessageListener{
ActiveMQSslConnectionFactory factory;
Connection connection;
Session session;
private String trustStore = "证书名";
private String pwd = "pwd";
private String url = "ssl://ip:port";
Topic topic = null;
MessageConsumer consumer = null;

public TopicSSLSubscriber(){
try {
factory = new ActiveMQSslConnectionFactory();
factory.setBrokerURL(url);
factory.setUserName("admin");
factory.setPassword("admin");

factory.setKeyAndTrustManagers(null, getTrustManagers(trustStore), new SecureRandom());
connection = factory.createConnection();
// connection.setClientID("id");//保证唯一性就行
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic("topicName");

consumer = session.createConsumer(topic);
// consumer = session.createDurableSubscriber(topic, "id");
//开始监听
connection.start();

} catch (JMSException e) {
e.printStackTrace();
close();
}


}

private TrustManager[] getTrustManagers(String trustStore){
try {
//KeyStore.getDefaultType();
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(ClassLoader.getSystemResourceAsStream(trustStore),pwd.toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
return tmf.getTrustManagers();
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

public static void main(String[] args) {
new TopicSSLSubscriber();
}

@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}

private void close() {
try {
if(consumer != null){
consumer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}

} catch (JMSException e) {
e.printStackTrace();
}
finally{
try {
if(consumer != null){
consumer.close();
}
if(session != null){
session.close();
}
if(connection != null){
connection.close();
}
consumer = null;
session = null;
connection = null;
} catch (JMSException e) {
e.printStackTrace();
}
}
}


private KeyManager[] getKetManagers(String trustStore){
try {
//KeyStore.getDefaultType();
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(ClassLoader.getSystemResourceAsStream(trustStore),pwd.toCharArray());
KeyManagerFactory tmf = KeyManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore,pwd.toCharArray());
return tmf.getKeyManagers();
} catch (KeyStoreException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (CertificateException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (UnrecoverableKeyException e) {
e.printStackTrace();
}
return null;
}
}



在这个过程中订阅者中添加clientID的就是解决突发情况的。clientID要保证唯一性。



自己下个activemq的包,activemq的下载连接
自己打开activemq,运行网页,观察网页中发布者,订阅者之间接收消息的情况。
和加了clientID时的情况。
知识点:加了clientID的情况要分为,运行这个主题之前,和运行这个主题之后两种情况。

代码中牵扯到的证书,怎样查看证书的相关知识,将在以后简单介绍

分享到:
评论

相关推荐

    JMS ActiveMQ

    【JMS ActiveMQ】知识点详解 一、MQ简介 消息队列(MQ)是一种用于分布式系统之间交换信息的技术。它允许应用程序将消息存储在内存或磁盘上,直到被其他应用程序消费。MQ使得应用程序可以独立运行,无需关心彼此的...

    一个jms activemq Topic 消息实例

    一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...

    JMS ActiveMQ演示代码

    总之,这个"JMS ActiveMQ演示代码"提供了学习和实践JMS与ActiveMQ集成的起点,通过分析和运行这个示例,你可以深入理解消息中间件在分布式系统中的作用,以及如何利用JMS和ActiveMQ实现高效、可靠的异步通信。

    sprin jms activemq 测试

    在“sprin jms activemq 测试”这个主题中,我们将深入探讨Spring如何与JMS和ActiveMQ集成,以及如何进行相关的测试。 首先,Spring框架通过`spring-jms`模块提供了对JMS的支持。它提供了一组抽象,使得开发者能够...

    jms-test.zip_jms activemq_jms test

    标题中的“jms-test.zip_jms activemq_jms test”揭示了这是一个关于Java消息服务(JMS)的测试项目,特别提到了ActiveMQ作为消息代理。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它实现了JMS规范,允许...

    JMS ACTIVEMQ 教程文档

    Apache ActiveMQ 是一个开源的JMS提供商,它是Apache软件基金会的一部分,也是最广泛使用的JMS实现之一。ActiveMQ提供了多种协议支持,包括开放消息传递协议(STOMP)、AMQP、MQTT等,使得它能够与各种不同类型的...

    JMS.rar_activemq_jms_jms activemq

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它是Java消息服务(Java Message Service,简称JMS)的一个实现。JMS是一种为分布式应用提供异步消息传递的API,它定义了一种标准接口,使得不同的消息系统...

    jms_activeMQ

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源jms服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的...

    jms activemq-all-5.9.0.jar

    activemq-all-5.9.0.jar jms的一个很好实现

    JMS-ActiveMQ入门实例

    **JMS与ActiveMQ入门实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和阅读消息的应用程序接口。它为应用程序提供了标准的接口,可以跨越多种消息中间件产品进行通信。...

    Spring+JMS+ActiveMQ+Tomcat实现消息服务的demo

    基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:Spring 3.2.0,ActiveMQ 5.4.3,Tomcat 6.0.43。本例通过详细的说明和注释,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...

    jms activeMQ 经典代码

    写spring配置文件的时候, 要把MessageProducer, MessageConsumer,MessageListener,MessageListenerContainer几个地方弄清楚: 1.可以有一个或者多个消息生产者向同一个destination发送消息. 2.queue类型的只能有一个...

    activeMQ JMS 3种创建方式

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它实现了Java消息服务(JMS)标准,为分布式系统提供高效、可靠的消息传递。在ActiveMQ中,有三种主要的方式来创建消息队列(QUEUE)和主题(TOPIC),这些...

    JMS 使用 ActiveMQ 传送文件

    **标题:“JMS 使用 ActiveMQ 传送文件”** 在IT领域,Java消息服务(Java Message Service,简称JMS)是一种标准API,它允许应用程序创建、发送、接收和读取消息。这种技术常用于异步通信,尤其是在分布式系统中,...

    ApacheCamel-JMS-ActiveMQ

    ActiveMQ 是一个流行的开源JMS提供者,它支持多种协议,如AMQP、STOMP和OpenWire等。 在"Apache Camel JMS ActiveMQ"的使用样例中,我们有两个主要的场景: 1. **从本地读取信息推送到MQ中**:这一部分涉及到了...

    SpringJMS整合ActiveMQ

    详细内容: SpringJMS整合ActiveMQ.doc 详细说明文档 apache-activemq-5.8.0-bin.zip ActiveMQ安装包 JMSTest.rar MyEclipse8.5下web工程

    JMS-activemq 实例(分ppt,eclipse工程,说明三部分)

    1. **PPTX文件(activemq.pptx)** - 这通常是一个演示文稿,详细介绍了JMS和ActiveMQ的基础知识、工作原理以及如何使用它们。它可能包含概念解释、架构图、配置示例和使用步骤等内容,帮助学习者理解ActiveMQ的核心...

    一头扎进JMS之ActiveMQ系列

    在这个“一头扎进JMS之ActiveMQ系列”中,我们将深入探讨这个流行的开源消息代理——ActiveMQ,它是Apache软件基金会的项目,广泛应用于各种企业级应用。 ActiveMQ作为JMS的实现,提供了多种协议支持,包括开放标准...

    spring整合jms+activemq

    ActivemQ是Apache软件基金会的一个项目,它实现了JMS规范,提供了一个高效、可靠的中间件服务,用于处理消息队列。本文将深入探讨如何在Spring 3.0中整合JMS与ActivemQ,以及它们在实际应用中的关键知识点。 首先,...

    使用jms 操作ActiveMQ

    本教程将围绕"使用JMS操作ActiveMQ"这一主题,详细阐述如何通过JMS与ActiveMQ进行交互,包括创建生产者、消费者以及消息的发送和接收。 首先,我们需要理解JMS的基本概念。在JMS中,消息生产者(Producer)负责创建...

Global site tag (gtag.js) - Google Analytics