第十章 ActiveMq代理的属性在实际中的应用
章节导读
- 使用通配符和复合的目的地
- 利用advistory消息
- 理解虚拟的topic 以及追溯消费者
- 使用activeMq插件
- apache camel的介绍
10.1 通配符和复合的destination
下面会介绍使用通配符订阅多个目的地,使用复合目的地发布消息到多个目的地.
10.1.1 使用通配符从多个目的地中消费
activemq支持层级目的地,如果你有一个应用用来描述一项运动的最新结果,你可以使用 Sport.league.team来表示 .举例说明,描述Leed队伍在足球比赛中的的最新结果,可以这样表示:football.division1.leed.如果leed同时参与足球和橄榄球,方便起见,你想要看到Leed的所有比赛的所有结果,
以下三种通配符是很有用的:
- . -- 点,用于分割元素
- * -- 用于匹配一个元素
- > -- 用于匹配一个或所有的元素
描述leed队伍参加的所有项目的最新比分,你可以这样去表示:*.*.leed:
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic allLeeds = session.createTopic("*.*.Leeds"); MessageConsumer consumer = session.createConsumer(allLeeds); Message result = consumer.receive();
如果你想得到division1联盟中所有足球比赛的结果,你可以这样表示:football.division1.*,如果你想要得到所有橄榄球比赛的最新得分,你可以这样表示:rugby.>.
PS:通配符只对消费者起作用,如果你把消息发布到一个叫做rugby.>.的主题,消息只会被发送到rugby.>.而不是所有以rugby名字开头的主题.
10.1.2 发送消息到多个目的地
符合目的地使用一种名为"逗号分离"的类型的名字命名.例如:你创建一个名为store.order.backoffice,store.order.warehouse的队列,那么发送消息的时候就会同时发送到这两个队列.复合目的地支持队列和主题的组合.要么队列名是queue://,要么主题名是topic://.
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue ordersDestination = session.createQueue("store.orders, topic:// store.orders"); MessageProducer producer = session.createProducer(ordersDestination); Message order = session.createObjectMessage(); producer.send(order);
10.2 advisory消息
advisory消息是一种代理被改变的结果的通知类型的消息,Activemq内部也使用这种类型的消息,通知连接关于临时目的地的可用性以及通知网络消费者的可用性.每个advisory消息都有一种JMS类型的Advisory和预先定义好的JMS字符属性,以及代理的唯一标示:
- originBrokerId -- 生成advisory消息的代理的id
- originBrokerName -- 代理的名字
- originBrokerUrl -- 代理的第一个传输连接器的URL
public class Consumer { private static Logger logger = Logger.getLogger(Consumer.class); private static String brokerURL = "failover:(tcp://localhost:61617,tcp://localhost:61616)"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; public Consumer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException, InterruptedException { Consumer consumer1 = new Consumer(); Topic connectionAdvisory = AdvisorySupport.CONNECTION_ADVISORY_TOPIC; MessageConsumer consumer = consumer1.getSession().createConsumer(connectionAdvisory); //监听broker的启动和停止 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { DataStructure data = (DataStructure) ((org.apache.activemq.command.Message) message).getDataStructure(); if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) { ConnectionInfo connectionInfo = (ConnectionInfo) data; System.out.println("Connection started: " + connectionInfo); } else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) { RemoveInfo removeInfo = (RemoveInfo) data; System.out.println("Connection stopped: " + removeInfo.getObjectId()); } else { System.err.println("Unknown message " + data); } } catch (Exception e) { e.printStackTrace(); } } }); } public Session getSession() { return session; } }
服务启动时会打印出下面的消息:
Connection started: ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:YOS-01605061430-65527-1486105925973-1:1, clientId = ID:YOS-01605061430-65527-1486105925973-0:1, clientIp = tcp://127.0.0.1:49187, userName = null, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true, faultTolerant = true, failoverReconnect = false}
其它的advisory详见书(Activemq实战)282页.
10.3 虚拟topic
现在还无法实现把消息发送给主题,然后多个消费者按队列的形式从主题中消费.
虚拟主题允许生产者给一个正常的topic发送消息,同时消费者从队列中接收消息.所以消费者订阅到一个队列接送从主题中来的消息.具体结构看下图
为了能正确使用虚拟主题,必须要遵循一些规范.首先,主题名字应该以VirtualTopic.主题名来命名.所以如果你想要为一个名为order的主题创建虚拟主题,你可以这样命名:VirtualTopic.order.要从整个队列中消费,消费者必须订阅到一个名为(Consumer.消费者名称).VirtualTopic.(虚拟主题名称)的队列.
假设你要让消费者在一个队列中竞争消息,你可以创建两个队列接受者,都从名为Consumer.Foo.VirtualTopic.order中消费.
public static void main(String[] args) throws JMSException, InterruptedException { String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI); Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start(); String queueName = "Consumer.Foo.VirtualTopic.orders"; // Create the first consumer for Consumer.Foo.VirtualTopic.orders Session sessionA = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue fooQueueA = sessionA.createQueue(queueName); MessageConsumer consumerA = sessionA.createConsumer(fooQueueA); consumerA.setMessageListener(getMessageListener()); // Create the second consumer for Consumer.Foo.VirtualTopic.orders Session sessionB = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue fooQueueB = sessionB.createQueue(queueName); MessageConsumer consumerB = sessionB.createConsumer(fooQueueB); consumerB.setMessageListener(getMessageListener()); // Create the sender String topicName = "VirtualTopic.orders"; Connection senderConnection = connectionFactory.createConnection(); senderConnection.start(); Session senderSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic ordersDestination = senderSession.createTopic(topicName); MessageProducer producer = senderSession.createProducer(ordersDestination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); // Send 2000 messages for (int i = 0; i < 20; ++i) { TextMessage message = senderSession.createTextMessage(); message.setText("i:" + i); producer.send(message); } } private static MessageListener getMessageListener() { return new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMsg = (TextMessage) message; try { System.out.println(textMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }; }
虚拟主题提供了一种方便的机制来结合负载均衡以及队列的故障转移,消费者不需要担心创建唯一的JMS client Id和名称,也可以以负债均衡的形式实现消息竞争.如果其中一个消费者死亡,其它的消费者或继续接受队列中的消息.
10.4 可追溯的消费者
假设一个应用需要消费尽可能的快的被发送和消费,那么需要关闭消息的持久化.这样就有一个缺点,如果消费者在生产者启动之后启动,那么久有可能丢失消息.为了提供一个有效的追溯消息的方法,ActiveMq可以缓存一个可配置的大小或者数量的主题消息。这些由两部分组成--消息消费者需要通知代理它对追溯信息感兴趣,还需要配置代理的目的地中多少消息需要被缓存.为了标记消费者可追溯,你需要设置可追溯标志.
Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true");
在代理端,你可以配置相关的策略,默认是叫做FixedSizedSubscriptionRecoveryPolicy,默认大小是64KB.具体请参考subscriptionRecoveryPolicy配置
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">"> <subscriptionRecoveryPolicy> <fixedSizedSubscriptionRecoveryPolicy maximumSize="8mb"/> </subscriptionRecoveryPolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
10.5 dead-letter(死信)队列
当代理上的消息过期或者不能被发送时,它们就会被移到dead-letter队列,它们可以在之后的某个时间段被管理员查看或者消费.
消息通常在以下场景中重发给客户端:
- 客户端使用事务并调用rollback方法
- 客户端使用事务却在提交事务之前关闭了.
- 客户端在session中使用了CLIENT_ACKNOWLEDGE,并调用了recover().
有时候客户端会因为消息错误的格式而不接受消息.这种情况下,一直重发消息是没有意义的.你可以配置Activemq代理重发消息之前的等待时间,在每次失败后是否应该增加时间,以及在被移到死信队列之前的最大重发次数.
以下使用程序配置的例子:
RedeliveryPolicy policy = connection.getRedeliveryPolicy(); policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); policy.setUseExponentialBackOff(true); policy.setMaximumRedeliveries(2);所有消息都有一个默认名为ActiveMQ.DLQ的死信队列.你可以配置死信队列的层次结构或者配置一个私有的目的地.
<broker...> <destinationPolicy> <policyMap> <policyEntries> <!— 设置所有队列,使用 '>' ,否则用队列名称 --> <policyEntry queue=">"> <deadLetterStrategy> <!-- queuePrefix:设置死信队列前缀 useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信 processExpired=false代表不处理过期的 processNonPersistent="false"代表不处理非持化的 --> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" processExpired="false" processNonPersistent="false" /> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> ... </broker>当一条消息被发送到死信队列,一条advisory消息就会生成.你可以监听ActiveMQ.Advisory.MessageDLQd.* topic来获取死信队列中的advisory消息.
相关推荐
这个例子可以帮助我们理解如何在实际项目中集成ActiveMQ,实现异步通信和解耦。 总的来说,ActiveMQ是一个强大的工具,支持多种编程语言,如Java和JavaScript。通过Java的JMS API和JavaScript的StompWebsocket库,...
在实际项目中,还可以利用ActiveMQ的其他特性,如事务消息、持久化消息、优先级队列等,以满足更复杂的业务需求。同时,ActiveMQ支持集群部署,能够提供高可用性和负载均衡,进一步增强系统的稳定性。 总之,通过...
2. **ActiveMQ配置**:在ActiveMQ中,我们需要创建一个特定的队列或主题来接收来自Uploadify的文件信息。队列提供一对一的消息传递,而主题则支持一对多的广播模式。根据应用场景,可以选择合适的通信模式。 3. **...
在Java编程中,ActiveMQ作为一款强大的开源消息中间件,扮演着至关重要的角色。...在实际项目中,根据性能测试和具体业务需求,可以选择RabbitMQ、Redis或其他消息中间件产品与ActiveMQ配合使用,以达到最佳效果。
Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和读取消息的应用程序编程接口(API)。...这有助于我们在实际开发中更好地应用消息队列技术,提升系统的可扩展性和容错性。
博主可能通过实际操作展示了如何在项目中集成和使用ActiveMQ。 标签“源码”意味着这篇博客或文档可能会涉及到ActiveMQ的内部工作原理,可能包括对源代码的解读,帮助读者理解其底层机制。“工具”标签则暗示了...
在实际应用中,ActiveMQ可以帮助解耦应用程序,提高系统的并发性和可伸缩性。通过消息队列,它可以处理大量并发请求,缓解服务器压力,同时允许应用程序在不同时间处理消息,实现异步处理。此外,ActiveMQ的故障恢复...
这本书《ActiveMQ实战(英文版)》旨在深入探讨如何在实际环境中有效地使用和管理ActiveMQ。 ActiveMQ的核心功能包括: 1. **消息传递**:ActiveMQ作为消息代理,允许应用程序之间通过发送和接收消息进行异步通信。...
ActiveMQ中,我们使用`javax.jms.Topic`接口来创建和管理主题,而`MessageProducer`用于发布消息,`MessageSubscriber`用于订阅和接收消息。发布-订阅模式常用于实时通知场景,如股票市场更新、天气预报等,因为这些...
通过设置`brokerURI`参数来指定`activemq.xml`的路径,同时注册`SpringBrokerContextListener`监听器,以便于在应用启动时加载ActiveMQ的配置。 ```xml <param-name>brokerURI <param-value>/WEB-INF/activemq...
标题中的“在Java与C++间应用Activemq”指的是使用Apache ActiveMQ这一开源消息代理在Java和C++编程语言之间构建通信桥梁。ActiveMQ是Java Message Service (JMS) 的实现,允许分布式系统中的组件通过消息传递进行...
总之,这个实验旨在通过实际操作,使学习者深入了解消息中间件的实用价值,掌握ActiveMQ的基本操作,以及如何在Java环境中实现单线程处理多队列的高效通信模式。在完成实验后,学习者应具备设计和实现基于ActiveMQ的...
Apache ActiveMQ是开源社区中最流行的Java消息代理,也是企业级消息中间件(Message Broker)的首选之一。...在实际部署时,可以根据具体需求配置ActiveMQ的参数,如内存大小、持久化策略等,以达到最佳性能。
在实际应用中,开发者通常会使用ActiveMQ作为JMS的消息代理,创建消息生产者和消费者来实现消息的发送和接收。例如,生产者可以创建一个消息,设置其属性,然后通过ActiveMQ连接发送到队列或主题。消费者则监听指定...
ActiveMQ-CPP是Apache ActiveMQ的C++客户端库,它允许开发者在C++应用程序中使用消息中间件的功能,实现高效的异步通信。在本文中,我们将深入探讨如何在Visual Studio 2017(简称VS2017)环境下成功配置和测试...
在Java消息服务(JMS)领域,Apache ActiveMQ是一个广泛使用的开源消息代理,它提供了高效、可靠的异步通信能力。...在实际开发中,还需要根据项目的具体需求和服务器负载来调整连接池的参数,以达到最佳性能。
在实际应用中,Apache ActiveMQ常被用作企业级应用之间的消息桥梁,例如在微服务架构中,它可以帮助服务之间进行解耦通信,提高系统的稳定性和可扩展性。通过深入理解和熟练运用ActiveMQ,你可以构建出高效、可靠的...
在Linux系统上部署和使用Apache ActiveMQ能够提供稳定、高效的消息传递服务,支持分布式系统中的异步通信和解耦。 Apache ActiveMQ的主要功能包括: 1. **消息队列**: 它允许应用程序将消息放入队列中,由其他应用...
ActiveMQ在企业级应用中广泛应用,因为它支持多种协议,包括开放标准的JMS以及AMQP、STOMP、XMPP等,使得不同平台和语言的应用可以方便地进行通信。 1. **JMS(Java Message Service)**:JMS是Java平台中用于在...
8. **事务处理**:学习如何在ActiveMQ中使用JMS事务确保消息的一致性和可靠性。 9. **性能监控**:ActiveMQ提供了一套强大的监控工具,包括Web控制台,可以用来查看消息的发送、接收和堆积情况,帮助优化系统性能。...