`

第七章 - 使用并行流

 
阅读更多

流 (Streams) 是一个数据序列 (它不是一个数据结构),它允许你使用串行或并行方式采用一系列的操作来对数据进行过滤,转换,排序,缩减或收集成一个最终的对象。

Streams 有三个不同的组成部分:

  • 数据源 source:数据源提供Stream所需的数据
  • 一个或多个中间操作,中间操作生成另外一个流作为输出
  • 一个终端操作生成一个对象,这个生成的对象可以是一个简单的对象或一个诸如数据,队列或哈希表。也可以有不产生任何结果的终端操作

Streams的数据源

Stream的数据源生成供Stream对象使用的数据。你可以从不同的源来生成数据流。例如,Collection接口在Java 8中提供了 Stream() 方法来生成一个用串行处理的数据流以及 parallelStream() 并行处理的数据流。这允许你使用流来处理大部分的数据结构如:lists (ArrayList, LinkedList 等等), sets (HashSet, EnumSet),或并行数据结构 (LinkedBlockingDeque, PriorityBlockingQueue 等等), Array 类提供了四种版本的 stream() 方法。如果你提供了一个 int 类型的数组给 stream() 方法,它会生成一个 IntStream。这个一个特殊的数据流用来处理整型的数据 (你也可以使用 Stream<Integer>,但是性能相比IntStream 慢很多),同样的还有 LongStream 或 DoubleStream。

通常情况下,如果你传给 stream() 方法一个对象数组,那么你会获得一个相同类型的通用数据流。这种情况下没有 parallelStream() 方法,但是当你获得普通数据流后,你可以调用 parallel() 方法来把串行数据流转化为并行数据流。

 

你还可以使用数据流来处理一个文件或文件夹的内容。 Files 类提供了几个不同的方法使用数据流处理。例如:

  • find() 方法返回的是符合条件的文件的 Path 对象数据流
  • list() 方法返回的是一个文件夹里内容的 Path 对象数据流
  • walk() 方法返回的是一个使用深入优先算法遍历文件树得到的 Path 对象数据流
  • lines():该方法和 readAllLines() 方法不同,它不是把每一行内容读到 List 里,它返回的是包含每行字符串的数据流 (Stream<String>)

Stream 接口提供了 generate() 和 iterate() 方法来生成数据流,具体使用参考文档。

 

流的数据源也可以是以下:

  • String.chars():返回一个 IntStream
  • Random.ints(), Random.doubles() 或 Random.longs():分别返回 IntStream, DoubleStream, 和 LongStream。你也可以传入一个随机数的区间,例如: Random.ints(10, 20)
  • SplittableRandom类:该类提供了类似于Random类的方法来生成随机数,但是更适合于并行处理。
  • Stream.concat():该方法接收两个 stream 作为参数,创建一个新的含有第一 stream 和 第二个 stream 元素的新的 stream

流的中间操作

中间操作最大的特点是它返回另一个流作为输出,输出流里的对象可以和输入流对象是不同的,你可以有一个或多个中间处理。Stream 接口最重要的中间操作有:

  • distinct():返回的流中的数据都是唯一的,重复的都被删除
  • filter():筛选流中的数据
  • flatMap():该方法用于把一个含有数据流的数据流转换成一个单一的数据流,例如一个数据流的每个元素都是一个字符串数组,那么flatMap()方法得到的数据流是指包含数组里的字符串,也就是所说的扁平化。
  • limit():返回一个新的数据流,数据流中的数据是经过删减的,长度等于传入的参数
  • map():这个方法用来把数据流里的数据从一个类型转化为另一个类型
  • peek():该方法返回相同的流,可用来用针对每个流中的数据运行额外的代码
  • skip():该方法忽略参数中指定的头几个数据
  • sorted():该方法对流中的数据进行排列

终端操作

  • collect():该方法收集流中的数据,返回指定的对象
  • count():返回流内的元素个数
  • max():返回流中最大的元素
  • min():返回流中最小的元素
  • reduce():把一个数据流整合处理得到一个单个的对象。
  • forEach() / forEachOrdered():该方法对数据流中的每个数据逐一处理
  • findFirst() / findAny():返回流中的第一个数据 / 返回流中的任意一个数据 (通常返回第一个数据,但是无法保证一定返回第一个)
  • anyMatch() / allMatch() / nonMatch():接收一个 predicate 作为参数并返回一个 Boolean 值代表是否有元素匹配 / 是否流中的所有数据都匹配 / 是否流中没有数据匹配
  • toArray():把流中的数据转化为数组并返回

例子 - 数字汇总应用程序

一个普遍的需求是你需要处理一个大数据来得出一定的结论。例如你有店内商品销售的数据,你可以计算所有产品的总销售数量;每一种产品的销售数量;或者每一个消费者在每种商品上的花费。我们称这种数据处理为数字汇总。

我们的数字汇总应用相对比较简单,它具有以下的组件:

  • Record:该类定义了文件里记录的每一个数据。该类有21个属性以及相应的 get() 和 set() 方法。
  • ConcurrentDataLoader:该类从文件中加载数据并把每条数据转换为 Record 对象。我们即将使用 streams 来读取数据并做相应的转换
  • ConcurrentStatistics:该类实现一些操作允许我们队数据进行处理
  • ConcurrentMain:该类实现了 main() 方法来调用 ConcurrentStatistics 类中的各个方法
// 该类负责从文件中读取数据并转换成Record类
public class ConcurrentDataLoader {
    public static List<Record> load(Path path) throws IOException {
        System.out.println("Loading data");
        
        // 读取文件中的每行数据到list里
        List<String> lines = Files.readAllLines(path);
        List<Record> records = lines
                .parallelStream()  // 创建一个并行流来处理文件的每行数据
                .skip(1) // 忽略文件的第一行
                .map(l -> l.split(";")) // 把流中的每一个数据使用分号分开转换成数组
                .map(t -> new Record(t)) // 把流中的每个数组转换成Record对象
                .collect(Collectors.toList()); // 把流中的数据收集保存在list中
        return records;
    }
}

 

 

// 实现一系列对数据操作的方法
public class ConcurrentStatistics {

    public static void jobDataFromSubscribers(List<Record> records) {
        System.out.println("****************************************");
        System.out.println("Job info for Deposit subscribers");
        ConcurrentMap<String, List<Record>> map =
                records.parallelStream()
                        .filter(r -> r.getSubscribe().equals("yes"))
                        .collect(Collectors.groupingByConcurrent
                                (Record::getJob));
        map.forEach((k, l) -> System.out.println(k + ": " + l.size()));
        System.out.println("****************************************");
    }

    public static void ageDataFromSubscribers(List<Record>
                                                      records) {
        System.out.println("****************************************");
        System.out.println("Age info for Deposit subscribers");
        DoubleSummaryStatistics statistics =
                records.parallelStream()
                        .filter(r -> r.getSubscribe().equals("yes"))
                        .collect(Collectors.summarizingDouble
                                (Record::getAge));
        System.out.println("Min: " + statistics.getMin());
        System.out.println("Max: " + statistics.getMax());
        System.out.println("Average: " + statistics.getAverage());
        System.out.println("****************************************");
    }

    public static void maritalDataFromSubscribers(List<Record> records) {
        System.out.println("****************************************");
        System.out.println("Marital info for Deposit subscribers");
        records.parallelStream()
                .filter(r -> r.getSubscribe().equals("yes"))
                .map(r -> r.getMarital())
                .distinct()
                .sorted()
                .forEachOrdered(System.out::println);
        System.out.println("****************************************");
    }

    public static void campaignDataFromNonSubscribersBad(List<Record> records) {
        System.out.println("****************************************");
        System.out.println("Number of contacts for Non Subscriber");
                IntStream stream = records.parallelStream()
                        .filter(Record::isNotSubscriber)
                        .mapToInt(r -> r.getCampaign());
        System.out.println("Max number of contacts: " +
                        stream.max().getAsInt());
        System.out.println("Min number of contacts: " +
                        stream.min().getAsInt());
        System.out.println("****************************************");
    }

    public static void campaignDataFromNonSubscribersOk(List<Record> records) {
        System.out.println("****************************************");
        System.out.println("Number of contacts for Non Subscriber");
        int value = records.parallelStream()
                .filter(Record::isNotSubscriber)
                .map(r -> r.getCampaign())
                .mapToInt(Integer::intValue)
                .max()
                .getAsInt();
        System.out.println("Max number of contacts: " + value);
        value = records.parallelStream()
                .filter(Record::isNotSubscriber)
                .map(r -> r.getCampaign())
                .mapToInt(Integer::intValue)
                .min()
                .getAsInt();
        System.out.println("Min number of contacts: " + value);
        System.out.println("****************************************");
    }

    // 该方法是实现需要多个数据过滤的一种方法,但不是最优的方法
    // 建议使用方法 multipleFilterDataPredicate() 那样的方式
    public static void multipleFilterData(List<Record> records) {
        System.out.println("****************************************");
        System.out.println("Multiple filter");
        Stream<Record> stream1 = records.parallelStream()
                .filter(Record::isDefaultCredit);
        Stream<Record> stream2 = records.parallelStream()
                .filter(r -> !(r.isHousing()));
        Stream<Record> stream3 = records.parallelStream()
                .filter(r -> !(r.isLoan()));
        Stream<Record> complete = Stream.concat(stream1, stream2);
        complete = Stream.concat(complete, stream3);
        long value = complete
                .parallel()
                .unordered()
                .distinct()
                .count();
        System.out.println("Number of people: " + value);
        System.out.println("****************************************");
    }

    public static void multipleFilterDataPredicate(List<Record>records) {
        System.out.println("****************************************");
        System.out.println("Multiple filter with Predicate");
        Predicate<Record> p1 = r -> r.isDefaultCredit();
        Predicate<Record> p2 = r -> !r.isHousing();
        Predicate<Record> p3 = r -> !r.isLoan();
        Predicate<Record> pred = Stream.of(p1, p2, p3)
                .reduce(Predicate::or).get();
        long value = records.parallelStream().filter(pred).count();
        System.out.println("Number of people: " + value);
        System.out.println("****************************************");
    }

    // 找出10个打点最常电话确没有订购的人
    public static void durationDataForNonSubscribers(List<Record> records) {
        System.out.println("****************************************");
        System.out.println("Duration data for non subscribers");
        records.parallelStream().filter(r -> r.isNotSubscriber())
                .sorted(Comparator.comparingInt (Record::getDuration)
                        .reversed())
                .limit(10)
                .forEachOrdered(
                        r -> System.out.println("Education: "
                                + r.getEducation() + "; Duration: " +
                                r.getDuration()));
        System.out.println("****************************************");
    }

    public static void peopleBetween25and50(List<Record> records) {
        System.out.println("****************************************");
        System.out.println("People between 25 and 50");
        int count=records.parallelStream()
                .map(r -> r.getAge())
                .filter(a -> (a >=25 ) && (a <=50))
                .mapToInt(a -> 1)
                .reduce(0, Integer::sum);
        System.out.println("People between 25 and 50: "+count);
        System.out.println("****************************************");
    }
}

 

Reduction 操作

根据前面例子中显示,reduce 操作对数据流中的元素做了汇总操作,以生成一个汇总结果。汇总结果可以是和流中数据一样的类型或者不同的数据类型。一个reduce操作的简单例子是计算数据流中的元素求和。

Stream API 提供了reduce() 方法来实现 reduction 操作。该方法有以下三种类型:

  • reduce(accumulator):该版本对流中的每个元素调用 accumulator 方法。这种情况下没有初始值。它返回一个包含 accumulator 方法运行结果的 Optional 对象或者一个空的 Optional 对象(如果流是空的)。该 accumulator 方法必须是一个 associative 方法。它实现了 BinaryOperator 接口。两个参数既可以是数据流中的元素或是上一次调用 accumulator 得到的结果。
  • reduce(identity, accumulator):当最终生成结果和流中元素具有相同数据类型,就应该使用该版本。identity 值必须是 accumulator 方法的恒等值。也就是说对于所有的值 t, accumulator.apply(identity, t) 是等于 t 的。因此 identity 的值也是作为第一次调用 accumulator 方法得到的值;如果数据流里没有任何数据,那么该值也是 reduce() 方法调用的返回值。
  • reduce(identity, accumulator, combiner):当最终结果和流中的数据类型不同或使用并行流时,必须使用该版本。使用并行流数据是被并行地处理,combiner的作用就是把多个并行处理结果整合成一个。当最终结果和流中元素数据类型不同时,必须使用 combiner 告诉编译器两个中间结果是如何汇总的,编译器才能推算出 accumulator 中两个参数的正确数据类型。

 

// 如下代码演示了当最终结果和流中数据结果不同时该怎样使用 reduce(identify, accumulator, combiner) 方法
public class Test {
    public static void main(String[] args) {
        User user1 = new User(1, "Test1", 1);
        User user2 = new User(2, "Test2", 2);
        User user3 = new User(3, "Test3", 3);
        User user4 = new User(4, "Test4", 4);
        User user5 = new User(5, "Test5", 5);
        User user6 = new User(6, "Test6", 6);
        User user7 = new User(7, "Test7", 7);
        User user8 = new User(8, "Test8", 8);

        User[] users = {user1, user2, user3, user4, user5, user6, user7, user8};

        // 因为流中的数据类型是 User 类,那么如果没有使用combiner,那么编译器
        // 就会误认为 partialResult 的数据类型也是 Result,对 Result 对象采用 "+" 操作就会报错
        int totalAge = Arrays.stream(users).reduce(0, (partialResult, user) -> partialResult + user.getAge(), Integer::sum);
        System.out.println(totalAge);
    }
}


class User {
    private int id;
    private String name;
    private int age;

    public User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
 

 

分享到:
评论

相关推荐

    CUDA并行程序设计 GPU编程指南

    第7章细述多任务的CPU和GPU协同,并介绍多个CPU/GPU编程秘技。第8章介绍如何在应用程序中编写和使用多GPU。第9章详述CUDA编程性能限制因素、分析CUDA代码的工具和技术。第10章介绍编程实践中的库与软件开发工具包。...

    第七章-《大数据导论》大数据处理平台.pdf

    + 大量复杂的计算和分析 缺点: 依赖于单机性能:CPU + RAM (摩尔定律) 难以处理海量数据 分布式计算 基本思想: 使用一组计算机协调完成一项工作 分布式系统开发:MPI(消息传递接口) 总共287个函数 MPI_Send( )...

    计算机系统结构课件:第4章 指令级并行.ppt

    在第4章“指令级并行”中,主要探讨了如何利用硬件和软件的结合来发掘和利用程序中的并行性。\n\n1. **指令级并行的概念**\n - 流水线处理机通过指令重叠执行以提高效率,这种潜在的并行性被称为指令级并行。CPI...

    C#并行计算书籍Professional Parallel Programming with CSharp

    ##### 第7章:Visual Studio 2010任务调试功能 - **任务视图器**:介绍Visual Studio 2010中的任务视图器工具,用于跟踪和调试任务。 - **调试技巧**:分享使用任务视图器进行调试的一些实用技巧。 - **多线程调试**...

    WF从入门到精通(第十一章):并行活动源码

    在设计工作流时,通过XAML或者代码方式将`CustomParallelActivity`实例化,并添加子活动,即可在工作流中使用自定义的并行活动。 总结: 本章深入介绍了Windows Workflow Foundation中的并行活动源码,包括并行活动...

    高级Java人才培训专家-第七章:一站制造:任务流调度AirFlow

    ### 高级Java人才培训专家-第七章:一站制造:任务流调度AirFlow #### 一站制造任务调度 一站制造的第七章主要介绍了任务流调度的相关技术,特别是聚焦于Apache Airflow这一开源平台。该章节的目标是让学员能够...

    并行计算导论

    第7章着重介绍了快速傅里叶变换(FFT)的并行算法以及在消息传递环境下的实现。第8章通过二维Poisson方程的差分格式,说明了区域分解方法的并行算法设计与并行程序编制。第9章则采用二维热传导方程的交替方向隐式...

    parrel并行计算导论1

    第7章讲解快速傅里叶变换(FFT)的并行处理。第8章采用点Jacobi迭代法解决二维Poisson方程,介绍了基于区域分解的并行算法。第9章通过二维热传导方程的ADI格式,展示了基于流水线方法的并行算法设计。 第三部分...

    高性能计算并行编程技术—MPI并行程序设计

    - **指令与数据:** 并行计算机按照指令流和数据流可以分为单指令流多数据流(SIMD)和多指令流多数据流(MIMD)。SIMD系统中的所有处理器同时执行相同的指令,但作用于不同的数据上;而MIMD系统中的每个处理器可以...

    第02章-数据通信.pptx

    第二章 数据通信与广域网技术深入探讨了数据通信的基础知识和关键技术,涵盖了从基本概念到实际应用的广泛领域。本章的学习目标是理解和掌握数据通信的核心元素,以及广域网(WAN)中的数据交换技术和差错控制方法。...

    第4章-深度学习平台.pdf

    GPU(图形处理器)因其并行计算能力在深度学习中被广泛采用,用于加速神经网络的训练过程。特别是NVIDIA的CUDA架构,为GPU加速深度学习计算提供了便利。 2. **系统平台**:尽管Linux系统因其稳定性和性能优势在深度...

    java课件--耿秋义

    7. **Java第10章 - 接口与抽象类**:接口和抽象类是实现多态的关键,学习者将学习到如何定义接口,以及如何使用抽象类来设计可扩展的系统。 8. **Java第11章 - 泛型与枚举**:泛型提供了一种在编译时检查类型安全的...

    广州周立功arm培训的ppt第6章-我见过最好的arm多媒体教程

    7. **实际应用案例**:教程可能会提供实际的开发示例,如移动游戏、流媒体服务或实时视频处理,以帮助学习者理解这些技术在实际项目中的应用。 8. **调试与分析工具**:介绍如何使用ARM的开发工具,如ARM DS-5或gdb...

    java第18章java-chapter18.rar

    第18章可能涉及输入输出操作,如文件读写、数据转换、管道流和过滤流的使用。 4. **反射机制**:Java反射API允许我们在运行时检查类、接口、字段和方法的信息,甚至动态调用方法和访问私有成员。这部分内容可能包括...

    (电子商务)第三章-电子商务网络技术.ppt

    按照通信传输方式的不同分类为点到点式网络和广播式网络,按照网络控制方式的不同分类为集中式、分散式和分布式计算机网络,按照网络使用范围的不同分类为公用网和专用网。 计算机网络的通信方式和传输技术 计算机...

    《工作流管理技术基础》第七章

    《工作流管理技术基础》第七章深入探讨了工作流管理系统(Workflow Management System,WfMS)的核心概念与技术实现,是理解现代企业信息化建设中流程自动化关键的一课。本章节通过对工作流的基本定义、工作流管理...

    并行job开发者指南

    - 第7章:如何管理和监控DataStage作业的执行。 - 第8章:介绍DataStage的安全性,包括数据安全和访问控制。 - 第9章:作业的调度和性能优化。 - 第10章:错误处理、日志记录和故障排除。 - 第11章:DataStage...

    微机技术第11章并行接口.ppt

    在微机技术中,第11章主要讲解了并行接口,特别是8255A芯片的应用。并行接口是一种常见的数据传输方式,它以计算机的字长(如8位、16位或32位)为单位,一次性传输整个字长的数据,适合于短距离、大量且快速的信息...

    Code(WF从入门到精通).rar

    第七章:基本活动的操作 第八章:调用外部方法及工作流 第九章:逻辑流活动 第十章:事件活动 第十一章:并行活动 第十二章:策略和规则 第十三章:打造自定义活动 第十四章:基于状态的工作流 第十五章:工作流和...

Global site tag (gtag.js) - Google Analytics