`

How can I support priority queues in activeMQ

 
阅读更多

Use Message Priority

A common requirement is to support priority consumption; so high priority messages are consumed before low priority.

In version 5.4 priority queues are supported. Both the message cursors and the message stores (KahaDB and JDBC) support message priority. The support is disabled by default so it needs to be be enabled using Per Destination Policies through xml configuration, in the example below, 'prioritizedMessages' is enabled for all queues.

 <destinationPolicy>
   <policyMap>
     <policyEntries>
       <policyEntry queue=">" prioritizedMessages="true"/>
    ...

The full range of priority values (0-9) are supported by the JDBC message store. For KahaDB three priority categories are supported, Low (< 4), Default (= 4) and High (> 4).

Since the message cursors (and client side) implement strict ordering of priorities, it's possible to observe strict priority ordering if message dispatching can happen from the cache and not have to hit the disk (i.e., your consumers are fast enough to keep up with producers), or if you're using non-persistent messages that never have to flush to disk (using the FilePendingMessageCursor). However, once you hit a situation where consumers are slow, or producers are just significantly faster, you'll observe that the cache will fill up (possibly with lower priority messages) while higher priority messages get stuck on disk and not available until they're paged in. In this case, you can make a decision to tradeoff optimized message dispatching for priority enforcement. You can disable the cache, message expiration check, and lower you consumer prefetch to 1 to ensure getting the high priority messages from the store ahead of lower priority messages Note, this sort of tradeoff can have significant performance implications, so you must test your scenarios thoroughly. :

 <destinationPolicy>
   <policyMap>
     <policyEntries>
       <policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />
    ...

Alternative strategies

Use Selectors

You can have say 100 consumers using a selector to find the high priority stuff

JMSPriority > 6

then have 50 consumers doing average or above

JMSPriority >= 4

Then say 10 consumers consuming all messages (so all priorities). Then this way you'll have a pool of threads always processing high priority messages - giving you very efficient priority based dispatching of messages without ActiveMQ having to batch up messages and reorder them before dispatching them.

Use Resequencer

You can reorder messages on some input queue A and send them to queue B in sorted order to avoid having to change your clients. This avoids the need to use selectors in your application as shown above.

To do this use the Resequencer from the Enterprise Integration Patterns

分享到:
评论

相关推荐

    Fast Priority Queues for Cached Memory.

    Fast Priority Queues for Cached Memory.

    PRIORITY QUEUES WITH SIMPLE QUEUE SERVICE

    return Integer.compare(this.priority, other.priority); } public String getMessage() { return message; } } public static void main(String[] args) { // 创建一个包含自定义比较逻辑的优先级队列...

    Java Methods-Heaps and Priority Queues.ppt

    二、优先队列(Priority Queue) 优先队列是一种数据结构,它能够根据优先级来存储和提取数据。优先队列的实现方式有很多种,常见的有基于数组的实现和基于链表的实现。优先队列的操作有add、remove和peek三个基本...

    ActiveMQ in Action

    ### ActiveMQ in Action:深入理解JMS及其关键概念 #### JMS基本构件 JMS(Java Message Service)作为一套标准接口规范,旨在为应用程序提供一种可靠且高效的消息传输机制。ActiveMQ作为Apache旗下的一个高性能...

    ActiveMQ 配置文件详解

    **ActiveMQ配置文件详解** Apache ActiveMQ 是一个开源的消息中间件,它实现了多种消息协议,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),并且广泛应用于分布式系统中,提供可靠的...

    Group6_Thread_safe_PriorityQueues:LockFreeQueue和PipelinedBlockingQueue的实现

    线程安全PriorityQueues #操作说明实现在并发访问下性能比Java的PriorityBlockingQueue更好的pipelinedPriorityQueue和lockfreePriorityQueue。 #要求Java 1.8+ Eclipse中的Maven插件蚀#如何使用用Eclipse打开...

    Fast Interrupt Priority Management in OS Kernels

    Fast Interrupt Priority Management in OS Kernels

    ActiveMQ使用手册(中文版)

    ### ActiveMQ 使用手册知识点概述 #### 一、ActiveMQ 原理与基本构件 **1.1 连接工厂(Connection Factory):** - **定义:** 连接工厂是客户端用来创建连接的对象。在ActiveMQ中,`ActiveMQConnectionFactory` 类...

    ActiveMQ_in_Action_中文

    《ActiveMQ_in_Action_中文》一书深入探讨了Apache ActiveMQ这一开源消息中间件的各个方面,它是基于Java消息服务(JMS)规范构建的。本文将根据书籍的部分内容概述JMS的基本构件及其重要性,同时解析ActiveMQ如何...

    activeMQ介绍

    【ActiveMQ介绍】 ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它是Java消息服务(JMS)的实现,能够支持多种消息协议,如AMQP、STOMP等。ActiveMQ以其高效、稳定和灵活的特性在分布式系统中广泛应用,为...

    ActiveMQ 教学视频/教程 /附带笔记等资源

    ActiveMQ 是一个开源的消息中间件,它遵循Java消息服务(JMS)标准,为企业级应用程序提供高效率、可扩展和可靠的异步通信解决方案。在这个"ActiveMQ 教学视频/教程 /附带笔记等资源"的压缩包中,你将找到一系列关于...

    activeMq 实战

    ### ActiveMQ 实战 #### JMS 基本构件概览 **ActiveMQ** 是一个高性能、功能丰富的开源消息中间件,它实现了 **Java Message Service (JMS)** 规范。JMS 规范定义了一组接口,这些接口提供了一个标准的方式来进行...

    Data Structures, Algorithms and Applications in C++ Second Edition

    Chapter 12 Priority Queues Chapter 13 Tournament Trees Chapter 14 Binary Search Trees Chapter 15 Balanced Search Trees Chapter 16 Graphs Part III Algorithm Design Methods Chapter 17 The Greedy ...

    C和C++参考手册chm版

    C/C++ Reference General C/C++ Pre-processor commands Operator Precedence ...C++ Priority Queues C++ Queues C++ Stacks C++ Sets C++ Multisets C++ Maps C++ Multimaps C++ Bitsets All C++ Functions

    非常好用的C和C++函数库电子手册

    包括: General C/C++ Pre-processor commands ...C++ Priority Queues C++ Queues C++ Stacks C++ Sets C++ Multisets C++ Maps C++ Multimaps C++ Bitsets Iterators All C++ Functions 非常好用!英文版的

    activemq Demo

    Apache ActiveMQ是开源的、高性能的消息中间件,它在企业级应用中被广泛使用,用于实现应用程序间的异步通信和解耦。在这个"activemq Demo"中,我们可以探索ActiveMQ的基本概念、SSL/TLS安全配置以及如何在实际项目...

    activeMQ,JMS学习资料.pdf

    ActiveMQ和JMS是企业级消息传递的重要组成部分,它们为分布式系统中的应用程序提供了可靠的数据交换手段。Java消息服务(JMS)是一种API,由Sun Microsystems开发,用于在Java应用程序之间交换消息。ActiveMQ则是...

    STL中priority_queue

    STL 中的 priority_queue priority_queue 是 STL 中的一种容器,可以实现优先级队列的功能。下面,我们将详细介绍 priority_queue 的使用方法和实现原理。 priority_queue 的基本概念 priority_queue 是一种特殊...

    C/C++语言参考(英文版)

    C++ 语言参考 英文版 C/C++ Reference General C/C++ ...C++ Priority Queues C++ Queues C++ Stacks C++ Sets C++ Multisets C++ Maps C++ Multimaps C++ Bitsets All C++ Functions

Global site tag (gtag.js) - Google Analytics