- 浏览: 474278 次
- 性别:
- 来自: 北京
-
文章分类
最新评论
-
chexiazi:
一样的xml代码 报这个错 <ns1:XMLFault ...
CXF 通过用户名和密码进行验证 -
di1984HIT:
谢谢啊 ~~~
通过JavaCompiler进行编译java文件(转载) -
aa00aa00:
'%${userName}%' 这种是可以的,是可以模糊查询的 ...
mybatis 模糊查询 -
一棵杨柳的地盘:
我把你的代码不了一遍 但是汇报错首先:static { ...
CXF 通过用户名和密码进行验证 -
shangmin1990:
转 IntelliJ IDEA 编辑器生成 Hibernate 实体映射文件
原文出处:http://blog.chenlb.com/2010/01/activemq-hello.html
<!-- google_ad_section_start -->企业中各项目中相互协作的时候可能用得到消息通知机制。比如有东西更新了,可以通知做索引。
在 Java 里有 JMS 的多个实现。其中 apache 下的 ActiveMQ 就是不错的选择。还有一个比较热的是 RabbitMQ (是 erlang 语言实现的)。这里示例下使用 ActiveMQ
用 ActiveMQ 最好还是了解下 JMS
JMS 公共 | 点对点域 | 发布/订阅域 |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver | TopicSubscriber |
JMS 定义了两种方式:Quere(点对点);Topic(发布/订阅)。
ConnectionFactory 是连接工厂,负责创建Connection。
Connection 负责创建 Session。
Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。
Destination 是消息的目的地。
详细的可以网上找些 JMS 规范(有中文版)。
下载 apache-activemq-5.3.0。http://activemq.apache.org/download.html,解压,然后双击 bin/activemq.bat。运行后,可以在 http://localhost:8161/admin 观察。也有 demo, http://localhost:8161/demo。把 activemq-all-5.3.0.jar 加入 classpath。
Jms 发送 代码:
- publicstaticvoidmain(String[]args)throwsException{
- ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
- Connectionconnection=connectionFactory.createConnection();
- connection.start();
- Sessionsession=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
- Destinationdestination=session.createQueue("my-queue");
- MessageProducerproducer=session.createProducer(destination);
- for(inti=0;i<3;i++){
- MapMessagemessage=session.createMapMessage();
- message.setLong("count",newDate().getTime());
- Thread.sleep(1000);
- //通过消息生产者发出消息
- producer.send(message);
- }
- session.commit();
- session.close();
- connection.close();
- }
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for(int i=0; i<3; i++) { MapMessage message = session.createMapMessage(); message.setLong("count", new Date().getTime()); Thread.sleep(1000); //通过消息生产者发出消息 producer.send(message); } session.commit(); session.close(); connection.close(); }
Jms 接收代码:
- publicstaticvoidmain(String[]args)throwsException{
- ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
- Connectionconnection=connectionFactory.createConnection();
- connection.start();
- finalSessionsession=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
- Destinationdestination=session.createQueue("my-queue");
- MessageConsumerconsumer=session.createConsumer(destination);
- /*//listener方式
- consumer.setMessageListener(newMessageListener(){
- publicvoidonMessage(Messagemsg){
- MapMessagemessage=(MapMessage)msg;
- //TODOsomething....
- System.out.println("收到消息:"+newDate(message.getLong("count")));
- session.commit();
- }
- });
- Thread.sleep(30000);
- */
- inti=0;
- while(i<3){
- i++;
- MapMessagemessage=(MapMessage)consumer.receive();
- session.commit();
- //TODOsomething....
- System.out.println("收到消息:"+newDate(message.getLong("count")));
- }
- session.close();
- connection.close();
- }
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); Connection connection = connectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); /*//listener 方式 consumer.setMessageListener(new MessageListener() { public void onMessage(Message msg) { MapMessage message = (MapMessage) msg; //TODO something.... System.out.println("收到消息:" + new Date(message.getLong("count"))); session.commit(); } }); Thread.sleep(30000); */ int i=0; while(i<3) { i++; MapMessage message = (MapMessage) consumer.receive(); session.commit(); //TODO something.... System.out.println("收到消息:" + new Date(message.getLong("count"))); } session.close(); connection.close(); }
启动 JmsReceiver 和 JmsSender 可以在看输出三条时间信息。当然 Jms 还指定有其它格式的数据,如 TextMessage
结合 Spring 的 JmsTemplate 方便用:
xml:
- <?xmlversion="1.0"encoding="UTF-8"?>
- <beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
- <!--在非web/ejb容器中使用pool时,要手动stop,spring不会为你执行destroy-method的方法
- <beanid="jmsFactory"class="org.apache.activemq.pool.PooledConnectionFactory"destroy-method="stop">
- <propertyname="connectionFactory">
- <beanclass="org.apache.activemq.ActiveMQConnectionFactory">
- <propertyname="brokerURL"value="tcp://localhost:61616"/>
- </bean>
- </property>
- </bean>
- -->
- <beanid="jmsFactory"class="org.apache.activemq.ActiveMQConnectionFactory">
- <propertyname="brokerURL"value="tcp://localhost:61616"/>
- </bean>
- <beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
- <propertyname="connectionFactory"ref="jmsFactory"/>
- <propertyname="defaultDestination"ref="destination"/>
- <propertyname="messageConverter">
- <beanclass="org.springframework.jms.support.converter.SimpleMessageConverter"/>
- </property>
- </bean>
- <beanid="destination"class="org.apache.activemq.command.ActiveMQQueue">
- <constructor-argindex="0"value="my-queue"/>
- </bean>
- </beans>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"> <!-- 在非 web / ejb 容器中使用 pool 时,要手动 stop,spring 不会为你执行 destroy-method 的方法 <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> </property> </bean> --> <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsFactory" /> <property name="defaultDestination" ref="destination" /> <property name="messageConverter"> <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" /> </property> </bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="my-queue" /> </bean> </beans>
sender:
- publicstaticvoidmain(String[]args){
- ApplicationContextctx=newFileSystemXmlApplicationContext("classpath:app*.xml");
- JmsTemplatejmsTemplate=(JmsTemplate)ctx.getBean("jmsTemplate");
- jmsTemplate.send(newMessageCreator(){
- publicMessagecreateMessage(Sessionsession)throwsJMSException{
- MapMessagemm=session.createMapMessage();
- mm.setLong("count",newDate().getTime());
- returnmm;
- }
- });
- }
public static void main(String[] args) { ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml"); JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate"); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { MapMessage mm = session.createMapMessage(); mm.setLong("count", new Date().getTime()); return mm; } }); }
receiver:
- publicstaticvoidmain(String[]args){
- ApplicationContextctx=newFileSystemXmlApplicationContext("classpath:app*.xml");
- JmsTemplatejmsTemplate=(JmsTemplate)ctx.getBean("jmsTemplate");
- while(true){
- Map<String,Object>mm=(Map<String,Object>)jmsTemplate.receiveAndConvert();
- System.out.println("收到消息:"+newDate((Long)mm.get("count")));
- }
- }
public static void main(String[] args) { ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml"); JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate"); while(true) { Map<String, Object> mm = (Map<String, Object>) jmsTemplate.receiveAndConvert(); System.out.println("收到消息:" + new Date((Long)mm.get("count"))); } }
注意:直接用 Jms 接口时接收了消息后要提交一下,否则下次启动接收者时还可以收到旧数据。有了 JmsTemplate 就不用自己提交 session.commit() 了。如果使用了 PooledConnectionFactory 要把 apache-activemq-5.3.0\lib\optional\activemq-pool-5.3.0.jar 加到 classpath
参考资料:
发表评论
-
java 位运算总结
2018-10-23 11:14 0<div class="iteye-blog- ... -
java sort 排序中的自然序排序总结
2018-10-23 11:07 5855<div class="iteye-blo ... -
Java内存分析工具
2012-11-24 11:16 2262Java运行时数据区包含:Method area, heap, ... -
转 StringUtils.isBlank()函数(分享一下)
2012-02-15 10:23 3006StringUtils的isBlank与isEmply ... -
jquery 使用ajax 提交整个表单的数据
2012-01-06 14:52 5943在使用Struts开发项目的时候发现很爽的一件事就是可以不使用 ... -
struts2 通配符
2011-12-31 16:25 1311在使用struts2时,如果想定义action的话可以这样做 ... -
Maven Web项目部署到Tomcat下
2011-12-30 17:28 19602通过Maven来搭建项目是 ... -
velocity与servlet的整合
2011-12-28 21:07 1509第一天学习velocity,参考别人的blog写了一个与Se ... -
ibatis 与spring3整合
2011-12-28 13:01 5773spring与ibates整合,使用 ... -
ibatis 入门
2011-12-28 12:48 1283昨天学习了一下ibatis ... -
转 JTA简介
2011-12-26 17:47 1076Java Transaction API(Java事务 ... -
转 IntelliJ IDEA 编辑器生成 Hibernate 实体映射文件
2011-12-23 16:03 23416很多人不知道怎么用 IntelliJ IDEA 编辑器 ... -
转 IntelliJ IDEA 使用基础篇 Java IDE编辑器
2011-12-23 15:02 8546IntelliJ IDE ... -
转在CXF中用JAXB数据绑定支持Map类型
2011-12-05 16:38 5272一些java类型不能自然映射成xml,例如,HashMap ... -
cxf wsdltojava 构造方法报错
2011-12-05 14:23 1341I'm marking this as "n ... -
(转)Struts2学习笔记--Internationalization
2011-11-21 18:13 1375Struts2 的国际化 ======== ... -
单例模式
2011-11-18 20:47 1036单例模式(SINGLETON):单例模式确保某一个类只有一个实 ... -
类加载的顺序
2011-11-17 21:40 1208写道 package com.wang; public cl ... -
socket实现客户端与客户端通信
2011-11-17 19:34 3071最近闲来无聊,看了一下socket通信,实现了一个最简版的客户 ... -
接口设计的 11 种原则
2011-05-02 21:02 27607种设计坏味道 1.僵化性: 很难对系统进行改动,因为每个改动 ...
相关推荐
ActiveMQ是中国最流行的开源消息中间件之一,由Apache软件基金会开发。它基于Java Message Service (JMS) 规范,提供了可靠的消息传递功能,适用于分布式系统中的应用间通信。本压缩包“activeMQ收发工具.rar”包含...
### ActiveMQ高并发处理方案详解 #### 一、引言 在现代分布式系统中,消息队列作为异步通信的核心组件之一,对于提高系统的吞吐量、降低响应时间和实现服务解耦等方面起着至关重要的作用。Apache ActiveMQ作为一款...
2. 客户端连接:客户端通过WebSocket API建立到ActiveMQ的连接,指定目标URL通常是ws://或者wss://(如果是加密连接)加上ActiveMQ服务器的地址和WebSocket端口。 3. 订阅主题或队列:连接建立后,客户端可以订阅想...
Apache ActiveMQ是开源的、基于Java消息服务(JMS)的应用服务器,它是Apache软件基金会的一部分。这个名为"apache-activemq-5.17.3"的压缩包包含了ActiveMQ的5.17.3版本,这是一个稳定且功能丰富的发布版本。在深入...
jmx监控ActiveMQ监控 jmx(Java Management Extensions)是一种Java技术,为Java应用程序提供了管理和监控的功能。ActiveMQ是Apache软件基金会下的一个开源消息队列系统,提供了高效、可靠的消息传递服务。在生产...
Apache ActiveMQ是业界广泛使用的开源消息中间件,它基于Java消息服务(JMS)标准,提供了高度可扩展、可靠的异步通信能力。标题"apache-activemq-5.16.5"指的是该软件的一个特定版本,即5.16.5版本,通常每个新版本...
本报告详细阐述了使用JMeter对ActiveMQ进行性能测试的过程和结果,旨在评估ActiveMQ在JMS(Java消息服务)环境下的性能表现。JMeter作为一个强大的负载和性能测试工具,被广泛用于测试各种应用程序,包括消息中间件...
**ActiveMQ的activemq.xml配置详解** ActiveMQ是Apache软件基金会开发的一个开源消息代理,它遵循Java消息服务(JMS)规范,提供可靠的消息传递功能。`activemq.xml`是ActiveMQ的核心配置文件,它定义了服务器的...
在Linux环境下,Apache ActiveMQ是一个广泛使用的开源消息代理和队列服务器,它是Java Message Service (JMS) 的实现,能够处理大量的并发消息传递。ActiveMQ提供了高可用性、可扩展性和稳定性,使得它成为分布式...
Apache ActiveMQ是开源社区中最流行的消息中间件之一,它基于Java消息服务(JMS)标准,提供高效、可靠的异步通信解决方案。ActiveMQ在企业级应用中广泛应用,因为它支持多种协议,如OpenWire、STOMP、AMQP、MQTT、...
Apache ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它基于Java Message Service (JMS) 规范,提供高效、可靠的消息传递服务。在本文中,我们将深入探讨Apache ActiveMQ,特别是针对“apache-activemq-...
ActiveMQ路由配置方式 ActiveMQ路由配置是Apache ActiveMQ项目中的一种重要配置方式,它依赖另一个Apache项目Camel。ActiveMQ集成了Camel,启动时同时会启动Camel。通过Camel Web Console可以进行Routing配置。 ...
ActiveMQ是中国最流行的开源消息中间件之一,基于Java Message Service(JMS)规范,它提供了一个高可伸缩、高性能、稳定且灵活的消息传递平台。这个"ActiveMQ接受和发送工具.rar"压缩包包含了用于与ActiveMQ交互的...
ActiveMQ是一款非常流行的开源消息队列中间件,它实现了JMS(Java Message Service,Java消息服务)1.1规范,面向消息的中间件(Message Oriented Middleware,MOM)是指利用高效可靠的消息传递机制进行与平台无关的...
Spring集成ActiveMQ是将Spring框架与ActiveMQ消息中间件相结合,实现异步处理和解耦应用程序的关键技术。在本文中,我们将深入探讨如何配置和使用这一组合,以及它在实际项目中的应用。 首先,让我们了解Spring框架...
#### 一、ActiveMQ简介 ActiveMQ是一款非常流行的开源消息中间件,它基于Java语言开发,并且遵循了Java消息服务(JMS)规范。ActiveMQ提供了丰富的特性,包括持久化消息存储、事务支持、集群等功能。在企业级应用中,...
而ActiveMQ是Apache出品的一款开源消息中间件,它遵循JMS(Java Message Service)规范,用于处理应用程序之间的异步通信。本教程将详细介绍如何在Spring Boot项目中集成ActiveMQ,实现消息接收的Demo。 首先,我们...
ActiveMQ in Action pdf英文原版加源代码压缩包。 Apache ActiveMQ in Action is a thorough, practical guide to implementing message-oriented systems in Java using ActiveMQ. The book lays out the core of ...
ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递标准(JMS,Java Message Service),用于在分布式系统中实现可靠的消息传递。在本文中,我们将深入探讨ActiveMQ v6.0.1的核心特性、应用...
本资源提供的内容是关于ActiveMQ的连接池实现,分为两部分:一是作者自己实现的ActiveMQ连接池,二是新版本ActiveMQ自带的连接池。连接池是一种资源管理技术,通过复用已建立的数据库连接或网络连接,减少创建和销毁...