`

多线程处理任务

 
阅读更多

业务需求是这样:接受大量性能数据,要求多线程处理性能数据,且在任一时刻同种性能数据只能有一条在处理。

这里有5个类:

ProcessScheduler:入口,用于接受性能数据,并将每条性能数据加到队列中处理

ActionExecutor:线程池包装类

ActionQueue:任务队列类,用于保存同种性能任务,保证线程安全及,队列中只有一条任务在一个时刻 处理

ProcessAction:任务类,每条性能任务包装成一个任务,且对数据处理的业务逻辑在此类中

ActionCommand:command类,实现Runnable接口,包装任务类,用于线程池处理

 

以下代码以最简洁方式呈现

 

public final class ActionQueue
{
//队列内部实现,其实用queue也行
    private final List<ProcessAction> runQueue = new ArrayList<ProcessAction>();
//锁对象,保证同一时刻只能存或取
    private final Object lockQueue = new Object();
//true代表这个队列中的同种任务有一个在处理当中
    public boolean IS_RUNNING = false;

    public ActionQueue(ProcessAction action)
    {
        addProcessAction(action);
    }
    public void addProcessAction(ProcessAction action)
    {
        synchronized (lockQueue)
        {
            runQueue.add(action);
        }
    }
    //  若这时没有这种任务在处理,则从actionQueue中获得一个可执行action,否则返回null
    public ProcessAction borrowOneCanExecuteAction()
    {
        synchronized (lockQueue)
        {
            if (!IS_RUNNING && runQueue.size()>0)
            {
                ProcessAction action = runQueue.get(0);
                runQueue.remove(0);
                IS_RUNNING = true;
                return action;
            }
            return null;
        }
    }

}

 

public final class ProcessAction
{
//任务要处理的数据
    private RawData data;
//同种数据的标识
    private String location;
    public ProcessAction(RawData data)
    {
        this.data = data;
        this.location = data.getLocation();
    }

    public void execute()
    {
        //这里是数据处理的业务逻辑
    }

}

 

public final class ActionCommand implements Runnable
{
//同种任务所在队列
    private final ActionQueue queue;
//包装的任务
    private final ProcessAction action;
    public ActionCommand(ProcessAction action, ActionQueue queue)
    {
        this.action = action;
        this.queue = queue;
    }
    public void run()
    {
//处理任务内部逻辑,完成后设置队列IS_RUNNING标识为false,接着调用方法处理此任务的下一个同类型任务
        action.execute();
        queue.IS_RUNNING = false;
        ProcessScheduler.instance().exeActionByLocation(action.getLocation());
    }

}

 

public final class ProcessScheduler
{
//单例
    private static ProcessScheduler inst = new ProcessScheduler();
//线程安全Map,key值是相同任务的标识,value值是存储相同任务的队列
    private final ConcurrentHashMap<String, ActionQueue> actionQueueMap = new ConcurrentHashMap<String, ActionQueue>();
    private final ActionExecutor actionExecutor = new ActionExecutor();
    private boolean bStart = false;
    public static ProcessScheduler instance()
    {
        return inst;
    }
    private ProcessScheduler()
    {
    }
    public void process(List<RawData> datas)
    {
        for (RawData data : datas)
        {
            String location = data.getLocation();
            ProcessAction action = new ProcessAction(data);
            addAction(location, action);
        }
    }

    private void addAction(String location, ProcessAction action)
    {
        if (location == null || action == null)
        {
            return;
        }
        if (!bStart)
        {
            start();
        }
//获取该任务所属队列,将任务加入队列,并处理队列中一条任务
        ActionQueue queueFind = actionQueueMap.putIfAbsent(location,new ActionQueue());
        if(queueFind == null)
        {
            queueFind = actionQueueMap.putIfAbsent(location, new ActionQueue());
        }
        queueFind.addProcessAction(action);
        processQueue(queueFind);
    }

    // 从任务队列仓库中根据location获取队列,并尝试处理队列中一条可处理任务
    public void exeActionByLocation(String location)
    {
        LOGGER.debug("Performs Process ProcessScheduler exeActionBylocation :{}",location);
        ActionQueue queueFind = actionQueueMap.get(location);
        if (queueFind == null)
        {
            return;
        }
        processQueue(queueFind);
    }

    // 尝试从任务队列queue取一条任务 并将其放入线程池处理
    private void processQueue(ActionQueue queue)
    {
        ProcessAction exeAction = queue.borrowOneCanExecuteAction();
        if (exeAction != null)
        {
            actionExecutor.executeAction(exeAction, queue);
        }
    }

   //设置状态bStart为true保证start方法只执行一次 调用actionExecutor的start方法
    private synchronized void start()
    {
        if (bStart)
        {
            return;
        }
        bStart = true;
        this.actionExecutor.start();
    }

}

 

public final class ActionExecutor
{
    private static final int THREAD_KEEPLIVE_MINUTE = 10;
    private int corePoolSize = 10;
    private int maxCurrentNum = 45;
    private final BlockingQueue<ActionCommand> actionList;
    private ThreadPoolExecutor executor = null;
//这个可以不要,因为只有ProcessScheduler的start方法调用这里的start方法,在外面已经保证只调用一次
    private boolean isStart = false;
//也可以不要,ProcessScheduler保证了只有在线程池start之后,才会往里丢任务
    private final Object lockStart = new Object();
    public ActionExecutor()
    {
        this.actionList = new LinkedBlockingQueue<ActionCommand>();
    }
//处理任务类,将其包装为一个command类,并放入线程池中
    public void executeAction(ProcessAction action, ActionQueue queue)
    {
        ActionCommand command = new ActionCommand(action, queue);
        
        synchronized (lockStart) {
                executor.execute(command);
        }
    }

//实例化线程池对象
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public void start()
    {
        synchronized (lockStart) {
            if (isStart) {
                return;
            }
            BlockingQueue blockingQueue = new LinkedBlockingQueue();
            RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy()
            {
                public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
                {
                    ActionExecutor.LOGGER.error("rejected Runnable Execution {}", r);
                }
            };
            this.executor = new ThreadPoolExecutor(corePoolSize, maxCurrentNum,
                    THREAD_KEEPLIVE_MINUTE, TimeUnit.MINUTES,
                    blockingQueue, handler);
            this.executor.setThreadFactory(new SchedulerThreadFactory());

            this.isStart = true;
        }
    }

    // 静态内部类用于包装新建线程,主要功能是设置线程名称、优先级以及将新建线程设为非守护线程
    static class SchedulerThreadFactory implements ThreadFactory
    {
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);

        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        public SchedulerThreadFactory()
        {
            this.namePrefix = "PerformanceProcessPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r)
        {
            Thread t = new Thread(r, this.namePrefix + this.threadNumber.getAndIncrement());
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            return t;
        }

    }
}

 这就是对这个业务需求的处理,我已经删减过了

 

 

分享到:
评论

相关推荐

    详解SpringBoot 多线程处理任务 无法@Autowired注入bean问题解决

    SpringBoot 多线程处理任务中无法@Autowired注入bean问题解决 在 SpringBoot 应用程序中,多线程处理任务是一个常见的场景,但是,在多线程处理任务中无法使用 @Autowired 注入 bean,这是一个常见的问题。今天,...

    基于任务线程处理例子

    在Java编程中,任务线程处理是一种常见的并发编程模型,它允许我们利用多核处理器的优势,提高程序的执行效率。本示例将深入探讨如何在Java中实现基于任务的线程处理,包括线程的基本概念、任务调度以及队列的应用。...

    详解Spring-Boot中如何使用多线程处理任务

    Spring Boot 中的多线程处理任务 在 Spring Boot 项目中,使用多线程处理任务是非常重要的,特别是在监控项目中,每个被监控的业务都需要单独运行在一个线程中,具有自己的配置参数。那么,如何在 Spring Boot 项目...

    Spring-Boot中如何使用多线程处理任务方法

    "Spring Boot 中多线程处理任务方法详解" 在 Spring Boot 项目中,使用多线程处理任务是一种常见的需求,尤其是在需要并发处理多个任务的情况下。那么,如何使用多线程处理任务方法呢?下面我们将详细介绍 Spring ...

    Java多线程处理任务的封装

    近公司项目很多地方使用多线程处理一些任务,逻辑代码和java多线程处理代码混合在一起,造成...  * 测试多线程处理任务  * className: TaskMulThreadServiceTest  *  * @version 1.0  * Date Time: a  *

    多线程处理工具(高效、万能的复用)

    这个工具的设计理念是“万能复用”,这意味着它可以适应各种多线程处理任务。无论是在数据处理、网络请求、文件读写还是其他计算密集型任务中,它都能够提供稳定的性能支持。它的复用性主要体现在以下几个方面: 1....

    Qt 之多线程处理多任务

    在Qt框架中,多线程处理多任务是提高应用程序性能和响应能力的重要手段。Qt提供了丰富的多线程支持,使得开发者能够充分利用现代计算机的多核处理器资源,避免UI线程因执行耗时操作而导致的界面假死问题。本文将深入...

    易语言源码多线程多任务下载软件.rar

    本文将深入探讨易语言编程环境、多线程技术和多任务处理在软件开发中的应用。 首先,易语言是中国本土开发的一种面向对象的编程语言,以其“易学易用”的特点受到初学者和专业开发者喜爱。它采用中文语法,降低了...

    多线程多任务下载软件.zip易语言项目例子源码下载

    6. **错误处理和异常恢复**:在多线程多任务下载中,错误处理和异常恢复是必不可少的。例如,某个线程在下载过程中出现问题,软件应能自动检测到错误并尝试重新启动该线程或整个任务。易语言提供了异常处理机制,...

    解决SpringBoot项目使用多线程处理任务时无法通过@Autowired注入bean问题

    "解决SpringBoot项目使用多线程处理任务时无法通过@Autowired注入bean问题" Spring Boot项目在使用多线程处理任务时,经常会遇到无法通过@Autowired注入bean的问题。本文将介绍解决该问题的方法,并详细解释原因和...

    java多线程处理数据库数据

    在Java编程中,多线程处理是提升程序性能和效率的重要手段,特别是在处理大量数据库数据时。本主题将深入探讨如何使用Java的并发包(java.util.concurrent)来实现多线程对数据库数据的批量处理,包括增、删、改等...

    通过多线程任务处理大批量耗时业务并返回结果

    本示例着重讨论如何利用多线程处理大批量耗时业务并确保能够获取每个任务的结果。标题中的"通过多线程任务处理大批量耗时业务并返回结果"指的是在Java或其他支持多线程的编程语言中,如何有效地分配工作到多个线程,...

    相机采集、处理、显示多线程处理框架

    相机采集、处理、显示多线程处理是指使用多个线程来同时执行相机数据采集、处理和显示的操作。其中采集线程负责从相机中获取图像数据,处理线程负责对采集到的图像数据进行处理和算法分析,显示线程负责将处理后的...

    winform 多线程 多任务管理

    在本文中,我们将深入探讨如何在Winform应用程序中实现多线程和多任务管理,这对于提高应用程序的性能和响应性至关重要,特别是处理耗时操作时。 首先,了解多线程的概念是必要的。在单线程应用程序中,所有任务都...

    C# 服务 多线程 XMl读写

    综上所述,这个压缩包中的内容可能包含一个使用C#编写的后台服务,该服务利用多线程处理任务,并可能涉及到XML文件的读写操作。服务可能是为了执行周期性任务,如数据采集、日志记录或监控,而多线程技术确保了服务...

    java多线程处理大数据

    java多线程处理大数据,可根据配置的线程数,任务去调度处理

    完整版多线程多任务下载软件.e.rar

    《多线程多任务下载软件.e.rar》是一个压缩文件,其中包含了实现多线程多任务下载功能的软件。这种技术在互联网下载领域中扮演着重要角色,极大地提升了用户下载大文件或多个文件时的效率。下面我们将深入探讨多线程...

    IOS-多线程多任务下载

    尤其是在处理大文件下载时,传统的单线程下载方式可能会导致用户界面阻塞,无法进行其他操作,而多线程多任务下载则可以很好地解决这个问题。本文将详细讲解iOS中实现多线程多任务下载的核心知识点。 首先,我们...

    易语言多线程执行任务例程

    在这个“易语言多线程执行任务例程”中,我们将深入探讨如何在易语言中利用多线程技术来提升程序的运行效率和并发能力。 多线程是现代计算机编程中的一个重要概念,它允许一个程序同时执行多个任务,从而充分利用...

    C#多线程多任务管理模型.zip

    在C#编程中,多线程和多任务管理是一个关键的概念,它允许程序同时执行多个独立的任务,提高程序的响应速度和效率。本压缩包文件"**C#多线程多任务管理模型**"提供了相关的代码示例和项目资源,帮助开发者深入理解和...

Global site tag (gtag.js) - Google Analytics