- 浏览: 198027 次
- 性别:
- 来自: xxx
文章分类
最新评论
-
天使建站:
只有代码,不能测试,还是结合这里的一起看吧,里面有具 ...
Jquery 遍历json和数组 -
一说书先生:
使用apache ActiveMQ深入企业级程序设计网盘地址: ...
Apache ActiveMQ -
360pluse:
使用apache ActiveMQ深入企业级程序设计网盘地址: ...
Apache ActiveMQ -
July01:
推荐用StratoIO打印控件,支持网页、URL、图片、PD、 ...
WEB页面打印--打印指定区域,页面预览,页面设置 -
a455642158:
拿走……感谢!!!
Java二维数组的声明和初始化
转载:http://blog.sina.com.cn/s/blog_649951290100gmt1.html?retcode=0
一.简介ActiveMQ
ActiveMQ 是最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现
二.下载ActiveMQ
首先去http://activemq.apache.org/download.html 下载稳定版本4.1.0release
Download Here
Description ==Download Link ==PGP Signature file of download
Binary for Windows== apache-activemq-4.1.0-incubator.zip== incubator-activemq-4.1.0.zip.asc
Binary-for-Unix/Linux/Cygwin==apache-activemq-4.1.0-incubator.tar.gz==incubator-activemq-4.1.0.tar.gz.as
解压后目录如下:
+bin (windows下面的bat和unix/linux下面的sh)
+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)
+data (默认是空的)
+docs (index,replease版本里面没有文档,-.-b不知道为啥不带)
+example (几个例子
+lib (activemMQ使用到的lib)
-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)
-LICENSE.txt
-NOTICE.txt
-README.txt
-user-guide.html
三.启动ActiveMQ
可以使用bin\activemq.bat(activemq) 启动,如果一切顺利,你就会看见类似下面的信息(此处解压到D盘根目录下).几个小提示
1. 这个仅仅是最基础的ActiveMQ的配置,很多地方都没有配置因此不要直接使用这个配置用于生产系统
2. 有的时候由于端口被占用,导致ActiveMQ错误,ActiveMQ可能需要以下端口1099(JMX),61616(默认的TransportConnector)
3. 如果没有物理网卡,或者MS的LoopBackAdpater Multicast会报一个错误
四.监控ActiveMQ
启动JDK自带的java控制台查看程序查看客户端(如C:\jdk1.6.0_07\bin\jconsole.exe)
远程进程:127.0.0.1:1099
五. 测试ActiveMQ
由于ActiveMQ是一个独立的jms provider,所以我们不需要其他任何第三方服务器就可以马上做我们的测试了.编译example目录下面的程序ProducerTool/ConsumerTool 是JMS参考里面提到的典型应用,Producer产生消息,Consumer消费消息,而且这个例子还可以加入参数帮助你测试刚才启动的本地ActiveMQ或者是远程的ActiveMQ
ProducerTool [url] broker的地址,默认的是tcp://localhost:61616
[true|flase] 是否使用topic,默认是false
[subject] subject的名字,默认是TOOL.DEFAULT
[durabl] 是否持久化消息,默认是false
[messagecount] 发送消息数量,默认是10
[messagesize] 消息长度,默认是255
[clientID] durable为true的时候,需要配置clientID
[timeToLive] 消息存活时间
[sleepTime] 发送消息中间的休眠时间
[transacte] 是否采用事务
ConsumerTool [url] broker的地址,默认的是tcp://localhost:61616
[true|flase] 是否使用topic,默认是false
[subject] subject的名字,默认是TOOL.DEFAULT
[durabl] 是否持久化消息,默认是false
[maxiumMessages] 接受最大消息数量,0表示不限制
[clientID] durable为true的时候,需要配置clientID
[transacte] 是否采用事务
[sleepTime] 接受消息中间的休眠时间,默认是0,onMeesage方法不休眠
[receiveTimeOut] 接受超时
六.使用应用程序发送点到点消息队列
TestSender类
package test;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class TestSender {
public static void main(String[] args) {
ApplicationContext ctx = new FileSystemXmlApplicationContext("conf/applicationContext.xml");
JmsTemplate template = (JmsTemplate) ctx.getBean("myJmsTemplate");
template.setDeliveryMode(DeliveryMode.PERSISTENT);
template.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message message = session.createTextMessage();
message.setStringProperty("name", "wangwu");
message.setStringProperty("password", "ww");
System.out.println("send success");
return message;
}
});
}
}
applicationContext.xml配置文件
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
</property>
</bean>
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="defaultDestinationName" value="Hello.Queue" />
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
</bean>
</beans>
七. 使用应用程序接受点到点消息队列
ExampleListener类
package test;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;
public class ExampleListener implements SessionAwareMessageListener {
public void onMessage(Message message, Session session) throws JMSException {
if(message instanceof TextMessage) {
System.out.println("TextMessage begin");
System.out.println("name = " + message.getStringProperty("name"));
System.out.println("password = " +message.getStringProperty("password"));
}
if (message instanceof ObjectMessage) {
System.out.println("ObjectMessage");
} else if (message instanceof TextMessage) {
System.out.println("TextMessage");
} else if (message instanceof StreamMessage) {
System.out.println("StreamMessage");
} else if (message instanceof MapMessage) {
System.out.println("MapMessage");
}
}
}
TestReceiver类
package test;
import javax.jms.JMSException;
import org.springframework.context.support.FileSystemXmlApplicationContext;
public class TestReceiver {
public static void main(String[] args) throws JMSException {
new FileSystemXmlApplicationContext("conf/context.xml");
}
}
context.xml配置文件.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">
<bean id="ExampleListener" class="test.ExampleListener" />
<bean id="destinationa" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>Hello.Queue</value>
</constructor-arg>
</bean>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
</property>
</bean>
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="1" />
<property name="connectionFactory" ref="jmsFactory" />
<property name="destination" ref="destinationa" />
<property name="messageListener" ref="ExampleListener" />
</bean>
</beans>
八. 使用WEB程序发送点到点消息队列
TestWebSender类
package com.jl.material.adapter.impl;
import javax.jms.JMSException;
import com.jl.framework.jms.JmsManager;
import com.jl.framework.jms.JmsManagerFactory;
import com.jl.framework.jms.MessageCreateable;
import com.jl.framework.jms.MessageCreatorDefault;
import com.jl.framework.jms.util.JmsUtil;
import com.jl.material.util.MaterialConstants;
public class TestWebSender{
public void sender(String name, String password) {
MessageCreateable mc = new MessageCreatorDefault();
mc.setStringProperty("name", name);
mc.setStringProperty("password", password);
JmsManagerFactory jmsManagerFactory = new JmsManagerFactory();
JmsManager jmsTXManager = jmsManagerFactory.createJmsManager(MaterialConstants.MATERIAL_MODULE_NAME);
try {
jmsTXManager.send(JmsUtil.getDestinationFromConfig("quality_synVerifyBatch_queue_M2Q"), mc);
jmsTXManager.commit();
} catch (JMSException e) {
jmsTXManager.rollback();
throw new RuntimeException(e);
}
}
}
九. 使用WEB程序接受点到点消息队列
TestWebReceiver类
package com.jl.material.adapter.impl;
import javax.jms.MapMessage;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import com.jl.framework.jms.util.support.JMSCallbackable;
public class TestWebReceiver implements JMSCallbackable {
public void mdCallback(Object TextMessage_textMessage) {}
public void logJMSMessageInfo(String arg0) {}
public void mdCallback(ObjectMessage arg0) throws Exception {}
public void mdCallback(TextMessage textMessage) throws Exception {
String name = textMessage.getStringProperty("name");
String password = textMessage.getStringProperty("password");
System.out.println("name = " + name);
System.out.println("password = " + password);
}
public void mdCallback(StreamMessage arg0) throws Exception {}
public void mdCallback(MapMessage arg0) throws Exception {}
}
test_JMS_Spring_Listener.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<bean id="test_testWebReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">
<property name="jmsCallbackable">
<bean class="com.jl.material.adapter.impl.TestWebReceiver "/>
</property>
</bean>
<bean id="test_testWebReceiver _queue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>${quality_assayVerifyBatch_queue_Q2M}</value>
</constructor-arg>
</bean>
<bean id="test_listenerContainerA"
class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">
<property name="concurrentConsumers" value="1" />
<property name="connectionFactory" ref="jmsRecieveFactory" />
<property name="destination" ref=" test_testWebReceiver _queue ">
<property name="messageListener" ref=" test_testWebReceiver " />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>
</bean>
</beans>
jms.properties资源文件
jms.sendip=tcp://10.1.1.40:61616,tcp://activemq:61616
jms.listenip=tcp://10.1.1.40:61616,tcp://activemq:61616
quality_assayVerifyBatch_queue_Q2M=
Quality_Assay_VerifyBatch_Q2M.Queue
context.xml配置文件
<?xml version='1.0' encoding='utf-8'?>
<Context path="/identity" reloadable="false">
<Environment name="jms/jms.sendip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />
<Environment name="jms/jms.listenip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />
</Context>
十. 使用发布/订阅方式
使用该方式的发布方基本与点到点方式一样,区别只在队列名的后缀从 .Queue 变成了 .Topic
区别主要在接收方配置文件
<bean id="pound_removeBindingInfoReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">
<property name="jmsCallbackable">
<bean class="com.jl.pound.adapter.impl.RemoveBindingInfoReceiver">
<property name="initialInfomationService" ref="initialInfomationService" />
<property name="productPoundServiceFacade" ref="productPoundServiceFacade" />
</bean>
</property>
</bean>
<bean id="pound_removeBindingInfomation_topic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg><value>${CraftPound_RemoveBindingInfomation}</value></constructor-arg>
</bean>
<bean id="pound_listenerContainerB"
class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">
<property name="concurrentConsumers" value="1"/>
<property name="connectionFactory" ref="jmsRecieveFactory" />
<property name="destination" ref="pound_removeBindingInfomation_topic" />
<property name="messageListener" ref="pound_removeBindingInfoReceiver" />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>
<property name="subscriptionDurable" value="true"></property>
<property name="pubSubDomain" value="true"></property>
<!-- <property name="clientId" value="100554"></property> -->
<property name="clientId" ref="generateClientIdentityCode1"></property>
</bean>
<bean id="generateClientIdentityCode1"
class="com.jl.pound.security.GenerateClientIdentityCode">
<property name="prefix" value="1"></property>
</bean>
import org.apache.log4j.Logger;
import org.springframework.beans.factory.FactoryBean;
import com.**.util.NetworkUtils;
public class GenerateClientIdentityCode implements FactoryBean {
private static final Logger LOGGER = Logger.getLogger(GenerateClientIdentityCode.class);
private String prefix;
public String getPrefix() {
return prefix;
}
public void setPrefix(String prefix) {
this.prefix = prefix;
}
public String generateIdentityCode() {
String mac = NetworkUtils.getMACAddress();
mac = prefix + mac;
LOGGER.info("this client identity code is:" + mac);
return mac;
}
public Object getObject() throws Exception {
return generateIdentityCode();
}
public Class getObjectType() {
return String.class;
}
public boolean isSingleton() {
return false;
}
}
在JMS中,Topic实现publish和subscribe语义。一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。
JMS Queue执行load balancer语义。一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡
可以使用queue方式发送注册邮件 好友动态数据等
<一>表说明:
当在启动ActiveMQ时,先判断表是否存在,如果不存在,将去创建表,如下:
(1)ACTIVEMQ_ACKS:持久订阅者列表
1.CONTAINER:类型://主题
如:topic://basicInfo.topic
2.SUB_DEST:应该是描述,与1内容相同
3.CLIENT_ID:持久订阅者的标志ID,必须唯一
4.SUB_NAME:持久订阅者的名称.(durableSubscriptionName)
5.SELECTOR:消息选择器,consumer可以选择自己想要的
6.LAST_ACKED_ID:最后一次确认ID,这个字段存的该该订阅者最后一次收到的消息的ID
(2)ACTIVEMQ_LOCK:进行数据访问的排斥锁
1.ID:值为1
2.TIME:时间
3.BROKER_NAME:broker的名称
这个表似为集群使用,但现在ActiveMQ并不能共享数据库.
(3)ACTIVEMQ_MSGS:存储Queue和Topic消息的表
1.ID:消息的ID
2.CONTAINER: 类型://主题
如:queue://my.queue
Topic://basicInfo.topic
3.MSGID_PROD:发送消息者的标志
MSGID_PROD =ID:[computerName][…..]
注意computerName,不要使用中文,消息对象中会存储这个部分,解析connectID时会出现Bad String错误.
4.MSGID_SEQ:还不知用处
5.EXPIRATION:到期时间.
6.MSG:消息本身,Blob类型.
可以在JmsTemplate发送配置中,加上<property name=”timeToLive” value=”432000000”/>,5天的生命期,如果消息一直没有被处理,消息会被删除,但是表中会存在CONTAINER为queue://ActiveMQ.DLQ的记录.也就是说,相当于将过期的消息发给了一个ActiveMQ自定义的删除队列..
<二>关于ActiveMQ的持久订阅消息删除操作
1.主题消息只有一条,所有订阅了这个消息的持久订阅者都要收到消息,只有所有订阅者收到消息并确认(Acknowledge)之后.才会删除.
说明:ActiveMQ支持批量(optimizeAcknowledge为true)确认,以提高性能
2.ActiveMQ执行删除Topic消息的cleanup()操作的时间间隔为5 minutes..
一.简介ActiveMQ
ActiveMQ 是最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现
二.下载ActiveMQ
首先去http://activemq.apache.org/download.html 下载稳定版本4.1.0release
Download Here
Description ==Download Link ==PGP Signature file of download
Binary for Windows== apache-activemq-4.1.0-incubator.zip== incubator-activemq-4.1.0.zip.asc
Binary-for-Unix/Linux/Cygwin==apache-activemq-4.1.0-incubator.tar.gz==incubator-activemq-4.1.0.tar.gz.as
解压后目录如下:
+bin (windows下面的bat和unix/linux下面的sh)
+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)
+data (默认是空的)
+docs (index,replease版本里面没有文档,-.-b不知道为啥不带)
+example (几个例子
+lib (activemMQ使用到的lib)
-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)
-LICENSE.txt
-NOTICE.txt
-README.txt
-user-guide.html
三.启动ActiveMQ
可以使用bin\activemq.bat(activemq) 启动,如果一切顺利,你就会看见类似下面的信息(此处解压到D盘根目录下).几个小提示
1. 这个仅仅是最基础的ActiveMQ的配置,很多地方都没有配置因此不要直接使用这个配置用于生产系统
2. 有的时候由于端口被占用,导致ActiveMQ错误,ActiveMQ可能需要以下端口1099(JMX),61616(默认的TransportConnector)
3. 如果没有物理网卡,或者MS的LoopBackAdpater Multicast会报一个错误
四.监控ActiveMQ
启动JDK自带的java控制台查看程序查看客户端(如C:\jdk1.6.0_07\bin\jconsole.exe)
远程进程:127.0.0.1:1099
五. 测试ActiveMQ
由于ActiveMQ是一个独立的jms provider,所以我们不需要其他任何第三方服务器就可以马上做我们的测试了.编译example目录下面的程序ProducerTool/ConsumerTool 是JMS参考里面提到的典型应用,Producer产生消息,Consumer消费消息,而且这个例子还可以加入参数帮助你测试刚才启动的本地ActiveMQ或者是远程的ActiveMQ
ProducerTool [url] broker的地址,默认的是tcp://localhost:61616
[true|flase] 是否使用topic,默认是false
[subject] subject的名字,默认是TOOL.DEFAULT
[durabl] 是否持久化消息,默认是false
[messagecount] 发送消息数量,默认是10
[messagesize] 消息长度,默认是255
[clientID] durable为true的时候,需要配置clientID
[timeToLive] 消息存活时间
[sleepTime] 发送消息中间的休眠时间
[transacte] 是否采用事务
ConsumerTool [url] broker的地址,默认的是tcp://localhost:61616
[true|flase] 是否使用topic,默认是false
[subject] subject的名字,默认是TOOL.DEFAULT
[durabl] 是否持久化消息,默认是false
[maxiumMessages] 接受最大消息数量,0表示不限制
[clientID] durable为true的时候,需要配置clientID
[transacte] 是否采用事务
[sleepTime] 接受消息中间的休眠时间,默认是0,onMeesage方法不休眠
[receiveTimeOut] 接受超时
六.使用应用程序发送点到点消息队列
TestSender类
package test;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class TestSender {
public static void main(String[] args) {
ApplicationContext ctx = new FileSystemXmlApplicationContext("conf/applicationContext.xml");
JmsTemplate template = (JmsTemplate) ctx.getBean("myJmsTemplate");
template.setDeliveryMode(DeliveryMode.PERSISTENT);
template.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message message = session.createTextMessage();
message.setStringProperty("name", "wangwu");
message.setStringProperty("password", "ww");
System.out.println("send success");
return message;
}
});
}
}
applicationContext.xml配置文件
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
</property>
</bean>
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="defaultDestinationName" value="Hello.Queue" />
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
</bean>
</beans>
七. 使用应用程序接受点到点消息队列
ExampleListener类
package test;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;
public class ExampleListener implements SessionAwareMessageListener {
public void onMessage(Message message, Session session) throws JMSException {
if(message instanceof TextMessage) {
System.out.println("TextMessage begin");
System.out.println("name = " + message.getStringProperty("name"));
System.out.println("password = " +message.getStringProperty("password"));
}
if (message instanceof ObjectMessage) {
System.out.println("ObjectMessage");
} else if (message instanceof TextMessage) {
System.out.println("TextMessage");
} else if (message instanceof StreamMessage) {
System.out.println("StreamMessage");
} else if (message instanceof MapMessage) {
System.out.println("MapMessage");
}
}
}
TestReceiver类
package test;
import javax.jms.JMSException;
import org.springframework.context.support.FileSystemXmlApplicationContext;
public class TestReceiver {
public static void main(String[] args) throws JMSException {
new FileSystemXmlApplicationContext("conf/context.xml");
}
}
context.xml配置文件.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">
<bean id="ExampleListener" class="test.ExampleListener" />
<bean id="destinationa" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>Hello.Queue</value>
</constructor-arg>
</bean>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
</property>
</bean>
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="1" />
<property name="connectionFactory" ref="jmsFactory" />
<property name="destination" ref="destinationa" />
<property name="messageListener" ref="ExampleListener" />
</bean>
</beans>
八. 使用WEB程序发送点到点消息队列
TestWebSender类
package com.jl.material.adapter.impl;
import javax.jms.JMSException;
import com.jl.framework.jms.JmsManager;
import com.jl.framework.jms.JmsManagerFactory;
import com.jl.framework.jms.MessageCreateable;
import com.jl.framework.jms.MessageCreatorDefault;
import com.jl.framework.jms.util.JmsUtil;
import com.jl.material.util.MaterialConstants;
public class TestWebSender{
public void sender(String name, String password) {
MessageCreateable mc = new MessageCreatorDefault();
mc.setStringProperty("name", name);
mc.setStringProperty("password", password);
JmsManagerFactory jmsManagerFactory = new JmsManagerFactory();
JmsManager jmsTXManager = jmsManagerFactory.createJmsManager(MaterialConstants.MATERIAL_MODULE_NAME);
try {
jmsTXManager.send(JmsUtil.getDestinationFromConfig("quality_synVerifyBatch_queue_M2Q"), mc);
jmsTXManager.commit();
} catch (JMSException e) {
jmsTXManager.rollback();
throw new RuntimeException(e);
}
}
}
九. 使用WEB程序接受点到点消息队列
TestWebReceiver类
package com.jl.material.adapter.impl;
import javax.jms.MapMessage;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import com.jl.framework.jms.util.support.JMSCallbackable;
public class TestWebReceiver implements JMSCallbackable {
public void mdCallback(Object TextMessage_textMessage) {}
public void logJMSMessageInfo(String arg0) {}
public void mdCallback(ObjectMessage arg0) throws Exception {}
public void mdCallback(TextMessage textMessage) throws Exception {
String name = textMessage.getStringProperty("name");
String password = textMessage.getStringProperty("password");
System.out.println("name = " + name);
System.out.println("password = " + password);
}
public void mdCallback(StreamMessage arg0) throws Exception {}
public void mdCallback(MapMessage arg0) throws Exception {}
}
test_JMS_Spring_Listener.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<bean id="test_testWebReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">
<property name="jmsCallbackable">
<bean class="com.jl.material.adapter.impl.TestWebReceiver "/>
</property>
</bean>
<bean id="test_testWebReceiver _queue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>${quality_assayVerifyBatch_queue_Q2M}</value>
</constructor-arg>
</bean>
<bean id="test_listenerContainerA"
class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">
<property name="concurrentConsumers" value="1" />
<property name="connectionFactory" ref="jmsRecieveFactory" />
<property name="destination" ref=" test_testWebReceiver _queue ">
<property name="messageListener" ref=" test_testWebReceiver " />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>
</bean>
</beans>
jms.properties资源文件
jms.sendip=tcp://10.1.1.40:61616,tcp://activemq:61616
jms.listenip=tcp://10.1.1.40:61616,tcp://activemq:61616
quality_assayVerifyBatch_queue_Q2M=
Quality_Assay_VerifyBatch_Q2M.Queue
context.xml配置文件
<?xml version='1.0' encoding='utf-8'?>
<Context path="/identity" reloadable="false">
<Environment name="jms/jms.sendip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />
<Environment name="jms/jms.listenip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />
</Context>
十. 使用发布/订阅方式
使用该方式的发布方基本与点到点方式一样,区别只在队列名的后缀从 .Queue 变成了 .Topic
区别主要在接收方配置文件
<bean id="pound_removeBindingInfoReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">
<property name="jmsCallbackable">
<bean class="com.jl.pound.adapter.impl.RemoveBindingInfoReceiver">
<property name="initialInfomationService" ref="initialInfomationService" />
<property name="productPoundServiceFacade" ref="productPoundServiceFacade" />
</bean>
</property>
</bean>
<bean id="pound_removeBindingInfomation_topic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg><value>${CraftPound_RemoveBindingInfomation}</value></constructor-arg>
</bean>
<bean id="pound_listenerContainerB"
class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">
<property name="concurrentConsumers" value="1"/>
<property name="connectionFactory" ref="jmsRecieveFactory" />
<property name="destination" ref="pound_removeBindingInfomation_topic" />
<property name="messageListener" ref="pound_removeBindingInfoReceiver" />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>
<property name="subscriptionDurable" value="true"></property>
<property name="pubSubDomain" value="true"></property>
<!-- <property name="clientId" value="100554"></property> -->
<property name="clientId" ref="generateClientIdentityCode1"></property>
</bean>
<bean id="generateClientIdentityCode1"
class="com.jl.pound.security.GenerateClientIdentityCode">
<property name="prefix" value="1"></property>
</bean>
import org.apache.log4j.Logger;
import org.springframework.beans.factory.FactoryBean;
import com.**.util.NetworkUtils;
public class GenerateClientIdentityCode implements FactoryBean {
private static final Logger LOGGER = Logger.getLogger(GenerateClientIdentityCode.class);
private String prefix;
public String getPrefix() {
return prefix;
}
public void setPrefix(String prefix) {
this.prefix = prefix;
}
public String generateIdentityCode() {
String mac = NetworkUtils.getMACAddress();
mac = prefix + mac;
LOGGER.info("this client identity code is:" + mac);
return mac;
}
public Object getObject() throws Exception {
return generateIdentityCode();
}
public Class getObjectType() {
return String.class;
}
public boolean isSingleton() {
return false;
}
}
在JMS中,Topic实现publish和subscribe语义。一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。
JMS Queue执行load balancer语义。一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡
可以使用queue方式发送注册邮件 好友动态数据等
<一>表说明:
当在启动ActiveMQ时,先判断表是否存在,如果不存在,将去创建表,如下:
(1)ACTIVEMQ_ACKS:持久订阅者列表
1.CONTAINER:类型://主题
如:topic://basicInfo.topic
2.SUB_DEST:应该是描述,与1内容相同
3.CLIENT_ID:持久订阅者的标志ID,必须唯一
4.SUB_NAME:持久订阅者的名称.(durableSubscriptionName)
5.SELECTOR:消息选择器,consumer可以选择自己想要的
6.LAST_ACKED_ID:最后一次确认ID,这个字段存的该该订阅者最后一次收到的消息的ID
(2)ACTIVEMQ_LOCK:进行数据访问的排斥锁
1.ID:值为1
2.TIME:时间
3.BROKER_NAME:broker的名称
这个表似为集群使用,但现在ActiveMQ并不能共享数据库.
(3)ACTIVEMQ_MSGS:存储Queue和Topic消息的表
1.ID:消息的ID
2.CONTAINER: 类型://主题
如:queue://my.queue
Topic://basicInfo.topic
3.MSGID_PROD:发送消息者的标志
MSGID_PROD =ID:[computerName][…..]
注意computerName,不要使用中文,消息对象中会存储这个部分,解析connectID时会出现Bad String错误.
4.MSGID_SEQ:还不知用处
5.EXPIRATION:到期时间.
6.MSG:消息本身,Blob类型.
可以在JmsTemplate发送配置中,加上<property name=”timeToLive” value=”432000000”/>,5天的生命期,如果消息一直没有被处理,消息会被删除,但是表中会存在CONTAINER为queue://ActiveMQ.DLQ的记录.也就是说,相当于将过期的消息发给了一个ActiveMQ自定义的删除队列..
<二>关于ActiveMQ的持久订阅消息删除操作
1.主题消息只有一条,所有订阅了这个消息的持久订阅者都要收到消息,只有所有订阅者收到消息并确认(Acknowledge)之后.才会删除.
说明:ActiveMQ支持批量(optimizeAcknowledge为true)确认,以提高性能
2.ActiveMQ执行删除Topic消息的cleanup()操作的时间间隔为5 minutes..
评论
2 楼
一说书先生
2018-02-20
使用apache ActiveMQ深入企业级程序设计
网盘地址:https://pan.baidu.com/s/1i7aQTaX 密码: jkp8
备用地址(腾讯微云):http://url.cn/5AVIkeo 密码:SG7vFD
网盘地址:https://pan.baidu.com/s/1i7aQTaX 密码: jkp8
备用地址(腾讯微云):http://url.cn/5AVIkeo 密码:SG7vFD
1 楼
360pluse
2018-01-22
使用apache ActiveMQ深入企业级程序设计
网盘地址:https://pan.baidu.com/s/1cOBDvC 密码: m34p
网盘地址:https://pan.baidu.com/s/1cOBDvC 密码: m34p
相关推荐
Apache ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它基于Java消息服务(JMS)规范,用于在分布式系统中实现可靠的消息传递。本指南主要关注如何使用ActiveMQ开发消息应用,通过`Instant Apache ActiveMQ...
Apache ActiveMQ是业界广泛使用的开源消息中间件,尤其在Linux环境下表现出色。它基于Java语言开发,遵循Apache软件基金会的许可证,并且实现了多种消息传递协议,包括OpenWire、STOMP、AMQP和XMPP等。在Linux系统上...
Apache ActiveMQ是业界广泛使用的开源消息中间件,它基于Java消息服务(JMS)标准,提供了高度可扩展、可靠的异步通信能力。标题"apache-activemq-5.16.5"指的是该软件的一个特定版本,即5.16.5版本,通常每个新版本...
Apache ActiveMQ 是一款开源的消息中间件,它是Apache软件基金会下的顶级项目,被广泛应用于构建分布式系统中的消息传递。在本文中,我们将深入探讨如何通过Apache ActiveMQ 5.8版本进行入门,以及如何构建一个简单...
该项目为Apache ActiveMQ高性能消息中间件设计源码,采用Java语言编写,总计包含5357个文件,其中Java源文件4383个,XML配置文件340个,属性文件82个,HTML文件77个,JavaScript文件35个,Markdown文件31个,PNG图片...
Apache ActiveMQ是开源的、基于Java消息服务(JMS)的应用服务器,它是Apache软件基金会的一部分。这个名为"apache-activemq-5.17.3"的压缩包包含了ActiveMQ的5.17.3版本,这是一个稳定且功能丰富的发布版本。在深入...
Apache ActiveMQ 是一款高度可扩展且功能强大的消息中间件,它是Apache软件基金会的一部分,完全开源且免费。Apache ActiveMQ 在Java消息服务(JMS)领域扮演着重要角色,为分布式系统提供可靠的消息传递机制。本...
Apache ActiveMQ JMS实现
Apache ActiveMQ Artemis是Apache软件基金会下的一款高性能,可伸缩的消息中间件。它支持JMS 1.1和AMQP 1.0等多种协议,适用于构建基于消息的可靠应用程序。 首先,文档的标题"Apache ActiveMQ Artemis.pdf"指出了...
### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...
Apache ActiveMQ技术讲解文档
Apache ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它基于Java Message Service (JMS) 规范,提供高效、可靠的消息传递服务。在本文中,我们将深入探讨Apache ActiveMQ,特别是针对“apache-activemq-...
### Apache ActiveMQ与JMS整合Tomcat:深入解析与实践 #### 一、Apache ActiveMQ:强大而灵活的开源消息中间件 Apache ActiveMQ作为一款成熟的开源消息中间件,不仅遵循了JMS 1.1规范,还兼容J2EE 1.4以上的标准,...
Apache ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它实现了多种消息协议,如OpenWire、STOMP、AMQP和MQTT,使得不同系统间的数据交换变得简单。在这个"apache activeMQ之初体验(helloworld)"中,我们将...
Apache ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它基于Java Message Service(JMS)规范,用于在分布式系统中高效地处理、路由和传递消息。这个`( apache-activemq-5.13.0-bin.zip )`压缩包包含了...
Apache ActiveMQ是世界上最流行的开源消息代理和队列服务器,它基于Java Message Service(JMS)规范,为分布式系统提供高效、可靠和可扩展的消息传递功能。这个“apache-activemq-5.9.0-bin”压缩包包含了Apache ...
Apache ActiveMQ是世界上最流行的开源消息代理和集成模式服务器,它基于Java Message Service (JMS)规范,提供了一种高效、灵活、可靠的消息传递解决方案。在"apache-activemq-5.13.0-bin.tar.gz"这个压缩包中,包含...
Apache ActiveMQ是Apache软件基金会的一款开源消息中间件,它遵循开放消息传递协议(Open Message Broker,JMS)标准,提供高效、可靠的消息传递服务。在本教程中,我们将深入探讨Apache ActiveMQ 5.11.2版本的核心...
activemq, Apache ActiveMQ镜像 欢迎来到 Apache ActiveMQis是一个高性能的Apache 2.0许可以消息代理和 JMS 1.1实现。正在启动要帮助你入门,请尝试以下链接:入门http://activemq.apache.org/version-