项目运用rabbitMq 实现的分布式架构,每台机器既是生成者也是消费者,改项目依赖于外部调度服务,mq的consumer listener引用其他组的jar包(实现对外部资源的调用),消息消费是单线程的,在此调用jar包操作业务或者是个人代码写的有问题,可能导致线程死锁,或者其他代码问题(http请求未设置超时),导致消息消费被卡住,最终消息堆积。导致正常业务垮掉。既然consumer没配置消息的超时时间,java支持多线程,那么可以用来完美解决,worker线程执行任务,protect线程监听worker线程判断执行时间,(其实这两个线程都是在同时互相监听,各自执行完毕stop未执行完成的线程)不啰嗦直接上代码
BaseThreadUtil类 worker 线程和protect线程集成该抽象类
/** * Created by liweigao on 2017/4/25. */ public abstract class BaseThreadUtil extends Thread { public abstract void execute();//抽象方法需要子类实现 private String threadName = ""; //在父类重写run方法,在子类只要重写execute方法就可以了 @Override public void run() { super.run(); execute(); } //在需要回调数据的地方(两个子类需要),声明一个接口 public static interface Callback { public void complete(); } //2.创建接口对象 public Callback callback; public String getThreadName() { return threadName; } public void setThreadName(String threadName) { this.threadName = threadName; } }
worker 线程类 执行主要业务线程。
/** * Created by liweigao on 2017/4/25. */ public class WorkerThread extends BaseThreadUtil { private Logger logger = LoggerFactory.getLogger(WorkerThread.class); private Runnable runnable; public WorkerThread(Runnable runnable, String threadName) { this.runnable = runnable; if (threadName != null && !"".equals(threadName)) { super.setThreadName(threadName); } else { super.setThreadName("worker thread"); } } @Override public void execute() { StopWatch stopWatch=new StopWatch(); stopWatch.start(); if (runnable != null) { runnable.run(); } stopWatch.stop(); // System.out.println("线程:" + super.getThreadName() + "执行完毕,开始执行回调……"); logger.debug("线程:" + super.getThreadName() + "执行完毕,开始执行回调……耗时:"+stopWatch.getTotalTimeMillis() +"ms"); //任务执行完毕 执行回调 callback.complete(); } }
protect 线程类。
/** * Created by liweigao on 2017/4/25. */ public class ProtectThread extends BaseThreadUtil { private Logger logger = LoggerFactory.getLogger(ProtectThread.class); private Integer timeout = 6000; public ProtectThread(Integer timeout, String threadName) { this.timeout = timeout; if (threadName != null && !"".equals(threadName)) { super.setThreadName(threadName); } else { super.setThreadName("protect thread"); } } @Override public void execute() { StopWatch stopWatch = new StopWatch(); stopWatch.start(); try { Thread.sleep(timeout); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } stopWatch.stop(); // System.out.println("线程:" + super.getThreadName() + "执行完毕,开始执行回调……"); logger.debug( "线程:" + super.getThreadName() + "执行完毕,开始执行回调……耗时:" + stopWatch.getTotalTimeMillis() + "ms"); //线程任务执行完毕 执行回调 callback.complete(); } }
baseStopUtil 类 也可以叫回调类,处理未完成线程。
/** * Created by liweigao on 2017/4/25. */ public class BaseStopUtil implements Callback { private Logger logger = LoggerFactory.getLogger(BaseStopUtil.class); BaseThreadUtil baseCallBackUtil; // 获取对象 public BaseStopUtil(BaseThreadUtil baseCallBackUtil) { this.baseCallBackUtil = baseCallBackUtil; } @Override public void complete() { // System.out.println("线程:" + baseCallBackUtil.getThreadName() + "被停掉……"); logger.debug("线程:" + baseCallBackUtil.getThreadName() + "被停掉……"); if (baseCallBackUtil.isAlive()) { baseCallBackUtil.stop(); } } }
ListenThreadConsumer 封装对外调用方法,启动两个线程,配置超时时间。
/** * Created by liweigao on 2017/4/25. */ public class ListenThreadConsumer { /** * 过期时间 */ private Integer timeout; /** * 任务 */ private Runnable runnable; /** * 检测间隔时间 默认1000 */ // private Integer spacetime; public ListenThreadConsumer(Integer timeout, Runnable runnable) { this.timeout = timeout; this.runnable = runnable; } public void execute() { ProtectThread protectThread = new ProtectThread(timeout, ""); WorkerThread workerThread = new WorkerThread(runnable, ""); protectThread.callback = new BaseStopUtil(workerThread); workerThread.callback = new BaseStopUtil(protectThread); protectThread.start(); workerThread.start(); } }
那么就可以很简单的运用到自己的程序中了。main方法测试。
public static void main(String[] args) { ListenThreadConsumer listenThreadConsumer = new ListenThreadConsumer(100, new Runnable() { @Override public void run() { System.out.println("这是我的测试……………………"); } }); //执行任务以及监控 listenThreadConsumer.execute(); }
打印日志: 这是我的测试…………………… 2017-04-28 18:56:50.916 [Thread-1] DEBUG c.w.s.c.util.thread.WorkerThread-[34] - 线程:worker thread执行完毕,开始执行回调……耗时:1ms 2017-04-28 18:56:50.922 [Thread-1] DEBUG c.w.s.c.util.thread.BaseStopUtil-[23] - 线程:protect thread被停掉…… 以上完毕,可直接copy复用。以上信息欢迎大神吐槽,欢迎提建议。互相监控,其中哪一个线程有问题,都会被停掉。
相关推荐
本文将深入探讨Java中如何实现单个线程的执行超时监控。 首先,我们可以使用`java.util.concurrent`包中的`Future`和`ExecutorService`来实现线程超时。`ExecutorService`是一个接口,它提供了管理和控制线程池的...
本项目以"java多线程实现大批量数据导入源码"为题,旨在通过多线程策略将大量数据切分,并进行并行处理,以提高数据处理速度。 首先,我们需要理解Java中的线程机制。Java通过`Thread`类来创建和管理线程。每个线程...
java一些简单的多线程用法,适合初学者
在Java编程中,控制程序执行超时是一项重要的任务,特别是在多线程环境下,我们可能需要确保某个任务不会无限制地运行下去,导致资源耗尽。本文将深入探讨如何使用Java的线程机制来实现程序执行的超时控制,同时也会...
本主题将深入探讨如何使用Java的并发包(java.util.concurrent)来实现多线程对数据库数据的批量处理,包括增、删、改等操作。 首先,我们需要了解Java中的线程基础。线程是程序执行的最小单位,一个进程可以包含多...
Java多线程读大文件 java多线程写文件:多线程往队列中写入数据
Java多线程实现数据切割批量执行,实现限流操作。 java线程池Executors实现数据批量操作。 批量异步Executors处理数据,实现限流操作,QPS限流。 线程池调用第三方接口限流实现逻辑。 案例适合: 1.批量处理大数据。...
多线程允许我们同时处理多个任务,这对于大文件传输或需要并行处理的场景尤其有用。本篇文章将深入探讨如何使用Java实现多线程文件传输,并涵盖以下几个关键知识点: 1. **线程基础**:在Java中,线程是程序执行的...
在当今高度并发的应用环境中,Java多线程技术被广泛应用于处理数据库操作,以提升系统的响应速度和处理能力。本文将基于一个具体的Java多线程操作数据库的应用程序,深入探讨其背后的原理、实现细节以及潜在的挑战。...
Java多线程导出Excel是处理大数据量时的一种高效策略,尤其在面对千万级别的数据时。传统的Apache POI库在处理大规模数据时可能会遇到栈溢出(StackOverflowError)和内存溢出(OutOfMemoryError)等问题,因为这些...
本实例将探讨如何利用Java实现一个具有进度条显示功能的多线程应用。进度条通常用于可视化地表示某个任务的完成程度,这对于长时间运行的操作如文件下载、上传或大型计算来说非常有用。 首先,我们要理解Java中的...
在Java中,实现多线程有两种主要方式:通过实现Runnable接口或者继承Thread类。本案例将深入探讨Java多线程中的关键知识点,包括线程同步、线程通信和线程阻塞。 线程同步是为了防止多个线程同时访问共享资源,导致...
在Java编程中,多线程技术是实现高效并发处理的关键,尤其在大数据量处理和分布式系统中。本主题聚焦于如何利用多线程实现在不同数据库间的数据互导,以及结合连接池技术来优化数据库操作,并实现多表插入功能。我们...
在Java编程中,多线程查询数据库是一种常见的优化策略,特别是在处理大数据量或者需要并行执行多个查询时。本文将详细探讨如何利用Java的多线程技术和线程池来实现并发查询数据库,以及相关的文件`BatchDataUtil....
在Java编程中,控制程序执行超时是一项关键任务,特别是在多线程环境下,我们需要确保某个任务不会无限期地运行,导致资源浪费或者阻塞其他重要任务。本篇将深入探讨如何利用Java的线程和定时器(Timer)来实现这个...
在Java编程中,多线程并发是提升程序执行效率、充分利用多核处理器资源的重要手段。本文将基于"java 多线程并发实例"这个主题,深入探讨Java中的多线程并发概念及其应用。 首先,我们要了解Java中的线程。线程是...
本文将详细介绍一个基于Java实现的多线程文件上传系统,并结合队列管理技术来优化后台处理流程。该系统通过创建多个线程来并行处理客户端的文件上传请求,同时利用队列结构来协调任务的调度与执行。 #### 关键技术...
#### 二、Java多线程分页查询原理及实现 ##### 1. 分页查询基础概念 分页查询是指在查询数据时,将数据分成多个页面展示,而不是一次性返回所有数据。这种方式能够有效地减少单次查询的数据量,从而提高查询速度和...
本篇文章将详细介绍三种不同的方法,帮助Java客户端从MQ队列接收消息。 1. **IBM WebSphere MQ JMS API** IBM的WebSphere MQ提供了一套Java Message Service (JMS) API,允许Java应用程序与MQ队列进行通信。首先,...
在Java中,多线程的实现主要通过两种方式:继承Thread类和实现Runnable接口。理解并掌握多线程的使用对于任何Java开发者来说都至关重要。 一、线程的创建与启动 1. 继承Thread类:创建一个新的类,该类继承自Thread...