`
witcheryne
  • 浏览: 1099599 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

【入门】基于ActiveMQ 的发布/订阅(Pub/Sub) Chat 示例,上传了源码

阅读更多

环境需求:

1. JDK 1.5 或者以上

2. Apache Ant , 在写本文时,用的是 Ant 1.7.1

3. ActiveMQ , 在写本文时,用的是 Apache ActiveMQ 5.4.1

技术需求:

1. JMS(Java Message Service)  

2. JNDI(Java Naming and Directory Interface )

 

在JMS的“发布/订阅(pub/sub) ”模型中,消息的发布者(Publisher) 通过主题(Topic) 发布消息,订阅者(Subscriber) 通过订阅主题获取消息。 一个主题可以同时有多个订阅者. 通过这种方式我们可以实现广播式(broadcast)消息。

为了更好的理解"发布/订阅(Pub/Sub)"模式,我在《Java消息服务器 第二版》 上找到了一个很好的例子来说明他的使用。不过书上只提供了相关代码供我们理解,没有讲述整个创建过程,在这里打算记录下整个构建实例的过程:

 

1.  创建项目目录入下图所示,并将activemq-all-*.jar 复制到项目的classpath中:

 

 

2. 编写Chat代码:

 

 

public class Chat implements MessageListener {
	private TopicSession pubSession;
	private TopicPublisher pub;
	private TopicConnection conn;
	private String username;

	public Chat(String topicFactory, String topicName, String username)
			throws NamingException, JMSException {

		// 创建 JNDI context
		InitialContext ctx = new InitialContext();

		//1. 创建 TopicConnectionFacotry
		TopicConnectionFactory factory = (TopicConnectionFactory) ctx
				.lookup(topicFactory);
		//2. 创建 TopicConnection
		TopicConnection connection = factory.createTopicConnection();

		//3. 根据 Connection 创建 JMS 会话
		TopicSession pubSession = (TopicSession) connection.createSession(
				false, Session.AUTO_ACKNOWLEDGE);
		TopicSession subSession = (TopicSession) connection.createSession(
				false, Session.AUTO_ACKNOWLEDGE);

		//4. 创建 Topic
		Topic topic = (Topic) ctx.lookup(topicName);

		//5. 创建 发布者 和 订阅者
		TopicPublisher pub = pubSession.createPublisher(topic);
		TopicSubscriber sub = subSession.createSubscriber(topic, null, true);

		//6. 为发布者设置消息监听
		sub.setMessageListener(this);

		this.conn = connection;
		this.pub = pub;
		this.pubSession = pubSession;
		this.username = username;

		//7. 开启JMS连接
		connection.start();
	}

	protected void writeMessage(String txt) {
		try {
			TextMessage message = pubSession.createTextMessage();
			message.setText(username + ": " + txt);

			pub.publish(message);
		} catch (JMSException e) {
			e.printStackTrace();
		}

	}

	public void onMessage(Message msg) {
		TextMessage txtMsg = (TextMessage) msg;
		try {
			System.out.println(txtMsg.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void close() throws JMSException {
		this.conn.close();
	}

	public static void main(String[] args) throws NamingException,
			JMSException, IOException {
		if (args.length != 3) {
			System.out.println("Factory, Topic, or username missing");
		}

		Chat chat = new Chat(args[0], args[1], args[2]);

		BufferedReader cmd = new BufferedReader(
				new InputStreamReader(System.in));

		while (true) {
			String s = cmd.readLine();

			if (s.equalsIgnoreCase("exit")) {
				chat.close();
				System.exit(0);
			} else {
				chat.writeMessage(s);
			}
		}
	}
}

 

  3.由于里我们使用了JNDI, 所以我们需要编辑jndi.properties。内容如下:

 

 

# START SNIPPET: jndi
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = tcp://localhost:61616

java.naming.security.principal=system
java.naming.security.credentials=manager

# use the following property to specify the JNDI name the connection factory
# should appear as. 
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry
connectionFactoryNames = topicConnectionFactry


# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
#queue.MyQueue = example.ChatQue
topic.chat = example.chat

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
#topic.MyTopic = example.ChatTop

# END SNIPPET: jndi

 

4. 到这里已经基本完成Chat 编码工作,使用如下指令即可运行这个示例:

 

《Java 消息服务》原文 写道
java com.dayang.jms.demo.Chat [topicConnectionFactory] [topicName] [userName]

 

  不过如果没有设置相关classpath,是不可能通过这个指令来成功运行这个Demo,在这里我打算使用Ant来帮我完成这个工作

 

5. 编写build.xml脚本如下:

 

 

<?xml version="1.0" encoding="utf-8" ?>
<project name="chat" default="run" basedir=".">
	<property name="src.dir" value="src" />
	<property name="build.dir" value="build" />
	<property name="classes.dir" value="${build.dir}/classes" />
	<property name="jar.dir" value="${build.dir}/jar" />
	<property name="lib.dir" value="libs"/>		
	
	<!-- 设置main函数所在类 -->
	<property name="main-class" value="com.dayang.jms.chat.Chat" />
	
	<!-- 定义classpath -->
	<path id="classpath">
		<fileset dir="${lib.dir}" includes="**/*.jar" />
	</path>
	
	<!-- 创建构建目录,用于存放构建生成的文件 -->
	<target name="init">
		<mkdir dir="${build.dir}"/>
	</target>
	
	<!-- 编译 -->
	<target name="compile" depends="init">
		<mkdir dir="${classes.dir}"/>
		<javac srcdir="${src.dir}" destdir="${classes.dir}" 
			classpathref="classpath"/>
		<!-- copy properties file to classpath -->
		<copy todir="${classes.dir}">
			<fileset dir="${src.dir}" excludes="**.*.jar" />
		</copy>
	</target>
	
	<!-- 打包 -->
	<target name="jar" depends="compile">
		<mkdir dir="${jar.dir}"/>
		<jar destfile="${jar.dir}/${ant.project.name}.jar" 
				basedir="${classes.dir}">
			<manifest>
				<attribute name="Main-Class" value="${main-class}" />
			</manifest>
		</jar>
	</target>
	
	<!-- 运行client1 -->
	<target name="run1" depends="jar">
		<java fork="true" classname="${main-class}">
			<arg value="topicConnectionFactry"/>
			<arg value="chat"/>
			<arg value="client1"/>
			<classpath>
				<path refid="classpath"/>
				<path location="${jar.dir}/${ant.project.name}.jar"/>
			</classpath>
		</java>
	</target>
	
	<!-- 运行client2 -->
	<target name="run2" depends="jar">
		<java fork="true" classname="${main-class}">
			<arg value="topicConnectionFactry"/>
			<arg value="chat"/>
			<arg value="client2"/>
			<classpath>
				<path refid="classpath"/>
				<path location="${jar.dir}/${ant.project.name}.jar"/>
			</classpath>
		</java>
	</target>
	
	<target name="clean">
		<delete dir="${build.dir}"/>
	</target>
	
	<target name="clean-build" depends="clean,jar"/>
	
</project>
 

 

6. 打开两个控制台窗口,分别使用ant run1 和 ant run2 指令来运行程序, 如果成功我们将看到如下结果:


 

写在最后:

这个示例仅仅简单的说了JMS 发布/订阅 API的基本使用,更多特性需要在以后的使用中进行摸索。

发布/订阅 除了能够提供“1对多”的消息专递方式之外,还提供了消息持久化的特性。他允许订阅者在上线后接收离线时的消息,关于这部分特性,以及“发布/订阅”的应用场景打算在以后的文章中慢慢阐述。

参考资料:

1. JMS: http://baike.baidu.com/view/157103.htm

2. ActiveMQ: http://baike.baidu.com/view/433374.htm

3. JNDI http://baike.baidu.com/view/209575.htm

4. 《Java消息服务器 第二版》

 

5. Ant Manual http://ant.apache.org/manual/index.html

 

2011-05-18: 新增加了Demo的源代码, 需要的可以下载附件JSMDemo.rar

JMSDemo工程目录结构如下:


 

 

  • 大小: 21.7 KB
  • 大小: 88.1 KB
  • 大小: 47.6 KB
分享到:
评论
12 楼 潜心修炼 2010-12-13  
1.HornetQ也提供了对其他语言的支持,只不过需要通过其他的协议进行通信,比如STOMP
2.如果不是集群的话,主备是不是就有用了,呵呵
11 楼 witcheryne 2010-12-09  
潜心修炼 写道
witcheryne 写道
androidleader 写道
witcheryne 写道
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ



1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些
   1) 有丢消息或消息重复问题
   2) 连续启停,failover机制有问题,存在幽灵队列
2、发现没有什么好的测试方法
   1) 测试的benchmark,网上有一个,
   2) loadrunner不太好使,自己写程序测不太可靠;


这个你了解的比我多,性能测试方面我不太清楚, 关于测试方法希望你能分享一下。

我用JMeter测ActiveMQ 5.4.1, 开500个线程,1秒间隔, 循环 10 次~ 没发现什么异常...
我们用ActiveMQ主要目的是代替原先的SocketServer,将消息传递独立出来,解决C/S和B/S应用集成的问题。

需要高并发·你试试 ZeroMQ : http://www.infoq.com/cn/news/2010/09/introduction-zero-mq,
基于AMQP协议,用Erlang写的RabbitMQ你也可以试试: http://www.infoq.com/cn/articles/AMQP-RabbitMQ





可以参考一下HornetQ。
1.支持集群
2.支持主备,高可用
3.性能好,吞吐量能在5000+/秒。(跟环境相关),不过官方说能到几万
4.支持STOMP协议
还有其他优势,不过也有不足,主备的方式不是很让人满意,不过还是有别的办法来进行补足。


支持集群就够了,还要主备?? 这个有点不理解....

ActiveMQ 对 C++ 和 Ajax 都提供了客户端实现,这个比较符合我们的需求...

如果纯java平台,HornetQ 感觉不错~ 文档很全面



10 楼 潜心修炼 2010-12-09  
witcheryne 写道
androidleader 写道
witcheryne 写道
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ



1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些
   1) 有丢消息或消息重复问题
   2) 连续启停,failover机制有问题,存在幽灵队列
2、发现没有什么好的测试方法
   1) 测试的benchmark,网上有一个,
   2) loadrunner不太好使,自己写程序测不太可靠;


这个你了解的比我多,性能测试方面我不太清楚, 关于测试方法希望你能分享一下。

我用JMeter测ActiveMQ 5.4.1, 开500个线程,1秒间隔, 循环 10 次~ 没发现什么异常...
我们用ActiveMQ主要目的是代替原先的SocketServer,将消息传递独立出来,解决C/S和B/S应用集成的问题。

需要高并发·你试试 ZeroMQ : http://www.infoq.com/cn/news/2010/09/introduction-zero-mq,
基于AMQP协议,用Erlang写的RabbitMQ你也可以试试: http://www.infoq.com/cn/articles/AMQP-RabbitMQ





可以参考一下HornetQ。
1.支持集群
2.支持主备,高可用
3.性能好,吞吐量能在5000+/秒。(跟环境相关),不过官方说能到几万
4.支持STOMP协议
还有其他优势,不过也有不足,主备的方式不是很让人满意,不过还是有别的办法来进行补足。
9 楼 witcheryne 2010-12-09  
grady 写道
jms--Java Message Service lz貌似写错了

多谢~ 立刻纠正...
8 楼 grady 2010-12-09  
jms--Java Message Service lz貌似写错了
7 楼 witcheryne 2010-12-09  
androidleader 写道
witcheryne 写道
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ



1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些
   1) 有丢消息或消息重复问题
   2) 连续启停,failover机制有问题,存在幽灵队列
2、发现没有什么好的测试方法
   1) 测试的benchmark,网上有一个,
   2) loadrunner不太好使,自己写程序测不太可靠;


这个你了解的比我多,性能测试方面我不太清楚, 关于测试方法希望你能分享一下。

我用JMeter测ActiveMQ 5.4.1, 开500个线程,1秒间隔, 循环 10 次~ 没发现什么异常...
我们用ActiveMQ主要目的是代替原先的SocketServer,将消息传递独立出来,解决C/S和B/S应用集成的问题。

需要高并发·你试试 ZeroMQ : http://www.infoq.com/cn/news/2010/09/introduction-zero-mq,
基于AMQP协议,用Erlang写的RabbitMQ你也可以试试: http://www.infoq.com/cn/articles/AMQP-RabbitMQ



6 楼 androidleader 2010-12-09  
witcheryne 写道
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ



1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些
   1) 有丢消息或消息重复问题
   2) 连续启停,failover机制有问题,存在幽灵队列
2、发现没有什么好的测试方法
   1) 测试的benchmark,网上有一个,
   2) loadrunner不太好使,自己写程序测不太可靠;
5 楼 witcheryne 2010-12-09  
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡


如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法....

并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面:
http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ


4 楼 androidleader 2010-12-09  
liubey 写道
GRDJE 写道
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡

人家就是写个helloworld的demo,
nc啊问这些问题.....

+1

我只是希望有路过的高手出来指点一二

nc的几位,激动个啥。。。
3 楼 liubey 2010-12-09  
GRDJE 写道
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡

人家就是写个helloworld的demo,
nc啊问这些问题.....

+1
2 楼 GRDJE 2010-12-09  
androidleader 写道
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡

人家就是写个helloworld的demo,
nc啊问这些问题.....
1 楼 androidleader 2010-12-08  
好贴,就是有点简单。

我想知道的几个问题是:
1、并发性
2、稳定性
3、性能,吞吐量,各种persistent的问题
4、集群/负载均衡

相关推荐

    用C#实现的ActiveMQ发布/订阅消息传送

    在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...

    Spring Boot ActiveMQ发布/订阅消息模式原理解析

    Spring Boot ActiveMQ 发布/订阅消息模式原理解析 在本文中,我们将深入探讨 Spring Boot ActiveMQ 发布/订阅消息模式的原理和实现。发布/订阅消息模式是一种经典的消息传递模式,在该模式中,消息发送者将消息发送...

    ActiveMQ的点对点与发布/订阅模式小demo

    2. **发布/订阅模式(Publish/Subscribe,Pub/Sub)**: 发布/订阅模式基于主题(Topic)进行通信。生产者(发布者)发布消息到一个主题,多个消费者(订阅者)可以订阅这个主题并接收到消息。相比于点对点模式,...

    JMS之ActiveMQ 点对点+发布/订阅

    在JMS中,消息传递有两种基本模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。 **点对点(Point-to-Point,P2P)模型** 点对点模型是基于队列(Queue)的通信方式。在这个模型中,...

    activeMq点对点和发布/订阅模式demo

    在`activeMQ_demo`这个压缩包中,可能包含了一些示例代码,用于演示如何使用ActiveMQ实现点对点和发布/订阅模式。这些示例可能包括了以下内容: 1. 生产者(Producer):创建和发送消息到队列或主题的代码,展示了...

    ActiveMQ-Topic订阅发布模式Demo

    本示例“ActiveMQ-Topic订阅发布模式Demo”主要关注的是发布/订阅模式,这是一种一对多的消息传递方式。在发布/订阅模式中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅这...

    ActiveMQ的处理模式:PTP与PUB/SUB

    在ActiveMQ中,有两种主要的消息处理模式:点对点(Point-to-Point,简称PTP)和发布/订阅(Publish/Subscribe,简称PUB/SUB)。本文将深入探讨这两种模式及其在SpringBoot应用中的实现。 首先,点对点(PTP)模式...

    ActiveMQ C/C++ 编译库需要的文件

    标题 "ActiveMQ C/C++ 编译库需要的文件" 提到的是关于使用ActiveMQ中间件在C或C++环境中构建通信库所需的一些关键组件。ActiveMQ是Apache软件基金会的一个开源消息传递系统,它实现了Java消息服务(JMS)规范,并...

    ActiveMq发布和订阅消息的实现源码

    本篇文章将深入探讨ActiveMQ的发布/订阅模型(Publish/Subscribe)的实现源码,以及如何与Spring框架进行集成。 首先,我们需要理解ActiveMQ中的发布/订阅模式。在这个模型中,生产者(Publisher)发送消息到一个...

    ActiveMQ通信方式点对点和订阅发布

    本DEMO将深入探讨ActiveMQ中的两种主要通信模式:点对点(Point-to-Point,P2P)模型和发布/订阅(Publish/Subscribe,Pub/Sub)模型。 一、点对点(P2P)通信方式 1. 基本概念:在P2P模型中,消息从一个生产者...

    Apache ActiveMQ学习笔记【原创:mq的方式有两种:点到点和发布/订阅】

    通过以上介绍,我们可以看到 ActiveMQ 提供了两种主要的消息传递模型:点对点模型和发布/订阅模型。这两种模型各有特点,适用于不同的应用场景。点对点模型适用于一对一的消息传递场景,而发布/订阅模型则适用于一对...

    ActiveMQ Master/Slave 主从配置

    ActiveMQ是Apache软件基金会下的一个开源消息代理项目,主要提供了消息队列和消息传递模型的实现。在分布式系统中,消息队列的高可用性是保障服务稳定运行的关键因素之一。ActiveMQ支持多种高可用的主从配置模式,...

    ActiveMq 点对点 发布订阅demo

    在本文中,我们将深入探讨ActiveMQ中的两种主要通信模式:点对点(Point-to-Point)和发布/订阅(Publish/Subscribe),并基于提供的"ActiveMq 点对点 发布订阅demo"进行分析。 1. **点对点通信模式(Point-to-...

    activemq 入门示例代码

    **ActiveMQ 入门示例代码详解** ActiveMQ 是 Apache 开源组织开发的一款高效、可靠的开源消息中间件,它遵循 JMS(Java Message Service)规范,支持多种协议,如 AMQP、STOMP、OpenWire 等,广泛应用于分布式系统...

    ActiveMQ / RabbitMQ 示例

    在阿里云MQ的示例中,开发者可以学习如何利用其API和SDK在云端部署和管理消息队列,实现消息的发布与订阅。 "Demo_JMS"这个文件名可能指的是一个包含JMS(Java消息服务)接口使用示例的代码包。JMS是Java平台中定义...

    ActiveMQ 5.7源码及jar包

    1. **快速入门**:帮助文档通常包含快速入门指南,指导用户如何快速搭建环境,编写简单的发布/订阅或点对点示例。 2. **高级特性**:文档还会涵盖高级特性,如事务、消息优先级、时间戳、死信队列等,帮助开发者...

    ActiveMQ订阅模式持久化实现

    ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循JMS(Java Message Service)规范,提供了多种消息传递模式,包括发布/订阅(Publish/Subscribe)模式。在发布/订阅模式下,消息生产者(Publisher)...

    activeMQ简单入门案例

    ActiveMQ是中国最流行的开源消息中间件之一,它基于Java Message Service (JMS) 规范,为分布式系统提供高效、可靠的消息传递。本教程将引导你通过一个简单的入门案例了解如何使用ActiveMQ实现生产者与消费者的模式...

    activeMQ入门到精通.txt

    - **发布/订阅(Pub/Sub)**:与P2P不同,发布/订阅模型允许多个订阅者订阅同一主题。当消息被发布到主题时,所有订阅了该主题的订阅者都会接收到消息。 #### 高级特性 - **事务支持**:ActiveMQ支持JTA和XA事务,...

    apache MQ 点对点,发布/订阅者,开发流程

    本文将详细介绍Apache ActiveMQ在点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)模式下的开发流程,并提供相关的实践指导。 一、点对点(Point-to-Point)模式 点对点模式是一种基于队列(Queue)的...

Global site tag (gtag.js) - Google Analytics