`
xylong
  • 浏览: 191257 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

scala与生产者消费者模式

阅读更多
object ProdConSample {

  class Producer(drop: Drop) extends Runnable {
    val importantInfo: Array[String] = Array(
      "Mares eat oats",
      "Does eat oats",
      "Little lambs eat ivy",
      "A kid will eat ivy too");

    override def run(): Unit =
      {
        importantInfo.foreach((msg) => drop.put(msg))
        drop.put("DONE")
      }
  }

  class Consumer(drop: Drop) extends Runnable {
    override def run(): Unit =
      {
        var message = drop.take()
        while (message != "DONE") {
          System.out.format("MESSAGE RECEIVED: %s%n", message)
          message = drop.take()
        }
      }
  }

  class Drop {
    var message: String = ""
    var empty: Boolean = true
    var lock: AnyRef = new Object()

    def put(x: String): Unit =
      lock.synchronized {
        // Wait until message has been retrieved
        await(empty == true)
        // Toggle status
        empty = false
        // Store message
        message = x
        // Notify consumer that status has changed
        lock.notifyAll()
      }

    def take(): String =
      lock.synchronized {
        // Wait until message is available.
        await(empty == false)
        // Toggle status
        empty = true
        // Notify producer that staus has changed
        lock.notifyAll()
        // Return the message
        message
      }

    private def await(cond: => Boolean) =
      while (!cond) { lock.wait() }
   
  }

  def main(args: Array[String]): Unit =
    {
      // Create Drop
      val drop = new Drop();

      // Spawn Producer
      new Thread(new Producer(drop)).start();

      // Spawn Consumer
      new Thread(new Consumer(drop)).start();
    }

}
分享到:
评论

相关推荐

    快学 Scala(第二版) Second Edition

    Kafka是一个分布式流处理平台,通常用Scala构建生产者和消费者。Scala的便利性使得与Kafka的交互更加高效: 1. **KafkaProducer**:Scala API提供了创建Kafka生产者的接口,可以方便地发送消息到主题。 2. **Kafka...

    producer-consumer-akka-actor:使用Akka演员(!Streams)的生产者消费者的简单实现

    多个消费者畅通无阻的消费者和生产者(演员) 背压没有锁定,通过消息传递完成同步它无法处理的事情: 多个生产者-我故意留下它是因为我正在考虑用于多个消费者的相同模式,并将其扩展到多个生产者。 因此,如果我...

    kafka-2.13-3.5.0+scala-2.13.2

    在使用Kafka进行大数据流处理时,还需要掌握如何编写生产者和消费者应用,理解消息的序列化和反序列化,以及如何利用Kafka Connect与其他系统集成。总的来说,Kafka是一个强大且灵活的工具,适用于构建实时数据管道...

    Kafka示例代码

    通过提供的Kafka示例代码,你可以学习如何在Java和Scala环境中设置和操作Kafka的生产者和消费者,理解Kafka与Hadoop的集成方式,以及如何在Scala这样的函数式语言中优雅地使用Kafka API。这些知识对于构建实时数据...

    KafkaDemo.rar

    2. **消费者代码**:与生产者类似,这里可能是消费者应用程序的实现,用于订阅并处理生产者发布的消息。消费者代码会展示如何创建Consumer实例,订阅主题,以及如何迭代处理消息。 3. **配置文件**:Kafka应用通常...

    clickhouse-scala-client:具有响应流支持的Clickhouse Scala客户端

    在clickhouse-scala-client中,这些组件被用来创建一个可管理的数据流,使得数据的生产和消费可以在不阻塞主线程的情况下进行,这对于高吞吐量的数据库操作至关重要。 三、Akka Stream集成 clickhouse-scala-...

    kafka kafka 2.13

    - **发布/订阅模式**:生产者发布消息到主题,消费者订阅感兴趣的主题并消费消息。 - **幂等性**:生产者可以多次发送同一条消息,但 broker 只会确保消息被处理一次,防止重复数据。 - **顺序保证**:在单分区下...

    Kafka 50道面试题和答案.docx

    Kafka 中有几个关键组件:主题、生产者、消费者、经纪人等。 主题 主题是 Kafka 中的一组或一堆消息。生产者发布消息到主题中,而消费者则订阅主题并读取和处理消息。 生产者 生产者是 Kafka 中的主要组件之一,它...

    worldwindjava源码-franzy:ClojureKafka客户端,支持Kafka生产者、消费者、重新平衡、管理和验证

    面向客户端的核心功能,即消费者、生产者、模式等。 Franzy-Common,Kafka 客户端 客户 使用 Clojure 管理 Kafka,获取 Clojure 数据输入/输出,创建主题,添加分区,列出代理等。 Franzy-Common,Kafka 服务器...

    中子:基于Fs2构建的纯功能性Scala Apache Pulsar客户端

    使用中子客户端,开发者可以方便地创建生产者和消费者,发布和接收Pulsar消息。客户端提供了丰富的API,用于设置主题、订阅模式、消息序列化和反序列化等。由于其纯函数式的设计,测试和调试也变得更加简单,可以...

    kafka-serde-scala:隐式将类型类编码器转换为kafka序列化器,反序列化器和Serde

    通过实现`Serde`,开发者可以自定义数据的序列化和反序列化方式,确保数据在生产者、消费者和Kafka主题之间的正确流动。`kafka-serde-scala`库允许我们无需显式实现这些接口,而是利用Scala的类型类机制,将已存在的...

    akka-rabbitmq:Scala和Akka演员中的RabbitMq客户端

    3. **消费者和生产者**:库支持两种类型的actor,一种是消费者,负责监听RabbitMQ队列并处理接收到的消息;另一种是生产者,用于发送消息到指定的队列。 4. **消息确认**:RabbitMQ支持消息确认机制,确保消息被...

    Kafka 实战演练 6

    消费者通过消费者组来组织,同一个组内的消费者会协作消费主题中的消息,确保每个消息只被消费一次(如果配置了正确的模式)。 ### 2. 消费者API Kafka提供了Java和Scala的消费者API,允许开发者编写应用程序来...

    KAFKA集群文档

    若每个消费者都属于不同的消费组,则每个消费者都会收到消息的副本,类似于发布-订阅模式。 总结而言,Kafka集群文档详细介绍了Kafka集群的基本概念、架构设计、工作原理以及如何进行消息生产和消费。文档从分区、...

    kafka安装包-2.13-3.6.2

    Kafka支持两种消费模式:单消费者模式(每个分区只能有一个消费者)和消费者组模式(多个消费者可以组成一个组,共享主题的分区)。 5. **Partitions**: 主题被分成多个分区,分区内的消息按照顺序存储,且每个分区...

    Kafka是一个分布式消息队列系统.docx

    - Kafka采用发布/订阅模式,生产者将消息发布到特定的Topic,消费者订阅并消费这些消息。 - 消息一旦发布到Kafka,就会被持久化到磁盘,并可供消费者随时消费。 **生产流程:** - 生产者将业务数据封装成...

    kafkacs_kafka消费_

    开发者可以使用Kafka提供的Java或Scala API来创建生产者实例,设置配置(如acks、batch size等),然后将消息发送到指定的主题。 2. **主题与分区**:主题是逻辑上的分类,可以理解为数据库中的表。每个主题可以被...

    大数据相关面试题Spark,Kakfa等

    生产者负责发布消息到主题,消费者订阅主题并消费消息,代理是 Kafka 的服务器端,负责存储和转发消息,主题是逻辑上的分类,将消息分区存储。 3. Kafka 如何保证消息的可靠性? Kafka 提供了消息持久化功能,将...

    kafka_2.12-2.3.0.zip

    4. 消费者(Consumer):从Kafka的主题中订阅并消费消息,支持多消费者组模式,实现负载均衡。 5. 集群(Cluster):由多个服务器(Broker)组成,提供高可用性和容错性。 二、Kafka的架构与特性 1. 分布式:Kafka...

Global site tag (gtag.js) - Google Analytics