`
wangchongan
  • 浏览: 3543 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

采用BlockingQueue实现内存消息队列

阅读更多

在我们日常开发过程中,有时候需要采用内存消息队列的方案来实现我们想要的功能。

        比如日志系统:

我们需要把系统中关键模块的日志写入文件或者数据库以便存档分析。那么我们可以采用内存队列来存储各个模块输出的日志,另外再由专门的日志存储端把日志写入存储系统中。基本架构如下:

image

图1:日志系统局部架构

 

这样做的好处是打印日志的模块只需要关注打印日志,不用关心日志存储到哪里和如何分类存储等逻辑。而日志读取存储端也只需要关注读取和分析队列中的日志消息即可。

 

      再比如通用邮件发送功能:

一般业务系统中常需要发送各种邮件,比如给用户的业务提醒邮件、修改密码时的密保邮件、业务功能报警邮件等等。那么我们可以采用内存消息队列来存储邮件实体,需要发送邮件的业务功能可以直接把封装好的邮件消息丢入队列中,另外再由专门负责邮件发送端去负责处理真正邮件发送的逻辑。基本架构类似上图,如下:

 

image

图2:邮件发送系统局部架构

 

在这种场景下面,我们采用基于消息队列的设计去实现有以下优点:

1、代码逻辑清晰且易使用

职责分离后,代码逻辑更清晰是必然的,并且在功能的使用上也更简单,比如如果是使用Spring技术的话,业务代码中只需要注入一个统一发送服务类即可。

2、业务和基础设施解耦

负责写日志和发邮件的业务模块逻辑只管把封装好的实体往消息队列中丢就好了,不用去关心日志是怎么存储的和邮件是怎么发送出去的。另外专门负责日志存储和邮件发送的基础设施层由于已经和业务解耦了,那么可以只关注于技术层面的设计和优化。

3、易于进行性能调优和设计

业务逻辑和技术基础逻辑进行解耦以后,两部分就是独立的功能,那么基础设施端可以更易于在此基础上进行性能调优和进行良好的架构设计,比如为了提高吞吐量,我们可以把消息的消费端(即日志存储端和邮件发送端)设计为多线程并发处理的方式。

 

实现消息队列的方法有很多种,成熟的产品比如RabbitMQActiveMQ 和 ZeroMQ,这些产品都是比较成熟的实现消息队列的方案,一般企业里面都是直接使用或者基于这些技术再开发出一套定制化的解决方案。但本文主题不是讨论这些技术,而是针对一种简单的场景,也就是无需使用复杂的消息队列产品,只是想建立一个便于使用的内存消息队列就可以满足需求的场景。这种场景一般有以下特点:

1、消息量不大

2、消息安全性不高

3、不需要HA

4、不需要完备的failover机制

 

下面我们来看下基于阻塞队列实现内存消息队列的基本原理图:

image

图3:基本原理图

统一消息发送端

封装了接收消息、消息合法性校验、消息转化、格式化序列化以及put入队列的逻辑,被业务代码所使用。

阻塞队列

基于BlockQueue实现,在外层做了一定封装。

消息接收消费端

负责读取和消费队列中的消息,在无消息时会进行阻塞等待,遇到异常时会交给异常处理机制进行处理。

异常处理机制

负责在消费消息时发生异常时的后续处理,比如把消息经过处理后重丢回队列或者存储入异常队列专门有一套异常处理流程进行处理等等,具体本文不做详细讨论。

 

一般在这种设计下的消费任务总是由一个专门的线程去监听队列并阻塞等待,而这个线程一般都是随应用启动而启动,所以原理图里的应用初始化启动就是这个意思,说明了消费端的线程是随着应用初始化而创建,并且是常驻的。

 

原理和设计说完了,下面以接收发送报警邮件的业务场景为例,贴一下关键代码:

以下是对BlockQueue进行封装后的队列,接收特定的实体。

/**
 * 报警阻塞队列
 * 
 * @author chongan.wangca
 */
public class AlarmMessageQueue {

    private Logger                        logger            = LoggerFactory.getLogger(AlarmMessageQueue.class);

    //队列大小
    public static final int               QUEUE_MAX_SIZE    = 100;

    private static AlarmMessageQueue      alarmMessageQueue = new AlarmMessageQueue();

    //阻塞队列
    private BlockingQueue<AlarmMessageVO> blockingQueue     = new LinkedBlockingQueue<AlarmMessageVO>(QUEUE_MAX_SIZE);

    public static AlarmMessageQueue getInstance() {
        return alarmMessageQueue;
    }

    /**
     * 消息入队
     * @param alarmMessageVO
     * @return
     */
    public boolean push(AlarmMessageVO alarmMessageVO) {
        return this.blockingQueue.offer(alarmMessageVO);
    }

    /**
     * 消息出队
     * @return
     */
    public AlarmMessageVO poll() {
        AlarmMessageVO result = null;
        try {
            result = this.blockingQueue.take();
        } catch (InterruptedException e) {
            logger.error("", e);
        }
        return result;
    }

    /**
     * 获取队列大小
     * @return
     */
    public int size() {
        return this.blockingQueue.size();
    }
}

 

消息消费端代码节选,示意了阻塞等待及消费消息的实现方式:

    /*
     * 无限循环阻塞等待及消费消息
     * @see java.lang.Runnable#run()
     */
    public void run() {
        while (true) {
            try {
                AlarmMessageVO alarmMessageVO = AlarmMessageQueue.getInstance().poll();
                process(alarmMessageVO);
            } catch (Exception e) {
                logger.error("Poll AlarmMessageVO from AlarmMessageQueue error or send alarm mail error.", e);
            }
        }
    }

注意笔者这里的业务场景不需要对消费异常的消息进行重试,但大家需要根据自己的业务场景去决定是否需要在catch里面进行异常处理流程。

下面再来看下消息发送端是如何使用的,其实很简单:

    public void moduleAlarm(ModuleResourceDO moduleResourceDO, List<ResourceHolder> resourceHolderList) {
        AlarmMessageVO alarmMessageVO = new AlarmMessageVO(moduleResourceDO, resourceHolderList);
        // 把报警的所需数据放进阻塞队列,交给新的负责发邮件的处理类进行异步处理,本方法尽可能以最快速度响应返回
        AlarmMessageQueue.getInstance().push(alarmMessageVO);
    }

最后在应用初始化时候把消费端启动起来,这点可以使用Spring的init-method配置下实现,但是这里有个小问题,因为消费端是阻塞的,所以直接配置的话会导致应用启动不起来,这里需要小技巧,即init-method配置的不是消费端,而是启动消费端的Service,在这个Service里面可以采用new Thrad的方式去把消费端启动起来。如:

/**
 * 用于启动异步发送报警信息线程,由于该执行是block的,无法直接使用spring init-method
 * 
 * @author chongan.wangca
 */
public class AsynAlarmServiceStarter {

    private AsyncAlarmService asyncAlarmService;

    public void init() {
        Thread asyncAlarmServiceThread = new Thread(asyncAlarmService);
        asyncAlarmServiceThread.start();
    }

    public void setAsyncAlarmService(AsyncAlarmService asyncAlarmService) {
        this.asyncAlarmService = asyncAlarmService;
    }

}

好,到这里本文要讲述的内容基本讲完了。大家如果遇到类似的场景不妨采用类似方案,这样会让你得到更多好处。但是话说回来,如果你的场景需要处理消息量很大,并且需要多个MQ,需要进行HA、failover等,那么建议你采用现有的例如RabbitMQ或ActiveMQ,像在阿里有基于ActiveMQ开发的Napoli,这些都是消息队列很好的技术产品。

 

(全文完)

 -------------------------------------

专注Java开发及其相关领域技术。致力于多线程、大并发、高性能、海量数据研究和学习。欢迎加入一起学习讨论。加入Q-群:253042038

 个人博客地址:http://wangchongan.com

分享到:
评论

相关推荐

    高效的实现队列

    在IT行业中,队列是一种非常...总之,高效地实现队列需要考虑到性能、内存利用率、线程安全等因素,具体实现方式取决于应用场景和需求。通过理解和掌握不同的队列实现,开发者能够更好地优化系统性能,解决实际问题。

    Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析

    1. `ArrayBlockingQueue`:基于数组的有界队列,创建时需要指定容量,它保证了线程安全,采用互斥锁控制。 2. `LinkedBlockingQueue`:基于链表的无界队列(可选有界),如果不指定容量,默认大小为`Integer.MAX_...

    java.util.concurrent 实现线程池队列

    2. **LinkedBlockingQueue**:基于链表结构的无界阻塞队列,吞吐量通常高于ArrayBlockingQueue,但在高并发下,内存消耗会较大。 3. **SynchronousQueue**:一个不存储元素的阻塞队列,每个插入操作必须等到另一个...

    线程安全队列Queue

    - **高性能**:由于采用了分段锁机制,`LinkedBlockingQueue`允许生产者和消费者线程同时访问队列,从而提高了并发性能。 - **容量控制**:可以通过构造函数指定队列的最大容量,避免无限制增长导致内存溢出的问题。...

    QueueMonitor:使用 Java 的队列监视器

    综上所述,`QueueMonitor`是Java并发编程中的一个重要工具,它利用`BlockingQueue`接口及其实现,结合监控策略、并发控制和日志系统,实现对队列操作的实时监控。理解和掌握这一技术,对于提升Java应用的性能和健壮...

    操作系统课程设计-Spooling技术Java实现

    其次,定义一个打印队列,使用Java的`BlockingQueue`实现并发安全的队列操作;然后,创建一个预处理线程,对队列中的任务进行处理;再创建一个打印线程,模拟打印机的工作,从队列中取出任务并打印;最后,设置一个...

    Java并发编程之阻塞队列详解

    相较于ArrayBlockingQueue,它的内存开销稍大,但在队列操作上的性能可能更优。 3. **PriorityBlockingQueue**:此队列按元素的优先级排序,遵循先进先出(FIFO)原则。它是一个无界的阻塞队列,即没有容量限制。 ...

    BlockingFQueue:基于磁盘持久存储的阻塞队列(Fast and Persistent Blocking Queue)

    Java中实现阻塞队列通常基于`java.util.concurrent`包下的`BlockingQueue`接口,`BlockingFQueue`可能也是基于这个接口实现的。接口中定义了如`put`、`take`、`offer`等操作,这些操作都具有阻塞特性。具体实现可能...

    高性能java系统实现与调优

    - **异步通信**:采用消息队列等机制,降低系统间的耦合度。 - **垂直拆分**:根据业务需求将应用和服务划分为多个独立的部分。 #### 设计模式与实践 - **单例模式**:对于频繁使用的大对象,采用单例模式减少对象...

    腾讯面经.pdf

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景...

    java_thread_cn.rar_Java 线程池_java thread books_java线程_线程池_线程池调度

    在Java中,线程池的实现主要依赖于`java.util.concurrent`包中的`ExecutorService`接口和它的实现类,如`ThreadPoolExecutor`。`ExecutorService`定义了线程池的基本操作,如执行任务、关闭线程池等。而`...

    并发容器和线程池,java并发编程3

    - **无界队列**:理论上可以无限存储元素,但在实际应用中会受到内存限制。 - **高性能**:适用于大量线程进行读取操作的场景。 - **非阻塞性**:使用CAS(Compare and Swap)而非传统的锁机制来保证线程安全。 **...

    Java面试题集合部分.docx

    - `Queue`:主要用于实现队列操作,例如`BlockingQueue`(阻塞队列)。 2. **快速失败机制**:集合内部维护了一个`modCount`变量,用于记录集合结构改变的次数。在迭代器遍历时,如果发现实际`modCount`与预期值...

    java多线程设计模式详解PDF及源码

    使用BlockingQueue(阻塞队列)可以实现高效的同步机制。 2. 单例模式:在多线程环境中,确保一个类只有一个实例且提供全局访问点,常用于配置管理、日志记录等。双检锁/双重校验锁(DCL,即double-checked locking)...

    JCToolsJDK中缺失的并发工具

    - `MpmcArrayQueue`:支持多个生产者和消费者,采用无锁和CAS操作实现,性能优秀。 - `MpmcLinkedArrayQueue`:与`MpmcArrayQueue`类似,但使用链表结构,提供更大的灵活性。 3. **Ring Buffer**: - `Ring...

    【并发编程】自定义简单线程池.pdf

    - 设定队列的最大容量,防止内存溢出。 - 使用`ReentrantLock`实现加锁,确保线程安全。 - 提供多种入队和出队方法: - `void put(T task)`:无超时阻塞添加。 - `boolean put(T task, long timeout, TimeUnit ...

    java多线程_设计模式_各种技术(我的书架)

    Java中可以通过BlockingQueue(阻塞队列)来实现这种模式,如ArrayBlockingQueue或LinkedBlockingQueue。 3. 观察者模式:也称为发布订阅模式,用于实现对象之间的解耦。在多线程环境下,需要确保事件的发布和订阅...

    mpmc

    3. **阻塞与非阻塞**:选择合适的阻塞策略,如使用`BlockingQueue`的`put()`和`take()`方法,或者采用无阻塞的自旋等待。 4. **容量设计**:队列的大小需要根据系统需求和性能目标进行合理设置,过大会浪费内存,过...

    Concurrent In java

    其内部实现采用了分段锁的机制,即将整个映射表分割成多个小段,每个段都有自己的锁,当某个段被锁定时,其他段仍然可以进行并发操作。这种设计极大地减少了锁的竞争,提高了并发性能。 `ConcurrentHashMap`还提供...

Global site tag (gtag.js) - Google Analytics