`
SoberChina
  • 浏览: 77295 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java多线程实现mq消息处理超时监控

阅读更多

 

项目运用rabbitMq 实现的分布式架构,每台机器既是生成者也是消费者,改项目依赖于外部调度服务,mq的consumer listener引用其他组的jar包(实现对外部资源的调用),消息消费是单线程的,在此调用jar包操作业务或者是个人代码写的有问题,可能导致线程死锁,或者其他代码问题(http请求未设置超时),导致消息消费被卡住,最终消息堆积。导致正常业务垮掉。既然consumer没配置消息的超时时间,java支持多线程,那么可以用来完美解决,worker线程执行任务,protect线程监听worker线程判断执行时间,(其实这两个线程都是在同时互相监听,各自执行完毕stop未执行完成的线程)不啰嗦直接上代码

 

 BaseThreadUtil类 worker 线程和protect线程集成该抽象类

/**
 * Created by liweigao on 2017/4/25.
 */
public abstract class BaseThreadUtil extends Thread {


  public abstract void execute();//抽象方法需要子类实现

  private String threadName = "";

  //在父类重写run方法,在子类只要重写execute方法就可以了
  @Override
  public void run() {
    super.run();
    execute();
  }

  //在需要回调数据的地方(两个子类需要),声明一个接口
  public static interface Callback {

    public void complete();
  }

  //2.创建接口对象
  public Callback callback;

  public String getThreadName() {
    return threadName;
  }

  public void setThreadName(String threadName) {
    this.threadName = threadName;
  }
}

 worker 线程类 执行主要业务线程。

/**
 * Created by liweigao on 2017/4/25.
 */
public class WorkerThread extends BaseThreadUtil {

  private Logger logger = LoggerFactory.getLogger(WorkerThread.class);

  private Runnable runnable;

  public WorkerThread(Runnable runnable, String threadName) {
    this.runnable = runnable;
    if (threadName != null && !"".equals(threadName)) {
      super.setThreadName(threadName);
    } else {
      super.setThreadName("worker thread");
    }
  }

  @Override
  public void execute() {
    StopWatch stopWatch=new StopWatch();
    stopWatch.start();
    if (runnable != null) {
      runnable.run();
    }
    stopWatch.stop();
//    System.out.println("线程:" + super.getThreadName() + "执行完毕,开始执行回调……");
    logger.debug("线程:" + super.getThreadName() + "执行完毕,开始执行回调……耗时:"+stopWatch.getTotalTimeMillis() +"ms");
    //任务执行完毕  执行回调
    callback.complete();
  }
}

 protect 线程类。

 

/**
 * Created by liweigao on 2017/4/25.
 */
public class ProtectThread extends BaseThreadUtil {

  private Logger logger = LoggerFactory.getLogger(ProtectThread.class);

  private Integer timeout = 6000;

  public ProtectThread(Integer timeout, String threadName) {
    this.timeout = timeout;
    if (threadName != null && !"".equals(threadName)) {
      super.setThreadName(threadName);
    } else {
      super.setThreadName("protect thread");
    }
  }

  @Override
  public void execute() {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    try {
      Thread.sleep(timeout);
    } catch (InterruptedException e) {
      logger.error(e.getMessage(), e);

    }
    stopWatch.stop();
//    System.out.println("线程:" + super.getThreadName() + "执行完毕,开始执行回调……");
    logger.debug(
        "线程:" + super.getThreadName() + "执行完毕,开始执行回调……耗时:" + stopWatch.getTotalTimeMillis() + "ms");
    //线程任务执行完毕 执行回调
    callback.complete();
  }
}

 baseStopUtil 类 也可以叫回调类,处理未完成线程。

/**
 * Created by liweigao on 2017/4/25.
 */
public class BaseStopUtil implements Callback {

  private Logger logger = LoggerFactory.getLogger(BaseStopUtil.class);
  BaseThreadUtil baseCallBackUtil;

  // 获取对象
  public BaseStopUtil(BaseThreadUtil baseCallBackUtil) {
    this.baseCallBackUtil = baseCallBackUtil;
  }

  @Override
  public void complete() {
//    System.out.println("线程:" + baseCallBackUtil.getThreadName() + "被停掉……");
    logger.debug("线程:" + baseCallBackUtil.getThreadName() + "被停掉……");
    if (baseCallBackUtil.isAlive()) {
      baseCallBackUtil.stop();
    }
  }
}

 

ListenThreadConsumer 封装对外调用方法,启动两个线程,配置超时时间。
/**
 * Created by liweigao on 2017/4/25.
 */
public class ListenThreadConsumer {

  /**
   * 过期时间
   */
  private Integer timeout;

  /**
   * 任务
   */
  private Runnable runnable;

  /**
   * 检测间隔时间 默认1000
   */
//  private Integer spacetime;
  public ListenThreadConsumer(Integer timeout, Runnable runnable) {
    this.timeout = timeout;
    this.runnable = runnable;
  }

  public void execute() {

    ProtectThread protectThread = new ProtectThread(timeout, "");
    WorkerThread workerThread = new WorkerThread(runnable, "");
    protectThread.callback = new BaseStopUtil(workerThread);
    workerThread.callback = new BaseStopUtil(protectThread);
    protectThread.start();
    workerThread.start();
  }
}
 那么就可以很简单的运用到自己的程序中了。main方法测试。
  public static void main(String[] args) {
    ListenThreadConsumer listenThreadConsumer = new ListenThreadConsumer(100, new Runnable() {
      @Override
      public void run() {
        System.out.println("这是我的测试……………………");
      }
    });
    //执行任务以及监控
    listenThreadConsumer.execute();
  }
打印日志:
这是我的测试…………………… 
2017-04-28 18:56:50.916 [Thread-1] DEBUG c.w.s.c.util.thread.WorkerThread-[34] - 线程:worker thread执行完毕,开始执行回调……耗时:1ms 
2017-04-28 18:56:50.922 [Thread-1] DEBUG c.w.s.c.util.thread.BaseStopUtil-[23] - 线程:protect thread被停掉…… 

以上完毕,可直接copy复用。以上信息欢迎大神吐槽,欢迎提建议。互相监控,其中哪一个线程有问题,都会被停掉。

 

分享到:
评论

相关推荐

    Java线程超时监控

    本文将深入探讨Java中如何实现单个线程的执行超时监控。 首先,我们可以使用`java.util.concurrent`包中的`Future`和`ExecutorService`来实现线程超时。`ExecutorService`是一个接口,它提供了管理和控制线程池的...

    java多线程实现大批量数据导入源码

    本项目以"java多线程实现大批量数据导入源码"为题,旨在通过多线程策略将大量数据切分,并进行并行处理,以提高数据处理速度。 首先,我们需要理解Java中的线程机制。Java通过`Thread`类来创建和管理线程。每个线程...

    java多线程实现动画功能

    java一些简单的多线程用法,适合初学者

    java通过线程控制程序执行超时(新)

    在Java编程中,控制程序执行超时是一项重要的任务,特别是在多线程环境下,我们可能需要确保某个任务不会无限制地运行下去,导致资源耗尽。本文将深入探讨如何使用Java的线程机制来实现程序执行的超时控制,同时也会...

    java多线程处理数据库数据

    本主题将深入探讨如何使用Java的并发包(java.util.concurrent)来实现多线程对数据库数据的批量处理,包括增、删、改等操作。 首先,我们需要了解Java中的线程基础。线程是程序执行的最小单位,一个进程可以包含多...

    java多线程读取文件

    Java多线程读大文件 java多线程写文件:多线程往队列中写入数据

    Java多线程Executors批量执行数据实现限流

    Java多线程实现数据切割批量执行,实现限流操作。 java线程池Executors实现数据批量操作。 批量异步Executors处理数据,实现限流操作,QPS限流。 线程池调用第三方接口限流实现逻辑。 案例适合: 1.批量处理大数据。...

    java实现多线程文件传输

    多线程允许我们同时处理多个任务,这对于大文件传输或需要并行处理的场景尤其有用。本篇文章将深入探讨如何使用Java实现多线程文件传输,并涵盖以下几个关键知识点: 1. **线程基础**:在Java中,线程是程序执行的...

    java 多线程操作数据库

    在当今高度并发的应用环境中,Java多线程技术被广泛应用于处理数据库操作,以提升系统的响应速度和处理能力。本文将基于一个具体的Java多线程操作数据库的应用程序,深入探讨其背后的原理、实现细节以及潜在的挑战。...

    java多线程导出excel(千万级别)优化

    Java多线程导出Excel是处理大数据量时的一种高效策略,尤其在面对千万级别的数据时。传统的Apache POI库在处理大规模数据时可能会遇到栈溢出(StackOverflowError)和内存溢出(OutOfMemoryError)等问题,因为这些...

    java多线程进度条实例

    本实例将探讨如何利用Java实现一个具有进度条显示功能的多线程应用。进度条通常用于可视化地表示某个任务的完成程度,这对于长时间运行的操作如文件下载、上传或大型计算来说非常有用。 首先,我们要理解Java中的...

    java多线程经典案例

    在Java中,实现多线程有两种主要方式:通过实现Runnable接口或者继承Thread类。本案例将深入探讨Java多线程中的关键知识点,包括线程同步、线程通信和线程阻塞。 线程同步是为了防止多个线程同时访问共享资源,导致...

    JAVA多线程实现数据库之间的数据互导、连接池、及多表插入数据库功能

    在Java编程中,多线程技术是实现高效并发处理的关键,尤其在大数据量处理和分布式系统中。本主题聚焦于如何利用多线程实现在不同数据库间的数据互导,以及结合连接池技术来优化数据库操作,并实现多表插入功能。我们...

    java多线程查询数据库

    在Java编程中,多线程查询数据库是一种常见的优化策略,特别是在处理大数据量或者需要并行执行多个查询时。本文将详细探讨如何利用Java的多线程技术和线程池来实现并发查询数据库,以及相关的文件`BatchDataUtil....

    java通过线程控制程序执行超时

    在Java编程中,控制程序执行超时是一项关键任务,特别是在多线程环境下,我们需要确保某个任务不会无限期地运行,导致资源浪费或者阻塞其他重要任务。本篇将深入探讨如何利用Java的线程和定时器(Timer)来实现这个...

    java 多线程并发实例

    在Java编程中,多线程并发是提升程序执行效率、充分利用多核处理器资源的重要手段。本文将基于"java 多线程并发实例"这个主题,深入探讨Java中的多线程并发概念及其应用。 首先,我们要了解Java中的线程。线程是...

    java多线程加队列上传文件_后台处理

    本文将详细介绍一个基于Java实现的多线程文件上传系统,并结合队列管理技术来优化后台处理流程。该系统通过创建多个线程来并行处理客户端的文件上传请求,同时利用队列结构来协调任务的调度与执行。 #### 关键技术...

    java多线程分页查询

    #### 二、Java多线程分页查询原理及实现 ##### 1. 分页查询基础概念 分页查询是指在查询数据时,将数据分成多个页面展示,而不是一次性返回所有数据。这种方式能够有效地减少单次查询的数据量,从而提高查询速度和...

    java客户端从MQ队列接收消息的三种方法

    本篇文章将详细介绍三种不同的方法,帮助Java客户端从MQ队列接收消息。 1. **IBM WebSphere MQ JMS API** IBM的WebSphere MQ提供了一套Java Message Service (JMS) API,允许Java应用程序与MQ队列进行通信。首先,...

    Java多线程练习题

    在Java中,多线程的实现主要通过两种方式:继承Thread类和实现Runnable接口。理解并掌握多线程的使用对于任何Java开发者来说都至关重要。 一、线程的创建与启动 1. 继承Thread类:创建一个新的类,该类继承自Thread...

Global site tag (gtag.js) - Google Analytics