Basic sending and receiving
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<!-- Create the connection factory -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>
<!-- Create the connection factory used for the topic -->
<bean id="topicListenConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!-- Create the topic -->
<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="normandy.topic"/>
</bean>
<!-- Message converter -->
<bean id="messageConvertForSys" class="com.normandy.tech.test.MessageConvertForSys" />
<!-- JmsTemplate that publishes to the topic -->
<bean id="topicSendJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="topicListenConnectionFactory"></property>
<property name="defaultDestination" ref="myTopic" />
<property name="messageConverter" ref="messageConvertForSys" />
<!-- Enable publish/subscribe mode -->
<property name="pubSubDomain" value="true"/>
</bean>
<!-- Declare the ActiveMQ message destination; it can be a queue or a topic (ActiveMQTopic) -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="rantz.marketing.queue"></constructor-arg>
</bean>
<!-- JmsTemplate for the queue -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="defaultDestination" ref="destination"></property>
<property name="receiveTimeout" value="6000"></property>
</bean>
<bean id="sender" class="com.roadrantz.marketing.Sender">
<property name="jmsTemplate" ref="jmsTemplate"></property>
</bean>
<bean id="receiver" class="com.roadrantz.marketing.Receiver">
<property name="jmsTemplate" ref="jmsTemplate"></property>
</bean>
</beans>
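The configuration above declares both a topic (`myTopic`, used with `pubSubDomain` enabled) and a queue (`destination`). As a rough illustration of how the two delivery models differ, here is a minimal in-memory sketch. It uses only JDK collections and no broker; the class `DestinationSemantics` and its methods are hypothetical names, and the round-robin hand-off is a simplification of how a real broker distributes queue messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// In-memory illustration only: no broker and no JMS API involved.
class DestinationSemantics {

    // Queue (point-to-point): each message is handed to exactly one consumer.
    // Consumers simply take turns here (round-robin).
    static List<List<String>> deliverToQueue(List<String> messages, int consumers) {
        List<List<String>> received = newInboxes(consumers);
        Queue<String> pending = new ArrayDeque<>(messages);
        int next = 0;
        while (!pending.isEmpty()) {
            received.get(next).add(pending.poll());
            next = (next + 1) % consumers;
        }
        return received;
    }

    // Topic (publish/subscribe): every subscriber gets its own copy of each message.
    static List<List<String>> deliverToTopic(List<String> messages, int subscribers) {
        List<List<String>> received = newInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : received) {
                inbox.add(message);
            }
        }
        return received;
    }

    // Creates one empty inbox per consumer or subscriber.
    private static List<List<String>> newInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3");
        System.out.println("queue: " + deliverToQueue(messages, 2));
        System.out.println("topic: " + deliverToTopic(messages, 2));
    }
}
```

With two consumers, the queue splits the three messages between them, while the topic gives each subscriber all three.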
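Producer
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class Sender {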
private JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sendInfo() {
this.jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session arg0) throws JMSException {
MapMessage mess = arg0.createMapMessage();
mess.setString("lastname", "pp");
return mess;
}
});
}
}
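Consumer
import javax.jms.JMSException;
import javax.jms.MapMessage;
import org.springframework.jms.core.JmsTemplate;
public class Receiver {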
private JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Receiver(){}
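public String receiveMessage() throws JMSException{
// receive() returns null if no message arrives within receiveTimeout (6000 ms here)
MapMessage mess=(MapMessage) jmsTemplate.receive();
if (mess == null) {
return null;
}
return mess.getString("lastname");
}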
}
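`JmsTemplate.receive()` returns `null` when the receive timeout elapses without a message, so callers need to handle that case. The same timed-wait behaviour can be tried without a broker: the sketch below uses a JDK `BlockingQueue` as a stand-in, and the class `TimedReceive` and its method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for a timed receive; no JMS classes are used.
class TimedReceive {

    // Waits up to timeoutMillis for a message and returns null on timeout,
    // mirroring how a timed receive signals that nothing arrived.
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    // Caller-side handling: substitute a fallback instead of dereferencing null.
    static String receiveOrDefault(BlockingQueue<String> queue, long timeoutMillis, String fallback) {
        String message = receive(queue, timeoutMillis);
        return message != null ? message : fallback;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("pp");
        System.out.println(receiveOrDefault(queue, 50, "<none>")); // a message is available
        System.out.println(receiveOrDefault(queue, 50, "<none>")); // queue is empty, so the wait times out
    }
}
```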
Persistence
Copy the MySQL JDBC driver jar into ActiveMQ's lib directory.
Edit activemq/conf/activemq.xml.
Add a data source configuration:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/mysql?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
Persistence adapter
<persistenceAdapter>
<!-- <journaledJDBC journalLogFiles="5" dataDirectory="${activemq.base}/activemq-data"/> -->
<!-- To use a different datasource, use the following syntax : -->
<journaledJDBC journalLogFiles="5" dataDirectory="../activemq-data" dataSource="#mysql-ds"/>
</persistenceAdapter>
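Defining the message converter
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
public class MessageConvertForSys implements MessageConverter {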
/* (non-Javadoc)
* @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object, javax.jms.Session)
*/
public Message toMessage(Object object, Session session)
throws JMSException, MessageConversionException {
System.out.println("[toMessage]");
ObjectMessage objectMessage = session.createObjectMessage();
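// Note: JMSExpiration is assigned by the JMS provider when the message is sent,
// so this value is overwritten; to expire messages, set timeToLive on the
// JmsTemplate (with explicitQosEnabled set to true).
objectMessage.setJMSExpiration(1000);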
objectMessage.setStringProperty("key1", object+"_add");
return objectMessage;
}
/* (non-Javadoc)
* @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message)
*/
public Object fromMessage(Message message) throws JMSException,
MessageConversionException {
System.out.println("[fromMessage]");
ObjectMessage objectMessage = (ObjectMessage) message;
return objectMessage.getObjectProperty("key1");
}
}
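The converter calls `setJMSExpiration`, so it helps to recall how the expiration header is defined in JMS: it is an absolute timestamp that the provider computes at send time as the send time plus the time-to-live, and a value of 0 means the message never expires. The sketch below reproduces only that arithmetic in plain Java; the class `ExpirationSketch` and its methods are hypothetical and do not touch the JMS API.

```java
// Plain-Java illustration of how the expiration timestamp is derived.
class ExpirationSketch {

    // expiration = send time + time-to-live; a TTL of 0 means "never expires",
    // which is represented by an expiration of 0.
    static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    // A message is expired once the current time is later than a non-zero expiration.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiration = expirationFor(sentAt, 1000);
        System.out.println(isExpired(expiration, sentAt + 500));   // still within the TTL
        System.out.println(isExpired(expiration, sentAt + 1500));  // past the TTL
        System.out.println(isExpired(expirationFor(sentAt, 0), sentAt + 1500)); // TTL 0
    }
}
```

In Spring, the time-to-live is normally configured through the `timeToLive` property of `JmsTemplate`, which only takes effect when `explicitQosEnabled` is true.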
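Spring message-driven POJOs: Spring + ActiveMQ + Jencks
Jencks is a lightweight JCA container that manages the MQ connections; it is reportedly able to manage transactions as well.
applicationContext.xml (server side)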
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<bean id="helloBean" class="mdp.demo.HelloBean" />
<bean id="jmsTransactionManager" class="org.jencks.factory.TransactionManagerFactoryBean" />
<bean id="activeMQContainer" class="org.jencks.JCAContainer">
<property name="bootstrapContext">
<bean id="bootstrapContext" class="org.jencks.factory.BootstrapContextFactoryBean">
<property name="transactionManager" ref="jmsTransactionManager" />
</bean>
</property>
<property name="resourceAdapter">
<bean id="activeMQResourceAdapter" class="org.apache.activemq.ra.ActiveMQResourceAdapter">
<property name="serverUrl" value="tcp://localhost:61616" />
</bean>
</property>
</bean>
<bean id="HelloMDP" class="org.jencks.JCAConnector">
<property name="jcaContainer" ref="activeMQContainer" />
<property name="transactionManager" ref="jmsTransactionManager" />
<property name="activationSpec">
<bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
<property name="destination" value="Hello.Queue" />
<property name="destinationType" value="javax.jms.Queue" />
</bean>
</property>
<property name="ref" value="helloBean" />
</bean>
</beans>
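Listener class
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
public class HelloBean implements MessageListener {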
@Override
public void onMessage(Message arg0) {
// Print the "name" property that the client sets on the message
try {
String name=arg0.getStringProperty("name");
System.out.println(name);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
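The listener never polls for messages itself: the container receives them and calls `onMessage` on a worker thread. As a rough model of that inversion of control, the sketch below hands each message to a callback on a small thread pool. It uses only the JDK; the class `ListenerContainerSketch` and the `dispatch` method are hypothetical and are not part of Jencks or Spring.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model of a listener container: it owns the threads, the listener only reacts.
class ListenerContainerSketch {

    // Hands every message to the listener on a worker thread and waits
    // (up to five seconds) until all of them have been processed.
    static void dispatch(List<String> messages, Consumer<String> listener, int poolSize) {
        ExecutorService workers = Executors.newFixedThreadPool(poolSize);
        for (String message : messages) {
            workers.execute(() -> listener.accept(message));
        }
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    public static void main(String[] args) {
        // The lambda plays the role of onMessage; the order of output may vary.
        dispatch(Arrays.asList("hello", "world"), name -> System.out.println(name), 2);
    }
}
```

Because several workers may run at once, a listener used this way should be thread-safe.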
Client-side configuration (the client below loads it as spring-client.xml)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="defaultDestinationName" value="messages.input" />
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</beans>
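Starting the server
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ServerTest {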
/**
* @param args
*/
public static void main(String[] args) {
// Loading the context starts the JCA container, which begins delivering messages to helloBean
ApplicationContext ctx=new ClassPathXmlApplicationContext("applicationContext.xml");
}
}
Client
package mdp.demo;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class MDPClient {
/**
* @param args
*/
public static void main(String[] args) throws Exception{
// Load the client context and send one message to Hello.Queue
ApplicationContext ctx=new ClassPathXmlApplicationContext("spring-client.xml");
JmsTemplate tmp=(JmsTemplate) ctx.getBean("jmsTemplate");
tmp.setDefaultDestinationName("Hello.Queue");
tmp.send(new MessageCreator(){
@Override
public Message createMessage(Session arg0) throws JMSException {
// "name" is set as a message property (not in the map body), which is what HelloBean reads
Message mes=arg0.createMapMessage();
mes.setStringProperty("name", "hello");
return mes;
}
});
}
}
Transactions
<beans>
<!-- ActiveMQ Broker -->
<bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
<property name="persistent" value="false"/>
<property name="transportConnectorURIs">
<list>
<value>tcp://localhost:5000</value>
</list>
</property>
</bean>
<!-- Geronimo Transaction Manager -->
<bean id="transactionContextManager" class="org.jencks.factory.TransactionContextManagerFactoryBean"/>
<bean id="geronimo" class="org.jencks.factory.GeronimoTransactionManagerFactoryBean"/>
<bean id="geronimoTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="userTransaction" ref="geronimo" />
</bean>
<!-- Jencks Connection Manager -->
<bean id="connectionManager" class="org.jencks.factory.ConnectionManagerFactoryBean">
<property name="transactionSupport">
<bean class="org.jencks.factory.XATransactionFactoryBean">
<property name="useTransactionCaching" value="true"/>
<property name="useThreadCaching" value="false"/>
</bean>
</property>
<property name="poolingSupport">
<bean class="org.jencks.factory.SinglePoolFactoryBean">
<property name="maxSize" value="2"/>
<property name="minSize" value="1"/>
<property name="blockingTimeoutMilliseconds" value="60"/>
<property name="idleTimeoutMinutes" value="60"/>
<property name="matchOne" value="true"/>
<property name="matchAll" value="true"/>
<property name="selectOneAssumeMatch" value="true"/>
</bean>
</property>
</bean>
<!-- ActiveMQ Connection -->
<bean id="jmsResourceAdapter" class="org.apache.activemq.ra.ActiveMQResourceAdapter" depends-on="broker">
<property name="serverUrl">
<value>tcp://localhost:5000</value>
</property>
</bean>
<bean id="jmsManagedConnectionFactory" class="org.apache.activemq.ra.ActiveMQManagedConnectionFactory">
<property name="resourceAdapter" ref="jmsResourceAdapter"/>
</bean>
<bean id="jmsConnectionFactory" class="org.springframework.jca.support.LocalConnectionFactoryBean">
<property name="managedConnectionFactory" ref="jmsManagedConnectionFactory"/>
<property name="connectionManager" ref="connectionManager"/>
</bean>
<!-- Tranql JDBC Connection -->
<!--
<bean id="tranqlManagedConnectionFactory" class="org.jencks.tranql.XAPoolDataSourceMCF">
<property name="driverName" value="org.postgresql.Driver"/>
<property name="url" value="jdbc:postgresql://ats-manager/activemq"/>
<property name="user" value="activemq"/>
</bean>
<bean id="tranqlDataSource" class="org.springframework.jca.support.LocalConnectionFactoryBean">
<property name="managedConnectionFactory" ref="tranqlManagedConnectionFactory"/>
<property name="connectionManager" ref="connectionManager"/>
</bean>
-->
<!-- Enhydra JDBC Connection -->
<bean id="enhydraDataSource" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown">
<property name="dataSource">
<bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown">
<property name="transactionManager" ref="geronimo" />
<property name="driverName" value="org.postgresql.Driver" />
<property name="url" value="jdbc:postgresql://ats-manager/activemq" />
</bean>
</property>
<property name="user" value="activemq"/>
</bean>
<bean id="jencksJCAContainer" class="org.jencks.JCAContainer">
<property name="bootstrapContext">
<bean class="org.jencks.factory.BootstrapContextFactoryBean">
<property name="threadPoolSize" value="25"/>
</bean>
</property>
<property name="resourceAdapter" ref="jmsResourceAdapter"/>
</bean>
<bean id="inboundConnector" class="org.jencks.JCAConnector">
<property name="jcaContainer" ref="jencksJCAContainer" />
<property name="activationSpec">
<bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
<property name="destination" value="messages.input"/>
<property name="destinationType" value="javax.jms.Queue"/>
</bean>
</property>
<property name="transactionManager" ref="geronimo"/>
<property name="ref" value="echoBean"/>
</bean>
<bean id="echoBean" class="transactions.EchoBean">
<property name="jdbcTemplate">
<bean class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="enhydraDataSource"/>
</bean>
</property>
<property name="jmsTemplate">
<bean class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
</property>
</bean>
</beans>
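import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jms.core.JmsTemplate;
public class EchoBean implements MessageListener {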
private Log log = LogFactory.getLog(getClass());
private JdbcTemplate jdbcTemplate;
private JmsTemplate jmsTemplate;
public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void onMessage(Message message) {
log.debug(message);
if (message instanceof TextMessage) {
try {
String messageText = ((TextMessage)message).getText();
log.debug("execute JMS operation");
jmsTemplate.convertAndSend("messages.output", messageText);
log.debug("execute JDBC operation");
// Bind the text as a parameter instead of concatenating it into the SQL
jdbcTemplate.update("insert into t1 values(?)", new Object[] { messageText });
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
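The configuration appears intended to run the JMS send and the JDBC insert in `EchoBean` inside one XA transaction, so that either both take effect or neither does. The sketch below is only a toy model of that all-or-nothing outcome. It is not two-phase commit and uses no transaction manager; the class `AtomicEchoSketch`, its fields, and the rule that an empty text makes the insert fail are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: two in-memory "resources" that are only updated together.
class AtomicEchoSketch {

    final List<String> outputQueue = new ArrayList<>(); // stands in for the output queue
    final List<String> tableT1 = new ArrayList<>();     // stands in for table t1

    // Stages both operations and applies them only if neither fails.
    // Returns true on "commit" and false on "rollback".
    boolean handle(String messageText) {
        List<String> stagedSend = new ArrayList<>();
        List<String> stagedInsert = new ArrayList<>();
        try {
            stagedSend.add(messageText);
            if (messageText.isEmpty()) {
                // Hypothetical failure of the second operation.
                throw new IllegalArgumentException("empty row rejected");
            }
            stagedInsert.add(messageText);
        } catch (RuntimeException e) {
            return false; // rollback: the staged send is discarded as well
        }
        // Commit: make both changes visible together.
        outputQueue.addAll(stagedSend);
        tableT1.addAll(stagedInsert);
        return true;
    }

    public static void main(String[] args) {
        AtomicEchoSketch sketch = new AtomicEchoSketch();
        System.out.println(sketch.handle("hi"));
        System.out.println(sketch.handle(""));
        System.out.println(sketch.outputQueue + " " + sketch.tableT1);
    }
}
```

After the failed second call, neither list contains a partial result, which is the property the XA setup is meant to provide across the broker and the database.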