http://colobu.com/2014/11/18/Java-8-Stream/
在现代的Java应用程序中很少不用到集合类和数组。 可以对集合进行增,删,改,插, 统计(聚合aggregate)。 这些操作的概念在SQL操作上也会用到。 但是对集合的操作却没有像SQL那样方便简捷。 为什么我们不能实现一种类似SQL语句一样方便的编程方式呢, 去取代一遍又一遍loop遍历的方式处理集合和数组中的数据?
另外,对于大数据量的集合, 能不能充分利用多核的优势, 并行的处理?
Stream是就是这种处理数据的风格, 一种流式风格。 这种风格在其它语言中也有实现, 比如Javascript (Node.js stream)。
这种风格将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选, 排序,聚合等。
元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。
1
2
3
|
+--------------------+ +------+ +------+ +---+ +-------+
| stream of elements +-----> |filter+-> |sorted+-> |map+-> |collect|
+--------------------+ +------+ +------+ +---+ +-------+
|
一个简单的例子:
1
2
3
4
5
6
|
List<Integer> transactionsIds =
widgets.stream()
.filter(b -> b.getColor() == RED)
.sorted((x,y) -> x.getWeight() - y.getWeight())
.mapToInt(Widget::getWeight)
.sum();
|
应用Stream
首先,我们先了解一些Stream处理的概念。
什么是流 Stream
流是一个来自数据源的元素队列并支持聚合操作
- 元素队列 元素是特定类型的对象,形成一个队列。 Java中的Stream并不会存储元素,而是按需计算。
- 数据源 流的来源。 可以是集合,数组,I/O channel, 产生器generator 等。
- 聚合操作 类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等。
注意 这里的流和Java I/O操作的流如InputStream/OutputStream不是一个概念。
和以前的Collection操作不同, Stream操作还有两个基础的特征:
- Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。
- 内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。
除了操作不同, 从实现角度比较, Stream和Collection也有众多不同:
- 不存储数据。 流不是一个存储元素的数据结构。 它只是传递源(source)的数据。
- 功能性的(Functional in nature)。 在流上操作只是产生一个结果,不会修改源。 例如filter只是生成一个筛选后的stream,不会删除源里的元素。
- 延迟搜索。 许多流操作, 如filter, map等,都是延迟执行。 中间操作总是lazy的。
- Stream可能是无界的。 而集合总是有界的(元素数量是有限大小)。 短路操作如limit(n) , findFirst()可以在有限的时间内完成在无界的stream
- 可消费的(Consumable)。 不是太好翻译, 意思流的元素在流的声明周期内只能访问一次。 再次访问只能再重新从源中生成一个Stream
几种流生成的方式:
- 集合类的stream() 和 parallelStream()方法;
- 数组Arrays.stream(Object[]);
- Stream类的静态工厂方法: Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator);
- 文件行 BufferedReader.lines();
- Files类的获取文件路径列表: find(), lines(), list(), walk();
- Random.ints() 随机数流, 无界的;
- 其它一些产生流的方法:BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence),JarFile.stream().
- 通过StreamSupport辅助类从spliterator产生流
前面提到过, 流操作分为中间操作(Intermediate operation)和最终操作(Terminal operation)。 中间操作是lazy的, 不会立即执行,只不过是返回一个记录操作的新的流。 最终操作会最终使用流管道,使用后不能在被使用。 大部分情况下, 最终操作都是eager的。
中间操作又进一步分为无状态的操作和有状态的操作。 像filter,map都是无状态的操作, 处理一个新的元素时不需要获得先前遍历过的元素的状态。 而有状态的操作,像distinct, sorted, 需要得到先前访问的元素的状态。
有状态的操作在产生结果前需要获得完整的输入。 因此有状态的操作一个并行流时, 可能需要多次传入数据或者需要缓存数据。 而无状态的操作只需传入一次数据。
Collection.stream() 和 Collection.parallelStream() 分别产生序列化流(普通流)和并行流。 注意并行(parallel)和并发(concurrency)是有区别的。 并发是指多线程有竞争关系,在单核的情况下只有一个线程运行。 而并行是指在多核的情况下同时运行, 单核谈并行是无意义的。
记住,并行不一定快,尤其在数据量很小的情况下,可能比普通流更慢。 只有在大数据量和多核的情况下才考虑并行流。
除非操作明显是不确定的,比如findAny, 否则普通流和并行流应该返回一致的结果。
尽管可以从非线程安全的集合如ArrayList生成流,如果在流管道执行过程中修改了源,可能会抛出java.util.ConcurrentModificationException异常。 下面一个简单的例子演示这种情况。
1
2
3
4
|
List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3,4,5));
long count = list.stream().filter(x -> {list.remove(0);return true;})
.count();
System.out.println(count);
|
concurrent下的并发集合不会抛出ConcurrentModificationException异常,在中间操作对源数据的修改会反射到最终结果:
1
2
3
4
|
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));
|
如果传递给中间操作的lambda表达式是有状态, 并行流的最终结果可能会不一样:
1
2
|
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
|
"Side-effect"不鼓励使用。Side-effect是只一个方法不只是返回一个结果,还会改变对象的状态。 例如:
1
2
3
|
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
|
可以改为
1
2
3
|
List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!
|
如果源是有序的,则相应的流也是有序的。 这里有序是顺序的意思,不是排序。 比如Array和List都是有序的。 HashSet则不是。 有序流上的操作的记过基本上也是有序的, 比如[1,2,3]通过map(x -> x*2)的结果必然是[2,4,6], 如果是无序流, [6,2,4], [4,6,2]等都是合法的结果。
一些reduction方法(也就做fold方法)处理一系列的数据得到一个唯一的结果。 比如reduce, collect,sum,max,count等。
比如以前我们计算数组的sum值:
1
2
3
4
5
|
int sum = 0;
for (int x : numbers) {
sum += x;
}
|
现在使用reduce:
1
2
3
4
|
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
或
int sum = numbers.stream().reduce(0, Integer::sum);
int sum = numbers.parallelStream().reduce(0, Integer::sum);
|
揭秘Stream的实现
在我们进入Stream接口的代码实现之前,我们先看两个Iterator,Spliterator:
Iterator
Java 8中为Iterator新增加一个缺省方法forEachRemaining(Consumer<? super E> action)
。这个缺省方法的实现很简单, 对未处理的元素执行action
, 直到处理完或者action抛出异常。
1
2
3
4
5
|
default void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
while (hasNext())
action.accept(next());
}
|
Spliterator
正如其名,Spliterator可以看作一个“splittable Iterator”。 在单线程情况下使用没有问题,但是它提供了trySplit()
,为多线程提供处理数据片。
它是为了并行处理流而新增的一个迭代类。
它依然实现了顺序迭代方法default void forEachRemaining(Consumer<? super T> action)
。 内部用一个循环执行:
1
2
3
|
default void forEachRemaining(Consumer<? super T> action) {
do { } while (tryAdvance(action));
}
|
而tryAdvance
方法则对下一个为处理的操作执行action并返回true, 如果没有下一个元素,返回false。
看一个trySplit()
的例子, ArrayListSpliterator的trySplit采用二分法,将前一半数据返回, 如果数据太小不能分了,返回null。
1
2
3
4
5
6
|
public ArrayListSpliterator<E> trySplit() {
int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
return (lo >= mid) ? null : // divide range in half unless too small
new ArrayListSpliterator<E>(list, lo, index = mid,
expectedModCount);
}
|
而ConcurrentLinkedQueue和ConcurrentLinkedDeque的相应的Spliterator处理稍微复杂一点, 第一次取一个,第二个取两个,不超过MAX_BATCH.
Stream
事实上Stream只是一个接口,并没有操作的缺省实现。最主要的实现是ReferencePipeline
,而它的一些具体实现又是由AbstractPipeline
完成的。
下面我们看一下这两个类。
1
2
3
|
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT>
|
AbstractPipeline类实现了所有的 Stream的中间操作和最终操作。 我们重点的挑一些来分析。
首先这个类本身没有定义field, 所有我们只需关注它的每一个具体方法即可,简化了我们的分析。
看一个无状态的操作filter:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
|
可以看到这个操作只是返回一个StatelessOp对象(此类依然继承于ReferencePipeline),它的一个回调函数opWrapSink会返回一个Sink
对象链表。Sink
代表管道操作的每一个阶段, 比如本例的filter阶段。 在调用accept之前,先调用begin通知数据来了,数据发送后调用end。
而map类似。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
|
那么问题来了stream.filter(....).map(...)
怎么形成一个链的?
filter返回一个StatelessOp,我们记为StatelessOp1, 而map返回另外一个StatelessOp,我们记为StatelessOp2.
在调用StatelessOp1.map时, StatelessOp2是这样生成的:
1
|
return new StatelessOp<P_OUT, R>(StatelessOp1,......);
|
管道中的每一个阶段的stream都保留前一个流(upstream)的引用。
有状态的操作比较复杂,有专门的类来处理:
1
2
3
4
5
6
7
8
|
@Override
public final Stream<P_OUT> distinct() {
return DistinctOps.makeRef(this);
}
@Override
public final Stream<P_OUT> sorted() {
return SortedOps.makeRef(this);
}
|
最终操作基本由父类的final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp)
完成。
count由mapToLong实现。
前面讲到, 只有最终操作才对源数据进行操作,中间操作都是lazy的。 怎么实现的呢?
沿着这个调用terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()))
-> helper.wrapAndCopyInto(this, spliterator).get()
-> copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
找到wrapSink
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
|
首先找到最后一个操作,也就是最终操作, 执行它的opWrapSink,事实上得到一个链表,最终返回第一个Sink, 执行第一个Sink的accept
将触发链式操作, 将管道中的操作在一个迭代中执行一次。
代码稍显乱而复杂,简单化的将,事实上Java是将所有的操作形成一个类似链接的结构(通过Sink的downstream,upstream),在遇到最终操作时触发链式反应, 通过各种数据类型特定的spliterator的一次迭代最终得到结果。
并行操作是通过ForkJoinTask框架实现。
模拟流的处理链
Java Stream的实现比较复杂,因为它需要处理各种操作已经并行计算。 我们可以用一个简单的例子来模拟它的过程, 这个例子将Stream简化,但是处理方法是一致。
这个例子模拟使用Sink建立链表,在最后一个操作的时候回溯链表, 并调用Spliterator的forEachRemaining方法进行一次遍历, 每访问一个数组的元素就会从头开始调用链表的每个节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.Consumer;
public class SinkChain {
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void main(String[] args) {
int[] source = {1,2,3,4,5};
Spliterator spliterator = Arrays.stream(source).spliterator();
//setup upstream
Sink<Integer> sink0 = new Sink<Integer>("source sink", null);
Sink<Integer> sink4 = sink0.op("sink1").op("sink2").op("sink3").op("terminal sink");
//setup downstream chain
Sink wrappedSink = wrapSink(sink4);
assert(wrappedSink == sink0); //now get the first (source) stage
//in one loop, handle elements: 1,2,3,4,5
spliterator.forEachRemaining(wrappedSink);
}
public static Sink wrapSink(Sink sink) {
while(sink.upstream != null) {
sink.upstream.downstream = sink;
sink = sink.upstream;
}
return sink;
}
static class Sink<T> implements Consumer<T>{
private Sink upstream;
private Sink downstream;
private String name;
public Sink(String name, Sink upstream) {
this.name = name;
this.upstream = upstream;
}
public Sink op(String name) {
return new Sink(name, this);
}
@Override
public void accept(T t) {
System.out.println(name + " handles " + t);
if (downstream != null)
downstream.accept(t);
}
}
}
|
参考
- http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html
- http://www.oracle.com/technetwork/articles/java/architect-streams-pt2-2227132.html
- http://java.dzone.com/articles/understanding-java-8-streams-1
- http://stackoverflow.com/questions/224648/external-iterator-vs-internal-iterator
- http://codereview.stackexchange.com/questions/52050/spliterator-implementation
- https://www.inkling.com/read/schildt-java-complete-reference-9th/chapter-18/spliterators
相关推荐
在Java编程中,Stream API是Java 8引入的一个强大特性,它允许我们以声明性方式处理数据集合。本文将深入探讨如何使用Stream API对日期进行排序。日期排序在数据分析、日志处理、报表生成等场景中非常常见。下面我们...
- **基本类型的流**:对于int、long和double这些基本类型,Java8提供了专门的Stream实现:`IntStream`、`LongStream`和`DoubleStream`。 ```java IntStream intStream = IntStream.range(0, 10); // 生成0到9的...
在test方法中用各种java stream操作,实现了类似相应SQL的输出效果。本例子不依赖第三方程序,直接在java开发环境中编译运行。 以下是程序部分代码: List, Object>> list2 = list.stream().filter(map -> Integer...
Java 8 Stream 表达式实现 if/else 逻辑 Java 8 Stream 表达式是一种功能强大且灵活的数据处理方式,但是在实际使用中,我们经常会遇到if/else判断情况的处理问题。传统的写法是将if/else逻辑写在forEach方法中,...
Java 8 Stream 的分组功能是通过使用 Collectors.groupingBy() 方法实现的。该方法可以将 Stream 中的元素按照某个或多个条件分组,例如按照学生的班级、年龄、性别等条件分组。 下面是一个简单的示例代码,演示了...
根据提供的文件信息,本文将详细介绍Java 8中Stream API的核心概念、使用方法及其实战案例。Stream API作为Java 8的重要特性之一,极大地简化了集合处理的代码编写过程,提升了程序的可读性和效率。 ### Stream概述...
java stream doc.
Java8 Stream 自定义分组求和并排序的实现 Java8 Stream 自定义分组求和并排序的实现是 Java8 中一个非常重要的知识点,该知识点的实现可以帮助开发者快速地实现分组求和并排序的操作,从而提高开发效率和代码质量...
Java Stream是Java 8引入的重要特性,它提供了一种声明式处理数据集合的方式,极大地提升了代码的可读性和性能。Stream允许程序员更加专注于业务逻辑,而不是底层数据处理的细节。 **1. Java Stream简介** Java ...
在Java 8中,Stream API引入了一种新的处理数据的方式,它允许我们以声明性风格对集合、数组等数据源进行高效的操作。Stream是数据渠道,它可以用来处理一系列元素,如从集合或数组中生成的元素序列。Stream操作通常...
java8中通过stream流对List类型进行一些操作的测试Demo类
Java 8 是一个非常成功的版本,这个版本新增的Stream,配合同版本出现的 Lambda,给我们操作集合(Collection)提供了极大的便利。 Stream将要处理的元素集合看作一种流,在流的过程中,借助`Stream API`对流中的...
Java 8的流式处理通过内部迭代实现了对数据的处理,它通常包括三个阶段:流的创建(转换成流)、中间操作和终端操作。中间操作如`filter()`、`map()`等不会立即执行,而是构建一个操作序列;终端操作如`collect()`、...
java8 stream 常见例子
Java8 Stream详解.md
5. **数据管道**:在Java 8之前,为了实现类似Stream API的链式操作,开发者可能需要创建多个中间操作(Intermediate Operations),如过滤、映射,然后在最后进行终端操作(Terminal Operation)。这种方法比较繁琐...
在Java 8 Stream API中,我们可以实现一些复杂的操作,比如使用`filter()`筛选出满足条件的元素,`map()`将元素转换为另一种类型,然后使用`reduce()`进行聚合操作。此外,`groupingBy()`方法可以用于按指定字段进行...
在 Java 8 引入的 Stream API 中,java.util.stream.Collectors 类扮演着至关重要的角色。它提供了一种高级的方式来处理集合数据,使得数据聚合和转换操作变得简单而高效。本文将深入探讨 Collectors 类的作用、常用...
Java 8 Stream API 是Java开发中的一个重要特性,它引入了一种新的编程范式,使得处理集合数据更加简洁、高效和可读。Stream API是Java 8对函数式编程的有力支持,它允许开发者以声明性方式处理数据,而无需关注底层...
Java8并行流中自定义线程池操作示例 Java8并行流中自定义线程池操作示例主要介绍了Java8并行流中自定义线程池操作,结合实例形式分析了并行流的相关概念、定义及自定义线程池的相关操作技巧。 1. 概览 Java8引入了...