一. 需求
一处产生消息,两处消费,故采取Topic模式的activemq.
二. 下载及安装
http://activemq.apache.org/下载最新版本。
解压, 进入安装目录 bin/activemq start xbean:conf/activemq.xml
http://localhost:8161/admin查看mq状态,用户名和密码在conf/jetty-realm.properties,默认是admin, admin
至此server已启动。
三. server配置
conf/activemq.xml
1. message cursors,
引用
介绍的比较详细,大部分情况下用默认的Store-based cursor就能满足需求
2. producer-flow-control
http://activemq.apache.org/producer-flow-control.html
生产者流量控制,当内存和硬盘空间到达的时候控制流量,抛出异常或挂起等待,个人觉得在有message cursors的时候可以禁用
3. slow consumer handling
对于nondurable 的consumer,要求broker未其保存未消费的message在内存中,会导致生产者阻塞,其他的快速消费者变慢,所以可以设置一个限制大小的消息队列。
constantPendingMessageLimitStrategy
4. jmx enable
未研究todo
四. ps
about durable topics, non-durable模式下,如果订阅者不在线,那么其不在线期间topic上收到的消息将不会被该订阅者收到,如果想要一个订阅者收到topic上的所有消息,请使用durable模式, 推荐使用durable模式,可以免去生产者流量控制和慢消费者处理的麻烦
about transaction mode, server 有transactionstore, 缓存所有的message和acks不执行,直到commit 和 rollback到达,另外也不把commit 和 rollback之前的message和acks放到分发的队列上去
五. durable subscriber样例代码
public class AMQConnectionFactory {
private String url;
private String user;
private String pwd;
private ActiveMQConnectionFactory activeMQConnectionFactory;
public AMQConnectionFactory(String user, String pwd, String url){
this.url = url;
this.user = user;
this.pwd = pwd;
this.activeMQConnectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
}
public Connection getConnection(){
Connection connection = null;
try {
connection = activeMQConnectionFactory.createConnection();
connection.start();
} catch (JMSException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
return connection;
}
public Connection getDurableConnection(String clientId){
Connection connection = null;
try {
connection = activeMQConnectionFactory.createConnection();
connection.setClientID(clientId);
connection.start();
} catch (JMSException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
return connection;
}
public Session getACKSession(){
Session session = null;
Connection connection = getConnection();
if(connection != null){
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
return session;
}
public Session getDurableACKSession(String clientId){
Session session = null;
Connection connection = getDurableConnection(clientId);
if(connection != null){
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
return session;
}
}
producer:
public class TestProducer {
public void run() {
ApplicationContext ctx=new ClassPathXmlApplicationContext(new String[]{"applicationContext.xml"});
AMQConnectionFactory amqConnectionFactory = (AMQConnectionFactory) ctx.getBean("amqConnectionFactory");
Session session = amqConnectionFactory.getACKSession();
try{
Topic topic = session.createTopic("inc.message");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
produce(producer);
}catch (JMSException jm){
jm.printStackTrace();
}
System.exit(0);
}
public void produce(MessageProducer producer) {
Message message = new ActiveMQMapMessage();
try{
message.setStringProperty("message", "hello");
producer.send(message);
int i = 0;
while(true){
i++;
message.setStringProperty("message", "hello" +i);
producer.send(message);
System.out.println("produce message: " + message.getStringProperty("message"));
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}catch (JMSException exception){
exception.printStackTrace();
}
}
}
consumer
public class TestConsumer {
public void run(String clientId) {
ApplicationContext ctx=new ClassPathXmlApplicationContext(new String[]{"applicationContext.xml"});
AMQConnectionFactory amqConnectionFactory = (AMQConnectionFactory) ctx.getBean("amqConnectionFactory");
Session session2 = amqConnectionFactory.getDurableACKSession(clientId);
try{
Destination destination = session2.createTopic("inc.message");
IncConsumerListener incConsumerListener = (IncConsumerListener)
ctx.getBean("incConsumerListener");
MessageConsumer consumer = session2.createDurableSubscriber((Topic) destination, "bbbb");
consumer.setMessageListener(incConsumerListener);
}catch (JMSException jmse){
jmse.printStackTrace();
}
}
public static void main (String args[]){
TestConsumer testConsumer = new TestConsumer();
testConsumer.run("iiii");
}
}
分享到:
相关推荐
在ActiveMQ实战demo中,我们通常会经历以下几个步骤: 1. **安装与配置ActiveMQ**:下载并解压ActiveMQ的发布包,配置相应的环境变量,启动ActiveMQ服务器。启动后,可以通过Web管理界面监控和管理消息队列。 2. *...
在使用"apache-activemq-5.17.0"时,用户通常会经历以下步骤: 1. **下载与解压**:从Apache官网下载最新版本的压缩包,解压到指定目录。 2. **配置**:根据实际需求,修改activemq配置文件(如`conf/activemq.xml...
第二个项目是健康猫网上商城,这个B2C平台项目涉及SSM(Spring、SpringMVC、MyBatis)架构,还使用了Dubbo、Solr、ActiveMQ、Redis、Nginx、MyCat等技术。Solr用于全文搜索,ActiveMQ处理消息队列,Redis作为缓存...
5. 消息队列知识:袁先生熟悉ActiveMQ消息队列的使用,了解如何使用消息队列来实现异步通信和解耦合。 6. Linux系统知识:袁先生了解Linux系统的基本知识,包括文件系统、进程管理、网络管理等。 7. 项目管理知识...
了解SOA的思想和实现,了解基于Dubbo+ZooKeeper实现表现层与业务层解耦,了解使用ActiveMQ进行分布式系统间的异步通信。了解SolrCloud及其客户端Solrj的使用,了解Redis集群及其客户端Jedis的使用。了解nginx服务器...
此外,对JavaScript、jQuery等前端技术的掌握,以及对Tomcat服务器、Linux系统、Dubbo、WebService、Solr、ActiveMQ、Nginx等中间件和系统的了解和应用,都反映了求职者的全面技术能力。 在【项目经历】中,应列举...
在项目经历上,他参与了双控管理项目的开发,这是一个基于Spring Boot+jpa框架的系统,涉及风险管控和隐患排查,他负责需求变更、数据库设计、bug修复等工作,并使用各种技术如activeMQ、Angular、Shiro和Quartz等...
项目使用了Struts2、Hibernate、Spring、jQuery、easyUI、Apache Shiro、poi、ActiveMQ、MySQL、Tomcat和SVN。在这个项目中,他运用了Easy UI的layout进行页面布局,使用Activiti工作流框架简化流程控制,通过Ztree...
* 熟练使用ActiveMQ、RabbitMQ等消息中间件 * 熟练使用MySQL关系型数据库,了解Oracle数据库,并具有MySQL优化能力 * 熟练使用缓存Redis,了解Memcache、Mongodb等其他非关系型数据库 * 掌握Nginx、Elastic...
- 熟悉Lucene/Solr全文检索技术,能够实现高效的搜索功能,还掌握了ActiveMQ消息中间件,可实现异步通信。 - 掌握FreeMarker模板引擎,用于动态生成HTML页面,同时了解非关系型数据库Redis和Oracle数据库,具有...
在消息中间件方面,张三了解ActiveMQ,并能利用它实现分布式系统的异步通信。 工作经历方面,张三曾在河南蓝通实业有限公司担任JavaEE软件开发工程师,负责用户需求分析、功能模块编码实现、程序调试和问题解决。而...
1. 广创挖机配件开发项目:一个B2C的在线平台服务项目,使用了SpringMVC、Spring、MyBatis等开源框架进行开发。刘某某主要负责后台商品管理、前台首页、登录、搜索、购物车等模块。 2. 达远商贸信息化平台开发项目:...
- **消息中间件**:ActiveMQ、Kafka、RocketMQ的熟练使用,表明了在分布式系统中处理异步通信和高并发场景的经验。 - **微服务架构**:精通SpringBoot、SpringCloud、Dubbo和Zookeeper,展示了在构建微服务架构...
2. 参与的项目包括尚乐购网上商城,使用的技术栈有dubbo、zookeeper、ActiveMQ、SpringMVC、Spring和Mybatis。他在项目中负责后台管理、商城首页、个人中心、购物车等模块的开发,如商品管理、订单处理、类目管理、...
从郑瑞平的工作经历可以看出,她熟练掌握Java相关的开发技术,包括但不限于SSM(Spring、SpringMVC、MyBatis)框架、分布式服务架构、缓存技术(Redis)、消息中间件(ActiveMQ)、搜索引擎(Solr)、文件存储...
10. **企业级中间件与第三方库**:了解和使用Nginx反向代理服务器,消息队列ActiveMQ,SOAP和RESTful服务框架CXF,模板引擎Thymeleaf和FreeMarker,权限管理Shiro,消息中间件Kafka,JSON处理库FastJson,全文搜索...
他的工作经历包括在深圳智联汽车产业集团和深圳市紫川软件有限公司担任Java开发工程师,参与了卡尔汽车推广APP和优品购物商城的开发。在这些项目中,他应用了各种技术和框架,如SpringBoot、MyBatis、Maven、Git、...