`
sisyphe
  • 浏览: 2634 次
  • 性别: Icon_minigender_1
  • 来自: 上海
最近访客 更多访客>>
社区版块
存档分类
最新评论

为Groovy的Collection添加并行处理能力

阅读更多

我们知道Groovy中的集合操作collect是串行的。参见Groovy(1.8.6)的源代码org.codehaus.groovy.runtime.DefaultGroovyMethods

    /**
     * Iterates through this aggregate Object transforming each item into a new value using the
     * <code>transform</code> closure, returning a list of transformed values.
     * Example:
     * <pre class="groovyTestCase">def list = [1, 'a', 1.23, true ]
     * def types = list.collect { it.class }
     * assert types == [Integer, String, BigDecimal, Boolean]</pre>
     *
     * @param self      an aggregate Object with an Iterator returning its items
     * @param transform the closure used to transform each item of the aggregate object
     * @return a List of the transformed values
     * @since 1.0
     */
    public static <T> List<T> collect(Object self, Closure<T> transform) {
        return (List<T>) collect(self, new ArrayList<T>(), transform);
    }

 
collect最终使用Java的Iterator:

    /**
     * Iterates through this aggregate Object transforming each item into a new value using the <code>transform</code> closure
     * and adding it to the supplied <code>collector</code>.
     *
     * @param self      an aggregate Object with an Iterator returning its items
     * @param collector the Collection to which the transformed values are added
     * @param transform the closure used to transform each item of the aggregate object
     * @return the collector with all transformed values added to it
     * @since 1.0
     */
    public static <T> Collection<T> collect(Object self, Collection<T> collector, Closure<? extends T> transform) {
        for (Iterator iter = InvokerHelper.asIterator(self); iter.hasNext(); ) {
            collector.add(transform.call(iter.next()));
        }
        return collector;
    }

此处没有任何特殊的,自然就是串行执行transform.call()了。

如何为Collection增加并行处理能力?有个办法,原理很简单,就是将原始的closure包装到线程中,等所有线程完成后整个迭代操作才正式完成。

import java.util.concurrent.*

class ParallelFeature {
    static POOL_SIZE = 10

    static def collectParallel(collections, block) {
        return collectParallel(collections, 60, block)
    }

    static def collectParallel(collections, timeout, block) {
        def exec = Executors.newFixedThreadPool(POOL_SIZE)
        def latch = new CountDownLatch(collections.size())
        def result = collections.collect {
            exec.submit(new Callable() {
                def call() {
                    def result = block(it)
                    latch.countDown()
                    result
                }
            })
        }
        result = latch.await(timeout, TimeUnit.SECONDS) ? result.collect { it.get() } : null
        return result
    }
}

简单起见,该代码没有对异常过多处理。
此外,为了方便使用该方法,还需要用Groovy的metaClass在使用前将它植入。

    	java.util.Collection.metaClass.collectParallel = { block ->
    		ParallelFeature.collectParallel(delegate, block)
    	}

    	java.util.Collection.metaClass.collectParallel = { timeout, block ->
    		ParallelFeature.collectParallel(delegate, timeout, block)
    	}

然后可以直接替换原来代码中的collect操作了。 

files.collectParallel { file ->
    download(file)
}
是不是很简单?
分享到:
评论

相关推荐

    java7,32位+64位安装包

    - **Fork/Join框架**:Java 7引入了一个新的并发模型,Fork/Join框架,用于处理大型任务的分解,通过将大任务拆分为更小的任务并行处理,然后合并结果。它简化了编写并行代码的过程。 2. **try-with-resources...

    java jdk1.6

    2. **增强的垃圾收集(Garbage Collection)**:引入了并行弱引用垃圾收集器,提高了内存管理效率,降低了应用的暂停时间。 3. **动态代理类**:在Java.lang.reflect包下增加了Proxy类,允许创建接口的动态实现,...

    JDK1.7 亲自收藏,可以使用

    2. **动态类型语言支持**:JDK1.7通过JSR 292( invokespecial 字符串支持)添加了对动态类型语言的支持,使得Java平台能够更好地与脚本语言如Groovy、JRuby等交互。 3. **Try-with-resources**:这个新特性使得...

    JDK1.7api开发手册

    类型注解允许在类型声明(如泛型参数、变量、方法返回类型和参数类型)上添加元数据,增强了编译器和静态分析工具的能力。这对于编写安全的代码和实现代码验证非常有用。 3. **钻石操作符(Diamond Operator)** ...

    java7api

    Java 7 API,全称为Java Platform, Standard Edition 7 Application Programming Interface,是Java编程语言的一个重要版本,由Oracle公司发布于2011年。这个版本引入了许多新特性,优化了旧有的功能,并且增强了...

    大数据场景中语言虚拟机的应用和挑战.docx

    这些特性使得许多其他编程语言也能在JVM上运行,如Scala、Groovy等,极大地扩展了JVM的应用领域。目前,大量知名的大数据系统,如Hadoop、Spark、Flink等,都基于JVM构建。 ##### 2.2 CLR(公共语言运行时) CLR是...

    gradle-5.1.zip

    - 引入了Kotlin DSL的`@fileCollection`注解,允许开发者更直观地处理文件集合。 4. **Java插件改进**: - 对Java插件进行了更新,支持Jigsaw模块化(Java 9+),使得Gradle项目可以更好地与Java模块系统集成。 ...

    JDK_API_1_6_zh_CN.rar

    5. **并发编程改进**:包括改进的并发工具类如`ConcurrentHashMap`、`ConcurrentSkipListMap`等,以及`Fork/Join`框架,为并行计算提供支持。 6. **改进的垃圾收集(Garbage Collection)**:1.6版本优化了垃圾收集...

    jdk1.6.0_32.rar

    Java JDK 1.6.0_32 是Oracle公司发布的一个重要的Java开发工具包,它为开发者提供了编译、调试和运行Java应用程序所需的所有组件。这个版本是针对Java Development Kit的1.6更新32,包含了对先前版本的错误修复和...

    jdk-7u80-windows-x64

    5. **Fork/Join框架**:这是一个并行编程模型,用于执行任务分解,常用于大数据处理和计算密集型任务。 6. **NIO.2(New I/O 2)**:提供了新的文件系统API,支持异步I/O操作和文件通道。 7. **改进的垃圾收集...

    jdk1.7.0_40.zip

    3. **并行GC(Garbage Collection)改进**: - **G1垃圾收集器**:作为一种服务器级的垃圾收集器,G1在JDK 1.7中得到了更多优化,提供更稳定的暂停时间和更可预测的内存使用。 4. **动态语言支持**: - ** ...

    JDK6.0API官方中文版CHM

    Java Development Kit(JDK)6.0是Oracle公司发布的一个重要版本,它为Java开发者提供了丰富的类库和工具,用于构建、测试和部署Java应用程序。JDK 6.0 API(Application Programming Interface)文档是开发人员理解...

    The Java® Virtual Machine Specification Java SE 7 Edition.rar

    6. 异常处理:描述了Java异常处理的机制,包括异常表、捕获范围和异常类型的关联等。 7. 多线程:JVM提供了原生线程支持,以及线程同步和通信的方法,如synchronized关键字、wait()、notify()和notifyAll()等。 8....

Global site tag (gtag.js) - Google Analytics