bit1129

[Spark101] Scala Promise/Future as Used in Spark

 

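Promise and Future are Scala's concurrency primitives for making asynchronous calls and collecting their results. Scala's Future plays much the same role as the Future interface in JUC (java.util.concurrent), whereas Promise takes a little more effort to understand. I plan to look more closely at the semantics and use cases of both when time permits; for details, see the Scala online documentation: http://docs.scala-lang.org/sips/completed/futures-promises.html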

 

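The code below comes from the fetchBlockSync method of BlockTransferService. Because it only pulls the data of a single block, Spark defines it as a synchronous fetch rather than an asynchronous one. The asynchronous fetch is implemented by BlockTransferService.fetchBlocks, which can fetch several blocks in one batch and hands each result to a callback as a ManagedBuffer.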

 

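The code first defines a variable result of type Promise that will hold a value of type ManagedBuffer. Once a value has been put in, a caller waiting on result.future stops waiting and receives it. The semantics can therefore be read as follows: a Promise is the place where a value will be stored at some point in time, and Promise.future is the handle on which a caller waits for that value. As soon as the value has been stored, a caller blocked on the future returns immediately.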

 

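A value is put into a Promise through Promise.success or Promise.failure: the former stores the correct result of the asynchronous operation, and the latter stores the failure when the operation fails.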

 

 

 

 

  /**
   * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
   *
   * It is also only available after [[init]] is invoked.
   */
  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    // Create the Promise `result`; result.future will wait until the Promise has been written to.
    val result = Promise[ManagedBuffer]()
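    // Issue an asynchronous block fetch via fetchBlocks. When the request completes, the
    // BlockFetchingListener's onBlockFetchFailure or onBlockFetchSuccess is invoked; either one
    // writes the outcome into the Promise, after which a wait on result.future returns.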
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })
    // Await.result is the blocking wait primitive of Scala's concurrency library; what it waits
    // on is its first argument, here result.future.
    Await.result(result.future, Duration.Inf)
  }
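The two completion paths used by the listener above, result.success and result.failure, can be tried in isolation. The following is a minimal sketch, not Spark code: the object name PromiseCompletionDemo, the String payload, and the one-second timeout are assumptions made for illustration. It shows that Await.result returns the stored value on success and rethrows the stored exception on failure.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object; none of these names come from Spark.
object PromiseCompletionDemo {
  // Completes a fresh Promise successfully, then blocks on its future.
  def awaitSuccess(): String = {
    val p = Promise[String]()
    p.success("block data")            // write side: store the result
    Await.result(p.future, 1.second)   // read side: returns the stored value
  }

  // Completes a fresh Promise with a failure. Await.result rethrows the
  // stored exception, which is captured in a Try so the caller can inspect it.
  def awaitFailure(): Try[String] = {
    val p = Promise[String]()
    p.failure(new java.io.IOException("fetch failed"))
    Try(Await.result(p.future, 1.second))
  }
}
```

In fetchBlockSync the same thing happens across threads: the listener completes the Promise on whichever thread delivers the response, while the calling thread sits in Await.result.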

 

As the signature of Await.result below shows, its first parameter is an Awaitable, so result.future is an instance of Awaitable; in other words, Future extends Awaitable:

    @throws(classOf[Exception])
    def result[T](awaitable: Awaitable[T], atMost: Duration): T =
      blocking(awaitable.result(atMost)(AwaitPermission))
  }
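The atMost parameter is worth a short illustration. fetchBlockSync passes Duration.Inf, so it waits until one of the two callbacks fires. The sketch below, with the hypothetical name AwaitTimeoutDemo and an arbitrary 100 ms limit, shows what a bounded wait looks like when the Promise is never completed: Await.result gives up with a java.util.concurrent.TimeoutException.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Spark or the Scala library.
object AwaitTimeoutDemo {
  // Waits on a Promise that nobody ever completes; the bounded wait gives up.
  def waitWithTimeout(): String = {
    val neverCompleted = Promise[Int]()
    try {
      Await.result(neverCompleted.future, 100.millis).toString
    } catch {
      case _: TimeoutException => "timed out"
    }
  }
}
```

Whether a finite timeout would be appropriate inside Spark itself is a separate design question that this sketch does not try to answer.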

 

The doc comment on the Awaitable trait:

/**
 * An object that may eventually be completed with a result value of type `T` which may be
 * awaited using blocking methods.
 * 
 * The [[Await]] object provides methods that allow accessing the result of an `Awaitable`
 * by blocking the current thread until the `Awaitable` has been completed or a timeout has
 * occurred.
 */
trait Awaitable[+T] {

 

The Future trait:

 

/** The trait that represents futures.
 *
 *  Asynchronous computations that yield futures are created with the `future` call:
 *
 *  {{{
 *  val s = "Hello"
 *  val f: Future[String] = future {
 *    s + " future!"
 *  }
 *  f onSuccess {
 *    case msg => println(msg)
 *  }
 *  }}}
 *
 *  @author  Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang
 *
 *  @define multipleCallbacks
 *  Multiple callbacks may be registered; there is no guarantee that they will be
 *  executed in a particular order.
 *
 *  @define caughtThrowables
 *  The future may contain a throwable object and this means that the future failed.
 *  Futures obtained through combinators have the same exception as the future they were obtained from.
 *  The following throwable objects are not contained in the future:
 *  - `Error` - errors are not contained within futures
 *  - `InterruptedException` - not contained within futures
 *  - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures
 *
 *  Instead, the future is completed with a ExecutionException with one of the exceptions above
 *  as the cause.
 *  If a future is failed with a `scala.runtime.NonLocalReturnControl`,
 *  it is completed with a value from that throwable instead.
 *
 *  @define nonDeterministic
 *  Note: using this method yields nondeterministic dataflow programs.
 *
 *  @define forComprehensionExamples
 *  Example:
 *
 *  {{{
 *  val f = future { 5 }
 *  val g = future { 3 }
 *  val h = for {
 *    x: Int <- f // returns Future(5)
 *    y: Int <- g // returns Future(3)
 *  } yield x + y
 *  }}}
 *
 *  is translated to:
 *
 *  {{{
 *  f flatMap { (x: Int) => g map { (y: Int) => x + y } }
 *  }}}
 *
 * @define callbackInContext
 * The provided callback always runs in the provided implicit
 *`ExecutionContext`, though there is no guarantee that the
 * `execute()` method on the `ExecutionContext` will be called once
 * per callback or that `execute()` will be called in the current
 * thread. That is, the implementation may run multiple callbacks
 * in a batch within a single `execute()` and it may run
 * `execute()` either immediately or asynchronously.
 */
trait Future[+T] extends Awaitable[T] {
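The for-comprehension example in the doc comment above can be run as follows. This is a sketch under a few assumptions: it uses Future { ... } (the companion object's apply) instead of the lowercase future { ... } shown in the comment, since that form was deprecated and later removed in newer Scala releases; it uses the global ExecutionContext; and the object name FutureCompositionDemo is made up for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object for the doc comment's example.
object FutureCompositionDemo {
  // The for-comprehension form.
  def sumWithFor(): Int = {
    val f = Future { 5 }
    val g = Future { 3 }
    val h = for {
      x <- f
      y <- g
    } yield x + y
    Await.result(h, 5.seconds)
  }

  // The desugared form the comment describes: flatMap on f, map on g.
  def sumWithFlatMap(): Int = {
    val f = Future { 5 }
    val g = Future { 3 }
    val h = f.flatMap(x => g.map(y => x + y))
    Await.result(h, 5.seconds)
  }
}
```

Both methods yield the same sum, because the for-comprehension is only syntax for the flatMap/map chain.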

 

 

On how Promise is used, http://stackoverflow.com/questions/13381134/what-are-the-use-cases-of-scala-concurrent-promise explains the basic idea:

The Promise and Future are complementary concepts. The Future is a value which will be retrieved, well, sometime in the future and you can do stuff with it when that event happens. It is, therefore, the read or out endpoint of a computation - it is something that you retrieve a value from.

A Promise is, by analogy, the writing side of the computation. You create a promise which is the place where you'll put the result of the computation and from that promise you get a future that will be used to read the result that was put into the promise. When you'll complete a Promise, either by failure or success, you will trigger all the behavior which was attached to the associated Future.

Regarding your first question, how can it be that for a promise p we have p.future == p. You can imagine this like a single-item buffer - a container which is initially empty and you can afterwards store one value which will become its content forever. Now, depending on your point of view this is both a Promise and a Future. It is promise for someone who intends to write the value in the buffer. It is a future for someone who waits for that value to be put in the buffer.
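The "single-item buffer" picture can be checked with a few lines. The sketch below uses a hypothetical object name, SingleAssignmentDemo; it relies on Promise.trySuccess, which reports whether the write took effect, and on Promise.success, which throws IllegalStateException if the promise is already completed.

```scala
import scala.concurrent.Promise
import scala.util.{Success, Try}

// Hypothetical demo object illustrating single assignment.
object SingleAssignmentDemo {
  // Returns: did the first write succeed, did the second write succeed,
  // the value now visible through the future, and whether a strict
  // second write via `success` threw.
  def run(): (Boolean, Boolean, Option[Try[Int]], Boolean) = {
    val p = Promise[Int]()
    val first = p.trySuccess(1)    // buffer was empty: the write is accepted
    val second = p.trySuccess(2)   // buffer already full: the write is rejected
    val strictThrows = Try(p.success(3)).isFailure
    (first, second, p.future.value, strictThrows)
  }
}
```

After the first write the future keeps reporting Success(1), no matter how many later writes are attempted.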

 

Regarding the real-world use: Most of the time you won't deal with promises directly. If you'll use a library which performs asynchronous computation then you'll just work with the futures returned by the library's methods. Promises are, in this case, created by the library - you're just working with the reading end of what those methods do.

But if you need to implement your own asynchronous API you'll have to start working with them. Suppose you need to implement an async HTTP client on top of, let's say, Netty. Then your code will look somewhat like this:

 

    def makeHTTPCall(request: Request): Future[Response] = {
        val p = Promise[Response]()
        registerOnCompleteCallback(buffer => {
            val response = makeResponse(buffer)
            p success response
        })
        p.future
    }
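The snippet above leaves Request, Response, registerOnCompleteCallback and makeResponse undefined and has no failure path. A self-contained variant might look like the sketch below. Everything in it is an assumption for illustration: fetchAsync is a made-up callback-style API that stands in for a Netty-like client, and the URL check merely simulates success and failure.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object; fetchAsync is NOT a real library API.
object CallbackToFutureDemo {
  // Made-up callback-style API: runs on its own thread and reports the
  // outcome through exactly one of the two callbacks.
  def fetchAsync(url: String)(onSuccess: String => Unit, onFailure: Throwable => Unit): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit =
        if (url.startsWith("http://")) onSuccess(s"response from $url")
        else onFailure(new IllegalArgumentException(s"unsupported url: $url"))
    })
    worker.setDaemon(true)
    worker.start()
  }

  // Adapter: create the Promise, let the callbacks complete it,
  // and hand only the read side (the Future) to the caller.
  def fetch(url: String): Future[String] = {
    val p = Promise[String]()
    fetchAsync(url)(body => p.success(body), err => p.failure(err))
    p.future
  }

  // Blocking variant in the spirit of fetchBlockSync, with a finite timeout.
  def fetchBlocking(url: String): Try[String] =
    Try(Await.result(fetch(url), 5.seconds))
}
```

The caller of fetch never sees the Promise, which matches the point made in the quoted answer: library users work with the reading end only.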

 

 
