`
y806839048
  • 浏览: 1117401 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

stream聚合parallelStream原理

阅读更多

 

 

背景:ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。这种  “分治”  思想值得学习。

 

 

parallelStream  就是把后续的流操作一个对象一个线程去处理,后续的操作还是一样,线程的来源是forkjoinpool 其线程的数量来源于机器cpu的大小---工作在本地jvm执行

 

任务分小,但是执行任务的线程默认是cpu个数(1,拆分任务,2,固定线程池中执行)

 

threadpoolexcutor不能实现线程执行的等待合并,不能选择优先执行子任务,除非用futureTask,forkjoinpool用他自己的task就可以实现线程的结果合并,并且线程会主动切换

到未阻塞的任务(至于子任务线程的拆分由自定义的task通过递归实现)

 

forkjoinpool适用于拆分任务,更适用于有父子依赖关系的任务

 

分析与使用:

Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。

使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法来执行指定任务了。

其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。

 

 

 

 

 

 

 

下面的UML类图显示了ForkJoinPool、ForkJoinTask之间的关系:

 

 

 

 

    

 

 

 

实战:

 

 

   例1:

                   有 返 回 值

 

import java.time.Duration;

import java.time.Instant;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.ForkJoinTask;

import java.util.concurrent.RecursiveTask;

import java.util.stream.LongStream;

 

import org.junit.Test;

 

public class TestForkJoinPool {

 

public static void main(String[] args) {

Instant start = Instant.now();

 

ForkJoinPool pool = new ForkJoinPool();

 

ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 50000000000L);

 

Long sum = pool.invoke(task);

 

System.out.println(sum);

 

Instant end = Instant.now();

 

System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//166-1996-10590

}

 

@Test

public void test1(){

Instant start = Instant.now();

 

long sum = 0L;

 

for (long i = 0L; i <= 50000000000L; i++) {

sum += i;

}

 

System.out.println(sum);

 

Instant end = Instant.now();

 

System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//35-3142-15704

}

 

 

}

 

class ForkJoinSumCalculate extends RecursiveTask<Long>{

 

/**

*/

private static final long serialVersionUID = -259195479995561737L;

 

private long start;

private long end;

 

private static final long THURSHOLD = 10000L;  //临界值

 

public ForkJoinSumCalculate(long start, long end) {

this.start = start;

this.end = end;

}

 

@Override

protected Long compute() {

long length = end - start;

 

if(length <= THURSHOLD){

long sum = 0L;

 

for (long i = start; i <= end; i++) {

sum += i;

}

 

return sum;

}else{

 

long middle = (start + end) / 2;

 

ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle); //通过递归实现

left.fork(); //进行拆分,同时压入线程队列

 

ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end);

right.fork(); //

 

return left.join() + right.join();

}

}

 

}

   根据结果可得,在数量超过一定的时候效率比普通的for循环明显,但是数较少的时候,优势体现不明显。

 

 

 

例2:

     无 返 回 值

 

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.RecursiveAction;

import java.util.concurrent.TimeUnit;

 

public class ForkJoinPoolAction {

    

    public static void main(String[] args) throws Exception{

        PrintTask task = new PrintTask(0, 300);

        //创建实例,并执行分割任务

        ForkJoinPool pool = new ForkJoinPool();

        pool.submit(task);

         //线程阻塞,等待所有任务完成

        pool.awaitTermination(2, TimeUnit.SECONDS);

        pool.shutdown();

    }

}

 

 

class PrintTask extends RecursiveAction{

    private static final int THRESHOLD = 50; //最多只能打印50个数

    private int start;

    private int end;

    

    

 

    public PrintTask(int start, int end) {

        super();

        this.start = start;

        this.end = end;

    }

 

 

 

    @Override

    protected void compute() {

        

        if(end - start < THRESHOLD){

            for(int i=start;i<end;i++){

                System.out.println(Thread.currentThread().getName()+"的i值:"+i);

            }

        }else {

            int middle =(start+end)/2;

            PrintTask left = new PrintTask(start, middle);

            PrintTask right = new PrintTask(middle, end);

            //并行执行两个“小任务”

            left.fork();

            right.fork();

        }

        

    }

    

}

 

 

总结:

 

 

在Java 7中引入了一种新的线程池:ForkJoinPool。

     它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

 

 ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。

这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。

       比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。

 

       那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

 

      所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

 

     以上程序的关键是fork()和join()方法。在ForkJoinPool使用的线程中,会使用一个内部队列来对需要执行的任务以及子任务进行操作来保证它们的执行顺序。

 

 

 

那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?

 

    首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

 

ps:ForkJoinPool在执行过程中,会创建大量的子任务,导致GC进行垃圾回收,这些是需要注意的。

 

 

 

 

原文:https://blog.csdn.net/jsutdoit/article/details/81114144 

 https://www.jianshu.com/p/2472f53d6e2f

  • 大小: 137.5 KB
  • 大小: 47 KB
  • 大小: 228.4 KB
分享到:
评论

相关推荐

    关于Java8 parallelStream并发安全的深入讲解

    首先,让我们深入了解一下 `parallelStream` 的工作原理。`parallelStream` 基于 Java 8 的 Fork/Join 框架,该框架将大任务分解为小任务,并在多个处理器核心上并行执行这些小任务。然而,尽管这种并行化提高了执行...

    java8Stream方法简介-源码.rar

    在源码分析中,我们可以看到Stream API如何与集合类交互,了解其内部实现原理,如并行流(Parallel Stream)如何利用多核CPU提高效率,以及如何通过`Spliterator`接口自定义数据源。 为了更好地理解和应用Stream ...

    Java stream的延迟计算.pdf

    Java Stream 是Java 8引入的一个强大特性,它允许程序员以声明式的方式处理数据集合,尤其在进行数据过滤、映射和归约等操作...在实际编程中,理解Stream的工作原理和优化策略,能够帮助我们写出更加高效、简洁的代码。

    stream-api-example:流 API 示例

    通过这个示例项目,开发者可以深入理解 Stream API 的工作原理,并学习如何在实际项目中有效地使用它,优化代码并提高程序性能。同时,性能测试部分提供了衡量 Stream API 效率的方法,有助于我们在编写高性能代码时...

    stram流对象常见操作

    Optional&lt;Integer&gt; findAny = list.parallelStream().filter(x -&gt; x &gt; 6).findAny(); ``` - `anyMatch`: 检查是否至少有一个元素满足条件。 ```java boolean anyMatch = list.stream().anyMatch(x -&gt; x &gt; 6); ...

    javastream源码-sample-java-playground:用于测试Java特性(例如StreamAPI)的示例源代码

    在项目中,你可能会看到如何使用`parallelStream()`来创建并行流,这是一个强大的特性,它能够利用多核处理器的优势,通过并行化执行提高处理速度。但需要注意的是,并行流虽然能提高性能,但也可能引入并发问题,...

    javastream源码-kaunasjug3streamapi:关于Java8StreamAPIJava源代码的考纳斯-贾格会议#3演示

    在这个名为"javastream源码-kaunasjug3streamapi"的项目中,我们可以深入理解Stream API的实现原理和使用方法。这个项目是Kaunas-Jug(考纳斯Java用户组)会议的第3场演示,专门讨论了Stream API的Java源代码。 ...

    Apache Doris (Incubating) 原理与实践

    Apache Doris是基于MPP(Massively Parallel Processing)架构的列式存储数据仓库系统,它提供了高速的数据导入和查询能力,适用于大数据分析、商业智能等场景。Doris的主要特点包括: 1. 高性能:通过列式存储、...

    jdk8文档中文版.zip

    通过`.parallelStream()`可以创建并行流,从而提高执行效率。 **6. 新的日期和时间API** Java 8引入了全新的java.time包,提供了更强大且易于使用的日期、时间和时区处理功能。比如`LocalDate`、`LocalTime`、`...

    java8-集合元素归约.pdf

    对于大数据量的集合,使用并行流 (`parallelStream()`) 结合 `reduce()` 可以提高计算效率。并行流会将数据分成多个部分,分别在不同的处理器核心上进行归约,然后使用 `Combiner` 合并结果。这在处理大量数据时能...

    溪流:Subiendoguíastream de nuevo(阿莱明·埃尔·吉特·德拉地毯·普罗伊科托·辛奎尔)

    Java流可以分为三种类型:顺序流(Sequential)、并行流(Parallel)和无序流(unordered),每种都有其特定的使用场景和优势。 在“Streams-master”这个压缩包中,我们可能找到以下内容: 1. **源代码**:可能...

    JDKSourceCode1.8源码

    Stream API支持filter、map、reduce等操作,可以进行聚合、过滤、映射等操作。例如,`List&lt;String&gt; names = Arrays.asList("Alice", "Bob", "Charlie"); names.stream().filter(name -&gt; name.startsWith("A"))....

    java 8 源码 (jdk1.8.0-131.jdk src.zip)

    例如,你可以用一行代码完成数据过滤、转换和聚合的操作。 3. **默认方法**: 在接口中,Java 8引入了默认方法,即在接口中定义带有实现的方法。这使得接口可以有行为,而不只是纯粹的契约。默认方法通过`default`...

    java8-src-sample

    Stream API 提供了一种新的数据处理方式,可以进行序列化操作,包括过滤、映射、聚合等。Stream 支持串行和并行操作,非常适合大数据和并发场景。如 `names.stream().filter(name -&gt; name.startsWith("A")).forEach...

    Java9_Code.zip

    此外,`Stream.parallel()`方法现在能够更好地利用多核处理器,进一步提升并发处理能力。 4. **进程API增强** (`java.lang.ProcessHandle` 和 `java.lang.ProcessBuilder`):Java 9扩展了进程管理API,使得开发者...

    JDK1.8源代码

    Stream API提供了处理集合的新方式,支持序列化计算,如过滤、映射和聚合等操作。它可以方便地进行并行处理,提高程序性能。Stream API与Lambda表达式结合使用,为数据处理提供了强大的工具。 5. **Optional类**:...

    Java 1.8 源码

    Java 1.8 源码是学习Java编程语言核心机制和高级特性的宝贵资源,它揭示了Java平台内部的工作原理。在这个版本中,Java引入了许多重要的改进和新特性,如Lambda表达式、Stream API、默认方法以及对并发处理的优化。...

    java8集合源码分析-Blog:个人博客

    为了保持向后兼容,Java 8在集合接口(如List, Set, Map)中添加了默认方法,如`forEach()`、`stream()`和`parallelStream()`。这些默认方法使得集合可以直接支持函数式编程,而无需修改已有的实现类。 6. **Map...

    添加java源码-java8Source:阅读并在Java8源代码中添加描述

    9. **Parallel Collections**:Java 8的并行流提供了并行处理集合的能力,通过`parallelStream()`方法可以创建并行流,提高大规模数据处理的效率。 通过深入阅读和分析`java8Source-master`项目中的源代码,开发者...

    javajdk8源码-java-source-code:JDK8源码以及注解

    2. **Stream API**:Stream API为集合操作提供了新方法,如filter()、map()和reduce(),可以方便地进行数据流的过滤、转换和聚合操作,支持链式调用,提高了代码的可读性和效率。 3. **Optional类**:Optional类...

Global site tag (gtag.js) - Google Analytics