精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (16)
|
|
---|---|
作者 | 正文 |
发表时间:2010-12-08
最后修改:2011-05-18
环境需求:1. JDK 1.5 或者以上 2. Apache Ant , 在写本文时,用的是 Ant 1.7.1 3. ActiveMQ , 在写本文时,用的是 Apache ActiveMQ 5.4.1 技术需求:2. JNDI(Java Naming and Directory Interface )
在JMS的“发布/订阅(pub/sub) ”模型中,消息的发布者(Publisher) 通过主题(Topic) 发布消息,订阅者(Subscriber) 通过订阅主题获取消息。 一个主题可以同时有多个订阅者. 通过这种方式我们可以实现广播式(broadcast)消息。 为了更好的理解"发布/订阅(Pub/Sub)"模式,我在《Java消息服务器 第二版》 上找到了一个很好的例子来说明他的使用。不过书上只提供了相关代码供我们理解,没有讲述整个创建过程,在这里打算记录下整个构建实例的过程:
1. 创建项目目录入下图所示,并将activemq-all-*.jar 复制到项目的classpath中:
2. 编写Chat代码:
public class Chat implements MessageListener { private TopicSession pubSession; private TopicPublisher pub; private TopicConnection conn; private String username; public Chat(String topicFactory, String topicName, String username) throws NamingException, JMSException { // 创建 JNDI context InitialContext ctx = new InitialContext(); //1. 创建 TopicConnectionFacotry TopicConnectionFactory factory = (TopicConnectionFactory) ctx .lookup(topicFactory); //2. 创建 TopicConnection TopicConnection connection = factory.createTopicConnection(); //3. 根据 Connection 创建 JMS 会话 TopicSession pubSession = (TopicSession) connection.createSession( false, Session.AUTO_ACKNOWLEDGE); TopicSession subSession = (TopicSession) connection.createSession( false, Session.AUTO_ACKNOWLEDGE); //4. 创建 Topic Topic topic = (Topic) ctx.lookup(topicName); //5. 创建 发布者 和 订阅者 TopicPublisher pub = pubSession.createPublisher(topic); TopicSubscriber sub = subSession.createSubscriber(topic, null, true); //6. 为发布者设置消息监听 sub.setMessageListener(this); this.conn = connection; this.pub = pub; this.pubSession = pubSession; this.username = username; //7. 开启JMS连接 connection.start(); } protected void writeMessage(String txt) { try { TextMessage message = pubSession.createTextMessage(); message.setText(username + ": " + txt); pub.publish(message); } catch (JMSException e) { e.printStackTrace(); } } public void onMessage(Message msg) { TextMessage txtMsg = (TextMessage) msg; try { System.out.println(txtMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } public void close() throws JMSException { this.conn.close(); } public static void main(String[] args) throws NamingException, JMSException, IOException { if (args.length != 3) { System.out.println("Factory, Topic, or username missing"); } Chat chat = new Chat(args[0], args[1], args[2]); BufferedReader cmd = new BufferedReader( new InputStreamReader(System.in)); while (true) { String s = cmd.readLine(); if (s.equalsIgnoreCase("exit")) { chat.close(); System.exit(0); } else { chat.writeMessage(s); } } } }
3.由于里我们使用了JNDI, 所以我们需要编辑jndi.properties。内容如下:
# START SNIPPET: jndi java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory # use the following property to configure the default connector java.naming.provider.url = tcp://localhost:61616 java.naming.security.principal=system java.naming.security.credentials=manager # use the following property to specify the JNDI name the connection factory # should appear as. #connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry connectionFactoryNames = topicConnectionFactry # register some queues in JNDI using the form # queue.[jndiName] = [physicalName] #queue.MyQueue = example.ChatQue topic.chat = example.chat # register some topics in JNDI using the form # topic.[jndiName] = [physicalName] #topic.MyTopic = example.ChatTop # END SNIPPET: jndi
4. 到这里已经基本完成Chat 编码工作,使用如下指令即可运行这个示例:
《Java 消息服务》原文 写道
java com.dayang.jms.demo.Chat [topicConnectionFactory] [topicName] [userName]
不过如果没有设置相关classpath,是不可能通过这个指令来成功运行这个Demo,在这里我打算使用Ant来帮我完成这个工作
5. 编写build.xml脚本如下:
<?xml version="1.0" encoding="utf-8" ?> <project name="chat" default="run" basedir="."> <property name="src.dir" value="src" /> <property name="build.dir" value="build" /> <property name="classes.dir" value="${build.dir}/classes" /> <property name="jar.dir" value="${build.dir}/jar" /> <property name="lib.dir" value="libs"/> <!-- 设置main函数所在类 --> <property name="main-class" value="com.dayang.jms.chat.Chat" /> <!-- 定义classpath --> <path id="classpath"> <fileset dir="${lib.dir}" includes="**/*.jar" /> </path> <!-- 创建构建目录,用于存放构建生成的文件 --> <target name="init"> <mkdir dir="${build.dir}"/> </target> <!-- 编译 --> <target name="compile" depends="init"> <mkdir dir="${classes.dir}"/> <javac srcdir="${src.dir}" destdir="${classes.dir}" classpathref="classpath"/> <!-- copy properties file to classpath --> <copy todir="${classes.dir}"> <fileset dir="${src.dir}" excludes="**.*.jar" /> </copy> </target> <!-- 打包 --> <target name="jar" depends="compile"> <mkdir dir="${jar.dir}"/> <jar destfile="${jar.dir}/${ant.project.name}.jar" basedir="${classes.dir}"> <manifest> <attribute name="Main-Class" value="${main-class}" /> </manifest> </jar> </target> <!-- 运行client1 --> <target name="run1" depends="jar"> <java fork="true" classname="${main-class}"> <arg value="topicConnectionFactry"/> <arg value="chat"/> <arg value="client1"/> <classpath> <path refid="classpath"/> <path location="${jar.dir}/${ant.project.name}.jar"/> </classpath> </java> </target> <!-- 运行client2 --> <target name="run2" depends="jar"> <java fork="true" classname="${main-class}"> <arg value="topicConnectionFactry"/> <arg value="chat"/> <arg value="client2"/> <classpath> <path refid="classpath"/> <path location="${jar.dir}/${ant.project.name}.jar"/> </classpath> </java> </target> <target name="clean"> <delete dir="${build.dir}"/> </target> <target name="clean-build" depends="clean,jar"/> </project>
6. 打开两个控制台窗口,分别使用ant run1 和 ant run2 指令来运行程序, 如果成功我们将看到如下结果:
写在最后:这个示例仅仅简单的说了JMS 发布/订阅 API的基本使用,更多特性需要在以后的使用中进行摸索。 发布/订阅 除了能够提供“1对多”的消息专递方式之外,还提供了消息持久化的特性。他允许订阅者在上线后接收离线时的消息,关于这部分特性,以及“发布/订阅”的应用场景打算在以后的文章中慢慢阐述。 参考资料:1. JMS: http://baike.baidu.com/view/157103.htm 2. ActiveMQ: http://baike.baidu.com/view/433374.htm 3. JNDI http://baike.baidu.com/view/209575.htm
5. Ant Manual http://ant.apache.org/manual/index.html
2011-05-18: 新增加了Demo的源代码, 需要的可以下载附件JSMDemo.rar JMSDemo工程目录结构如下:
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2010-12-08
好贴,就是有点简单。
我想知道的几个问题是: 1、并发性 2、稳定性 3、性能,吞吐量,各种persistent的问题 4、集群/负载均衡 |
|
返回顶楼 | |
发表时间:2010-12-09
androidleader 写道 好贴,就是有点简单。
我想知道的几个问题是: 1、并发性 2、稳定性 3、性能,吞吐量,各种persistent的问题 4、集群/负载均衡 人家就是写个helloworld的demo, nc啊问这些问题..... |
|
返回顶楼 | |
发表时间:2010-12-09
GRDJE 写道 androidleader 写道 好贴,就是有点简单。
我想知道的几个问题是: 1、并发性 2、稳定性 3、性能,吞吐量,各种persistent的问题 4、集群/负载均衡 人家就是写个helloworld的demo, nc啊问这些问题..... +1 |
|
返回顶楼 | |
发表时间:2010-12-09
liubey 写道 GRDJE 写道 androidleader 写道 好贴,就是有点简单。
我想知道的几个问题是: 1、并发性 2、稳定性 3、性能,吞吐量,各种persistent的问题 4、集群/负载均衡 人家就是写个helloworld的demo, nc啊问这些问题..... +1 我只是希望有路过的高手出来指点一二 nc的几位,激动个啥。。。 |
|
返回顶楼 | |
发表时间:2010-12-09
androidleader 写道 好贴,就是有点简单。
我想知道的几个问题是: 1、并发性 2、稳定性 3、性能,吞吐量,各种persistent的问题 4、集群/负载均衡 如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法.... 并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面: http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ |
|
返回顶楼 | |
发表时间:2010-12-09
witcheryne 写道 androidleader 写道 好贴,就是有点简单。
我想知道的几个问题是: 1、并发性 2、稳定性 3、性能,吞吐量,各种persistent的问题 4、集群/负载均衡 如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法.... 并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面: http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ 1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些 1) 有丢消息或消息重复问题 2) 连续启停,failover机制有问题,存在幽灵队列 2、发现没有什么好的测试方法 1) 测试的benchmark,网上有一个, 2) loadrunner不太好使,自己写程序测不太可靠; |
|
返回顶楼 | |
发表时间:2010-12-09
androidleader 写道 witcheryne 写道 androidleader 写道 好贴,就是有点简单。
我想知道的几个问题是: 1、并发性 2、稳定性 3、性能,吞吐量,各种persistent的问题 4、集群/负载均衡 如果你对这些感兴趣你可以在文中提到的参考资料里找找·· ActiveMQ 文档提供了各种测试方法.... 并发,稳定,吞吐量基本因需求而定, 消息传递有很多协议, 每种协议对应的应用场景都有所不同,这里有篇文章对MOM选型以及性能说的很全面: http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes#Zero_MQ 1、发现现有的几个版本,5.3 5.4都不是很稳定,5.2稳定些 1) 有丢消息或消息重复问题 2) 连续启停,failover机制有问题,存在幽灵队列 2、发现没有什么好的测试方法 1) 测试的benchmark,网上有一个, 2) loadrunner不太好使,自己写程序测不太可靠; 这个你了解的比我多,性能测试方面我不太清楚, 关于测试方法希望你能分享一下。 我用JMeter测ActiveMQ 5.4.1, 开500个线程,1秒间隔, 循环 10 次~ 没发现什么异常... 我们用ActiveMQ主要目的是代替原先的SocketServer,将消息传递独立出来,解决C/S和B/S应用集成的问题。 需要高并发·你试试 ZeroMQ : http://www.infoq.com/cn/news/2010/09/introduction-zero-mq, 基于AMQP协议,用Erlang写的RabbitMQ你也可以试试: http://www.infoq.com/cn/articles/AMQP-RabbitMQ |
|
返回顶楼 | |
发表时间:2010-12-09
jms--Java Message Service lz貌似写错了
|
|
返回顶楼 | |
发表时间:2010-12-09
grady 写道 jms--Java Message Service lz貌似写错了
多谢~ 立刻纠正... |
|
返回顶楼 | |