`
talentluke
  • 浏览: 604568 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

海量数据处理系列之(一)Java线程池使用

 
阅读更多

前言:最近在做分布式海量数据处理项目,使用到了java的线程池,所以搜集了一些资料对它的使用做了一下总结和探究,

前面介绍的东西大多都是从网上搜集整理而来。文中最核心的东西在于后面两节无界队列线程池和有界队列线程池的实例

使用以及线上问题处理方案。                                

 

1.  为什么要用线程池?

      在Java中,如果每当一个请求到达就创建一个新线程,开销是相当大的。在实际使用中,每个请求创建新线程的服务器

在创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在实际处理实际的用户请求的时间和资源要多的多。除

了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个JVM中创建太多的线程,可能会导致系统由于

过度消耗内存或者“切换过度”而导致系统资源不足。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻

处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源耗费比较大的线程的创建和销毁,尽量利用已有对象

来进行服务,这就是“池化资源”技术产生的原因。

     线程池主要用来解决线程生命周期开销问题和资源不足问题,通过对多个任务重用线程,线程创建的开销被分摊到多个任

务上了,而且由于在请求到达时线程已经存在,所以消除了创建所带来的延迟。这样,就可以立即请求服务,使应用程序响

应更快。另外,通过适当的调整线程池中的线程数据可以防止出现资源不足的情况。

      网上找来的这段话,清晰的描述了为什么要使用线程池,使用线程池有哪些好处。工程项目中使用线程池的场景比比皆是。

本文关注的重点是如何在实战中来使用好线程池这一技术,来满足海量数据大并发用户请求的场景。

 

2. ThreadPoolExecutor类

       Java中的线程池技术主要用的是ThreadPoolExecutor 这个类。先来看这个类的构造函数,

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

    corePoolSize       线程池维护线程的最少数量

    maximumPoolSize    线程池维护线程的最大数量 

    keepAliveTime      线程池维护线程所允许的空闲时间  

    workQueue          任务队列,用来存放我们所定义的任务处理线程

    threadFactory      线程创建工厂

    handler            线程池对拒绝任务的处理策略

     ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize 设置的边界自动调整池大小。当新任务在方法

execute(Runnable) 中提交时, 如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是

空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。 如果设置的

corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。

     ThreadPoolExecutor是Executors类的实现Executors类里面提供了一些静态工厂,生成一些常用的线程池,主

要有以下几个:

     newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行

所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任

务的提交顺序执行。  

     newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线

程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

     newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分

空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池

大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

      在实际的项目中,我们会使用得到比较多的是newFixedThreadPool,创建固定大小的线程池,但是这个方法在真实的线上

环境中还是会有很多问题,这个将会在下面一节中详细讲到。

      当任务源源不断的过来,而我们的系统又处理不过来的时候,我们要采取的策略是拒绝服务。RejectedExecutionHandler接

口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。

      1)CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

             if (!e.isShutdown()) {

                 r.run();

            }

        }

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。

     2)AbortPolicy处理程序遭到拒绝将抛出运行时 RejectedExecutionException

         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

              throw new RejectedExecutionException();

        }

 这种策略直接抛出异常,丢弃任务。

      3)DiscardPolicy不能执行的任务将被删除

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}

   这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。

     4)DiscardOldestPolicy如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,

则重复此过程)

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                e.getQueue().poll();

                e.execute(r);

            }

        }

      该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略

需要适当小心。

 

3.  ThreadPoolExecutor无界队列使用

   public class ThreadPool {

        private final static String poolName = "mypool";

        static private ThreadPool threadFixedPool = new ThreadPool(2);

       private ExecutorService executor;

      static public ThreadPool getFixedInstance() {

           return threadFixedPool;

       }

    private ThreadPool(int num) {

           executor = Executors.newFixedThreadPool(num, new DaemonThreadFactory(poolName));

}

public void execute(Runnable r) {

           executor.execute(r);

}

public static void main(String[] params) {

           class MyRunnable implements Runnable {

                    public void run() {

                             System.out.println("OK!");

                             try {

                                       Thread.sleep(10);

                             } catch (InterruptedException e) {

                                       e.printStackTrace();

                             }

                    }

           }

           for (int i = 0; i < 10; i++) {

             ThreadPool.getFixedInstance().execute(new MyRunnable());

           }

           try {

                    Thread.sleep(2000);

                    System.out.println("Process end.");

           } catch (InterruptedException e) {

                    e.printStackTrace();

           }

}

}

       在这段代码中,我们发现我们用到了Executors.newFixedThreadPool()函数,这个函数的实现是这样子的:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); 

       它实际上是创建了一个无界队列的固定大小的线程池。执行这段代码,我们发现所有的任务都正常处理了。但是在真实的线上环

境中会存在这样的一个问题,前端的用户请求源源不断的过来,后端的处理线程如果处理时间变长,无法快速的将用户请求处理

完返回结果给前端,那么任务队列中将堵塞大量的请求。这些请求在前端都是有超时时间设置的,假设请求是通过套接字过来,

当我们的后端处理进程处理完一个请求后,从队列中拿下一个任务,发现这个任务的套接字已经无效了,这是因为在用户端已经

超时,将套接字建立的连接关闭了。这样一来我们这边的处理程序再去读取套接字时,就会发生I/0 Exception. 恶性循环,导致我

们所有的处理服务线程读的都是超时的套接字,所有的请求过来都抛I/O异常,这样等于我们整个系统都挂掉了,已经无法对外提供

正常的服务了。

     对于海量数据的处理,现在业界都是采用集群系统来进行处理,当请求的数量不断加大的时候,我们可以通过增加处理节点,反正现

在硬件设备相对便宜。但是要保证系统的可靠性和稳定性,在程序方面我们还是可以进一步的优化的,我们下一节要讲述的就是针对

线上出现的这类问题的一种处理策略。

 

4.   ThreadPoolExecutor有界队列使用

public class ThreadPool {

         private final static String poolName = "mypool";

         static private ThreadPool threadFixedPool = null;

         public ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

         private ExecutorService executor;

 

         static public ThreadPool getFixedInstance() {

                   return threadFixedPool;

         }

         private ThreadPool(int num) {

                   executor = new ThreadPoolExecutor(2, 4,60,TimeUnit.SECONDS, queue,new DaemonThreadFactory

(poolName), new ThreadPoolExecutor.AbortPolicy());

         }

         public void execute(Runnable r) {

                   executor.execute(r);

         }

        

         public static void main(String[] params) {

                   class MyRunnable implements Runnable {

                            public void run() {

                                     System.out.println("OK!");

                                     try {

                                               Thread.sleep(10);

                                     } catch (InterruptedException e) {

                                               e.printStackTrace();

                                     }

                            }

                   }

                   int count = 0;

                   for (int i = 0; i < 10; i++) {

                            try {

                                     ThreadPool.getFixedInstance().execute(new MyRunnable());

                            } catch (RejectedExecutionException e) {

                                     e.printStackTrace();

                                     count++;

                            }

                   }

                   try {

                            log.info("queue size:" + ThreadPool.getFixedInstance().queue.size());

                            Thread.sleep(2000);

                   } catch (InterruptedException e) {

                            e.printStackTrace();

                   }

                   System.out.println("Reject task: " + count);

         }

}

       首先我们来看下这段代码几个重要的参数,corePoolSize 为2,maximumPoolSize为4,任务队列大小为2,每个任务平

均处理时间为10ms,一共有10个并发任务。

      执行这段代码,我们会发现,有4个任务失败了。这里就验证了我们在上面提到有界队列时候线程池的执行顺序。当新任务在

方法 execute(Runnable) 中提交时, 如果运行的线程少于 corePoolSize,则创建新线程来处理请求。 如果运行的线程多于

corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程,如果此时线程数量达到maximumPoolSize,并且队

列已经满,就会拒绝继续进来的请求。

    现在我们调整一下代码中的几个参数,将并发任务数改为200,执行结果Reject task: 182,说明有18个任务成功了,线程处理

完一个请求后会接着去处理下一个过来的请求。在真实的线上环境中,会源源不断的有新的请求过来,当前的被拒绝了,但只要线

程池线程把当下的任务处理完之后还是可以处理下一个发送过来的请求。

     通过有界队列可以实现系统的过载保护,在高压的情况下,我们的系统处理能力不会变为0,还能正常对外进行服务,虽然有些服

务可能会被拒绝,至于如何减少被拒绝的数量以及对拒绝的请求采取何种处理策略我将会在下一篇文章《系统的过载保护》中继续

阐述。

 

参考文献:

  1. ThreadPoolExecutor使用与思考(上)-线程池大小设置与BlockedQueue的三种实现区别 http://dongxuan.iteye.com/blog/901689
  2. ThreadPoolExecutor使用与思考(中)-keepAliveTime及拒绝策略http://dongxuan.iteye.com/blog/902571
  3. ThreadPoolExecutor源代码
  4. Java线程池介绍以及简单实例 http://wenku.baidu.com/view/e4543a7a5acfa1c7aa00cc25.html

摘自http://www.cnblogs.com/cstar/archive/2012/06/14/2549494.html

分享到:
评论

相关推荐

    通过EasyExcel+线程池实现百万级数据从Excel导入到数据库

    在处理大量数据导入数据库的场景中,使用阿里巴巴开源的EasyExcel库结合线程池技术可以有效地实现从Excel文件导入到数据库的过程...这种方式可以提高数据处理的效率,减少内存占用,并且能够更好地利用多核CPU的优势。

    java多线程导出excel(千万级别)优化

    Java多线程导出Excel是处理大数据量时的一种高效策略,尤其在面对千万级别的数据时。传统的Apache POI库在处理大规模数据时可能会遇到栈溢出(StackOverflowError)和内存溢出(OutOfMemoryError)等问题,因为这些...

    多线程以JDBC的方式返回海量数据

    在处理海量数据时,我们可以通过创建多个线程,每个线程负责获取和处理一部分数据,而不是一次性加载所有数据。 具体实现上,我们可以设计一个线程池来管理这些工作线程。线程池可以有效地管理和控制线程,避免因...

    java实现csv导出千万级数据实例

    CSV(Comma Separated Values)格式因其简单、通用性而广泛用于数据交换,但在处理海量数据时,传统的导出方法可能面临效率和内存管理的难题。 在Java中,Apache POI库通常用于操作Microsoft Office文档,包括Excel...

    高并发、分布式、高可用、微服务、海量数据处理知识

    在IT行业中,高并发、分布式、高可用...总的来说,理解并掌握高并发、分布式、高可用、微服务和海量数据处理的知识,对于Java开发人员来说至关重要,这将帮助他们构建出适应互联网时代需求的高效、稳定、可扩展的系统。

    互联网 Java 工程师进阶知识完全扫盲:涵盖高并发、分布式、高可用、微服务、海量数据处理等领域知识

    本教程旨在为Java工程师提供全面的进阶知识,帮助他们提升在高并发、分布式、高可用、微服务以及海量数据处理等关键领域的专业技能。 1. **高并发处理**: 高并发是互联网应用的常态,Java工程师需要理解线程池的...

    java csv大数据量导出(千万级别,不会内存溢出)

    在Java开发中,处理大数据量的数据导出是一个常见的挑战,特别是在CSV格式的文件处理上。CSV(Comma Separated Values)是一种广泛使用的数据交换格式,因其简单性和通用性而受到青睐。然而,当数据量达到千万级别时...

    Java 实现当当泄露数据快速检索

    这个任务的关键在于优化数据处理速度,以便在海量信息中迅速找到目标数据。以下是实现这一目标的一些关键知识点: 1. **数据结构与算法**:高效的检索往往依赖于合适的数据结构。例如,B树、B+树或哈希表可以用于...

    Java多线程优化百万级数据

    在处理海量数据时,如果单线程串行执行,可能会造成CPU利用率低下,程序响应时间过长。 优化策略通常包括以下几个方面: 1. **任务拆分**:将大规模任务分解为若干个小任务,每个任务由一个独立的线程执行。例如,...

    JAVA面试整理,吐血整理

    5. 海量数据处理: - 在处理海量数据时,了解高效的数据结构和算法,掌握分布式计算框架如Hadoop、Spark等的使用,对于提升处理能力和效率至关重要。 6. 并发与多线程: - Java多线程编程知识包括线程创建、线程...

    Java项目开发实用案例_第4章代码_高效海量访问系统

    这个章节的代码示例旨在教会开发者如何构建一个能够处理大量并发请求的系统,这对于现代互联网应用来说至关重要。明日科技出版的这本书籍,通过实际的代码实践,深入浅出地讲解了Java在高并发场景下的解决方案。 ...

    网络机器人java编程指南

    Java的JDBC API可以连接各种数据库,或者你可以使用NoSQL数据库如MongoDB,它们更适合处理大规模非结构化数据。 6. **延迟加载和JavaScript执行**:现代网页大量使用JavaScript进行动态加载,这就需要使用如...

    互联网 Java 工程师进阶知识完全扫盲

    "互联网 Java 工程师进阶知识完全扫盲"是一个全面的学习资源,旨在帮助Java开发者提升技能,掌握在高并发、分布式、高可用、微服务以及海量数据处理等关键领域的专业知识。 首先,我们要讨论的是高并发处理。在...

    Java基础+Android面试题

    21. Android海量数据处理算法:介绍了散列查找、排序、海量数据处理等算法。 22. 剑指offer:涵盖了常见的算法和数据结构面试题及其解答。 通过这些知识点的覆盖,我们可以看出文档所涉猎的是希望对Java和Android...

    easyExcel实现大数据导出

    在Java开发中,数据导出是一项常见的任务,特别是在大数据处理中。阿里巴巴的开源库`easyExcel`为此提供了高效且易用的解决方案。`easyExcel`是专门为处理大量数据设计的一个轻量级工具,它能够有效地解决内存溢出...

    实战Java高并发程序设计(高清版)

    在互联网服务中,高并发通常与海量数据、分布式系统和微服务架构紧密相关。Java作为一种多平台、高性能的编程语言,是实现高并发程序的理想选择。 书中的内容可能涵盖了以下几个核心知识点: 1. **线程基础**:...

    多线程搜索引擎java实现源代码

    在IT领域,搜索引擎是至关重要的技术之一,它使得海量数据的快速检索成为可能。本项目以"多线程搜索引擎java实现源代码"为标题,旨在介绍如何使用Java编程语言构建一个具备多线程特性的搜索引擎。这个搜索引擎可以...

    elecfans.com-Java网络编程与分布式计算

    1. Hadoop:开源的大数据处理框架,基于HDFS和MapReduce,处理海量数据。 2. Spark:快速、通用且可扩展的大数据处理框架,支持批处理、交互式查询和实时流处理。 3. Dubbo:阿里巴巴开源的分布式服务框架,用于实现...

    java程序员需要掌握的知识点

    - **Redis**与**MongoDB**:两种非常流行的非关系型数据库,分别代表键值存储和文档存储类型,常用于缓存和大数据处理场景。 #### 4. 数据库知识 良好的数据库设计和管理能力是Java程序员不可或缺的技能之一。这...

    基于java的开发源码-毕业论文:搜索引擎系统附源代码.zip

    在搜索引擎开发中,Java通常用于构建后端服务,处理大量的数据处理和计算任务。 【压缩包子文件的文件名称列表】中仅有一个条目“codefans.net”,这可能是一个链接或者错误,因为正常情况下,压缩包内的文件名应该...

Global site tag (gtag.js) - Google Analytics