1.问题背景
对于消息队列的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。程序启动后,通过消息队列客户端接收消息,放入一个线程池进行异步处理,并发的快速处理。
那么问题来了,当我们修改程序后,需要重新启动任务的时候,如何保证消息的不丢失呢?
正常来说,订阅者程序关闭后,消息会在发送者队列中堆积,等待订阅者下次订阅消费,所以未接收的消息是不会丢失的。唯一可能丢失的消息,就是在关闭的一瞬间,已经从队列中取出但还没有处理完毕的消息。
因此我们需要一套平滑关闭的机制,保证在重启的时候,消息可以正常处理完成。
2.问题分析
平滑关闭的思路如下:
- 在关闭程序时,首先关闭消息订阅,这个时候消息都在发送者队列中
- 关闭本地消息处理线程池(等待本地线程池中的消息处理完毕)
- 程序退出
关闭消息订阅:一般消息队列的客户端都提供关闭连接的方法,具体可以自行查看api
关闭线程池:Java的ThreadPoolExecutor
线程池提供shutdown()
和shutdownNow()
两个方法,区别是前者会等待线程池中的消息都处理完毕,后者直接停止线程的执行并返回list集合。因为我们需要使用shutdown()
方法进行关闭,并通过isTerminated()
,方法判断线程池是否已经关闭.
那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?
在Linux中,我们可以用kill -9 pid
关闭进程,除了-9之外,我们可以通过 kill -l
查看kill 命令的其它信号量,比如使用 12) SIGUSR2 信号量
我们可以在Java程序启动时,注册对应的信号量,对信号量进行监听,在收到对应的kill操作时,执行相关的业务操作。
伪代码如下
//注册linux kill信号量 kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
@Override
publicvoidhandle(Signal signal){
//关闭订阅者
//关闭线程池
//退出
}
});
下面通过一个demo模拟相关逻辑操作
首先模拟一个生产者,每秒生产5个消息
然后模拟一个订阅者,收到消息后交给线程池进行处理,线程池固定4个线程,每个消息处理时间1秒,这样线程池每秒会积压1个消息。
package com.lujianing.demo;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import java.util.concurrent.*;
/**
* @author lujianing01@58.com
* @Description:
* @date 2016/11/14
*/
public classMsgClient{
//模拟消息队列订阅者 同时4个线程处理
private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
//模拟消息队列生产者
private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
//用于判断是否关闭订阅
private static volatile boolean isClose = false;
publicstaticvoidmain(String[] args)throws InterruptedException {
BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
producer(queue);
consumer(queue);
}
//模拟消息队列生产者
privatestaticvoidproducer(final BlockingQueue queue){
//每200毫秒向队列中放入一个消息
SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
publicvoidrun(){
queue.offer("");
}
}, 0L, 200L, TimeUnit.MILLISECONDS);
}
//模拟消息队列消费者 生产者每秒生产5个 消费者4个线程消费1个1秒 每秒积压1个
privatestaticvoidconsumer(final BlockingQueue queue)throws InterruptedException {
while (!isClose){
getPoolBacklogSize();
//从队列中拿到消息
final String msg = (String)queue.take();
//放入线程池处理
if(!THREAD_POOL.isShutdown()) {
THREAD_POOL.execute(new Runnable() {
publicvoidrun(){
try {
//System.out.println(msg);
TimeUnit.MILLISECONDS.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
//查看线程池堆积消息个数
privatestaticlonggetPoolBacklogSize(){
long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));
return backlog;
}
static {
String osName = System.getProperty("os.name").toLowerCase();
if(osName != null && osName.indexOf("window") == -1) {
//注册linux kill信号量 kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
@Override
publicvoidhandle(Signal signal){
System.out.println("收到kill消息,执行关闭操作");
//关闭订阅消费
isClose = true;
//关闭线程池,等待线程池积压消息处理
THREAD_POOL.shutdown();
//判断线程池是否关闭
while (!THREAD_POOL.isTerminated()) {
try {
//每200毫秒 判断线程池积压数量
getPoolBacklogSize();
TimeUnit.MILLISECONDS.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("订阅者关闭,线程池处理完毕");
System.exit(0);
}
});
}
}
}
当我们在服务上运行时,通过控制台可以看到相关的输出信息,demo中输出了线程池的积压消息个数
java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient
另打开一个终端,通过ps命令查看进程号,或者通过nohup启动Java进程拿到进程id
ps -fe|grep MsgClient
当我们执行kill -12 pid的时候 可以看到关闭业务逻辑
3.问题总结
在部门的实际业务中,消息队列的消息量还是挺大的,某些业务高峰时每秒有几百的消息量,因此对消息的处理要保证速度,避免消息积压,也可以通过负载解决单个订阅节点的压力。
在某些业务场景中,对消息的完整性要求不那么高,那么就不用考虑重启时的一点损耗。反之,就需要好好思考和设计了。
ps:以后会恢复写博客的习惯,争取每周都更新,如有兴趣,欢迎关注
2016-11-17补充
ThreadPoolExecutor
的getQueue().size()
方法,返回的为线程池队列中积压的消息数
getTaskCount()
- getCompletedTaskCount()
,返回的为线程池队列积压的和正在处理中的消息数
2016-11-19补充
感谢 @纳兰清风 的提醒
Java中可以通过调用Runtime.getRuntime().addShutdownHook()
方法,在Jvm退出时触发回调
kill -15 pid
可以触发调用对应的钩子方法
今天看重启脚本中确实也有 kill -15
的命令 但是不知道两者是有关系的
2016-11-20补充
kill -1 pid
和 kill -2 pid
都能触发钩子方法
SIGTERM,SIGINT,SIGHUP三种信号都会触发shutdownhook
2016-11-27补充
在公司的RPC服务框架中,也是走了相同的平滑关闭策略
通过kill -12 设置rpc框架的关闭状态,通过kill -15 关闭相关连接
相关推荐
Java消息队列的简单实现代码 Java 消息队列是一种异步处理机制,主要目的是减少请求响应时间和解耦。消息队列的应用场景非常广泛,例如在用户注册时,服务端收到用户的注册请求后,可以将其他操作放入消息队列中,...
6. **启动和停止消息队列**:在需要时启动和关闭消息队列服务。在Windows中,这可以通过服务管理器完成。 7. **测试和调试**:编写测试用例,确保消息能够正确发送和接收。使用日志记录和错误处理机制进行调试。 ...
- **异步处理**:消息队列允许系统将非实时任务放入队列,从而避免阻塞主线程,提高响应速度。 - **服务解耦**:消息队列作为中介,使得服务之间不需要直接交互,降低耦合度。 - **流量控制**:在面临流量洪峰时...
Java Queue,scheduler,ThreadPoolManager,两套例子,直接可以运行的。
标题中的“Java实现的线程池、消息队列功能”是指在Java编程中,如何利用编程技术实现线程池和消息队列这两种重要的并发处理机制。线程池和消息队列是解决多线程环境下资源管理和任务调度的有效手段,它们在高并发、...
对于消息队列的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。程序启动后,通过消息队列客户端接收...这篇文章主要给大家介绍了关于Java中消息队列任务的平滑关闭的相关资料,需要的朋友可以参考下。
Java 消息队列(Message Queue,简称MQ)是一种在分布式系统中用于解耦和异步处理的中间件,它允许应用程序之间通过发送和接收消息进行通信,而无需两者之间直接交互。在Java中,我们可以使用多种MQ实现,如Apache ...
此文档是C#开发的消息队列系统,适用于消息队列入门与新手。 在Windows 7 上安装消息队列的步骤 打开“控制面板”。 单击“程序”,然后在“程序和功能”下, 单击“打开或关闭 Windows 功能”。 -或者-单击“经典...
这里,我们探讨的主题是“Go-管理任务定时将任务放入消息队列直到任务关闭”,这个系统设计主要涉及到以下几个核心概念和技术: 1. **任务管理**:任务管理是整个系统的基石,包括创建、更新、查询以及删除任务。...
消息队列延迟定时任务是软件开发中一种常见且重要的技术,它主要用于处理那些需要在特定时间点执行的任务,比如订单超时处理、定时发送邮件等。在这个场景中,Redis作为一个功能丰富的键值存储系统,被广泛用作消息...
消息队列在软件开发中扮演着重要角色,它允许应用程序异步处理耗时任务,提高系统响应速度和整体性能。think-queue是专门为ThinkPHP5.1设计的一个高效、稳定的队列处理库。 描述中的“tp5.1安装使用think-queue”...
在Java中,使用Starling可以创建一个消息队列,生产者将任务封装成消息并放入队列,消费者则在后台线程中取出并执行这些任务,实现了任务的异步处理。 接下来,我们来看两个示例文件:SetQueueTest.java和...
消息队列在IT行业中扮演着至关重要的角色,尤其是在构建高并发、高可用的分布式系统时。消息队列(Message Queue)是一种中间件,用于在不同的应用程序之间传递消息,以解耦生产者(发送数据的应用)和消费者(处理...
在Java开发中,消息队列的使用尤为广泛,因为它能够帮助处理高并发场景下的数据交换问题,同时也能够实现异步处理,提高系统的响应速度。 首先,我们需要理解消息队列的基本概念。消息队列是一个存储消息的缓冲区,...
### Java中间件中的消息队列应用 #### 一、消息队列的主要应用场景 **1.1 解耦** 在传统的系统架构中,系统A通常需要直接调用系统B和系统C的功能,这种直接调用的方式使得各个系统之间的耦合度非常高。例如,如果...
在标题和描述中提到的“消息队列处理”和“队列任务”,涉及的核心概念主要包括消息传递、任务调度以及异步处理。 1. **消息传递**:消息队列基于发布/订阅或生产者/消费者模型,允许不同服务之间通过消息进行通信...
该项目是一款基于Redis队列技术的Java消息队列设计源码,集成了21个文件,其中包含17个Java源文件、1个LICENSE文件、1个Markdown文件、1个XML配置文件。该系统旨在通过Redis队列实现高效的消息队列管理,适用于需要...
Java队列模拟实现是一个典型的计算机科学中的数据结构应用,它主要涉及了Java编程语言和队列数据结构。在这个工程中,开发者已经创建了一个基于图形用户界面(GUI)的应用程序,用于演示和操作队列的各种功能。以下...
java消息队列