`

Kafka的producer

 
阅读更多

producer示例代码

 

Producer  producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
String messageStr = new String("Message_" + messageNo);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));

  producer可以支持异步发送和同步发送两种方式

  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                       queue,
                                                       eventHandler,
                                                       config.queueBufferingMaxMs,
                                                       config.batchNumMessages,
                                                       config.clientId)
      producerSendThread.start()
  }

   客户端代码调用send方法发送数据,对于sync方法直接调用eventHandler.handle方法同步发送请求。我们先来看异步发送的方法

 

 

  /**
   * Sends the data, partitioned by key to the topic using either the
   * synchronous or the asynchronous producer
   * @param messages the producer data object that encapsulates the topic, key and message data
   */
  def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      recordStats(messages)
      sync match {
        case true => eventHandler.handle(messages)
        case false => asyncSend(messages)
      }
    }
  }

   异步方法发送数据到queue

 

 

  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0  =>
          queue.offer(message)
        case _  =>
          try {
            config.queueEnqueueTimeoutMs < 0 match {
            case true =>
              queue.put(message)
              true
            case _ =>
              queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case e: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      }else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }

   异步处理事件的线程ProducerSendThread,这个线程会从queue中获取事件,当事件的个数达到或者超时时,就调用   handler.handle(events)方法去处理事件

private def processEvents() {
    var lastSend = SystemTime.milliseconds
    var events = new ArrayBuffer[KeyedMessage[K,V]]
    var full: Boolean = false

    // drain the queue until you get a shutdown command
    Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                      .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
      currentQueueItem =>
        val elapsed = (SystemTime.milliseconds - lastSend)
        // check if the queue time is reached. This happens when the poll method above returns after a timeout and
        // returns a null object
        val expired = currentQueueItem == null
        if(currentQueueItem != null) {
          trace("Dequeued item for topic %s, partition key: %s, data: %s"
              .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
          events += currentQueueItem
        }

        // check if the batch size is reached
        full = events.size >= batchSize

        if(full || expired) {
          if(expired)
            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
          if(full)
            debug("Batch full. Sending..")
          // if either queue time has reached or batch size has reached, dispatch to event handler
          tryToHandle(events)
          lastSend = SystemTime.milliseconds
          events = new ArrayBuffer[KeyedMessage[K,V]]
        }
    }
    // send the last batch of events
    tryToHandle(events)
    if(queue.size > 0)
      throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
        .format(queue.size))
  }

 DefaultEventHandler的handle方法处理事件

 def handle(events: Seq[KeyedMessage[K,V]]) {
    val serializedData = serialize(events)
    serializedData.foreach {
      keyed =>
        val dataSize = keyed.message.payloadSize
        producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
        producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
    }
    var outstandingProduceRequests = serializedData
    var remainingRetries = config.messageSendMaxRetries + 1
    val correlationIdStart = correlationId.get()
    debug("Handling %d events".format(events.size))
    while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
      //如果超时了,就更新topic的meta data
      if (topicMetadataRefreshInterval >= 0 &&
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        topicMetadataToRefresh.clear
        lastTopicMetadataRefreshTime = SystemTime.milliseconds
      }
      //发送数据
      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
      //如果失败了,更新失败的topic的metadata
      if (outstandingProduceRequests.size > 0) {
        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
        // back off and update the topic metadata cache before attempting another send operation
        Thread.sleep(config.retryBackoffMs)
        // get topics of the outstanding produce requests and refresh metadata for those
        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        remainingRetries -= 1
        producerStats.resendRate.mark()
      }
    }
    if(outstandingProduceRequests.size > 0) {
      producerStats.failedSendRate.mark()
      val correlationIdEnd = correlationId.get()
      error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
        .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
        correlationIdStart, correlationIdEnd-1))
      throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
    }
  }

 

 private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
    //将发送的消息映射成要发送到的broker,以及相应的message的map
    val partitionedDataOpt = partitionAndCollate(messages)
    partitionedDataOpt match {
      case Some(partitionedData) =>
        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
        try {
          for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
            if (logger.isTraceEnabled)
              messagesPerBrokerMap.foreach(partitionAndEvent =>
                trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
            val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
            //使用java的nio发送
            val failedTopicPartitions = send(brokerid, messageSetPerBroker)
            failedTopicPartitions.foreach(topicPartition => {
              messagesPerBrokerMap.get(topicPartition) match {
                case Some(data) => failedProduceRequests.appendAll(data)
                case None => // nothing
              }
            })
          }
        } catch {
          case t: Throwable => error("Failed to send messages", t)
        }
        failedProduceRequests
      case None => // all produce requests failed
        messages
    }
  }

 

 

分享到:
评论

相关推荐

    Kafka Producer机制优化-提高发送消息可靠性

    ### Kafka Producer机制优化—提高发送消息可靠性 #### 一、Kafka Producer机制及问题背景 在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并...

    pentaho-kafka-producer.zip

    Pentaho Kafka Producer是一款用于Pentaho Data Integration(Kettle)平台的插件,它允许用户在数据集成过程中将数据流发布到Apache Kafka消息队列。Kafka是一个分布式流处理平台,广泛应用于实时数据管道和流应用...

    第12单元 Kafka producer拦截器与Kafka Streams1

    "Kafka Producer拦截器与Kafka Streams" Kafka Producer拦截器是Kafka 0.10版本引入的主要用于实现clients端的定制化控制逻辑。 Producer拦截器使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制...

    jemter-kafka连接器

    《JMeter与Kafka连接器:构建高并发数据流测试》 在现代大数据处理系统中,Apache Kafka作为一款分布式消息中间件,广泛应用于实时数据流处理。为了验证和优化Kafka系统的性能,开发者通常需要进行大规模并发数据...

    kettle kafka 消息生产插件

    在文件“pentaho-kafka-producer”中,我们可以预期找到与这个插件相关的源代码、配置文件、文档或其他支持资源。这些资源可能包括: 1. **源代码**:可能包含Java或Groovy代码,实现了Kettle插件的逻辑,包括连接...

    kafka中文文档producer配置参数

    在Kafka中,Producer(生产者)负责将数据流发送到Kafka集群中。生产者的配置参数非常关键,因为它们会直接影响到生产者的行为和性能。以下是根据提供的文件内容整理的Kafka生产者配置参数的知识点: 1. bootstrap....

    flume.kafka:基于新 Kafka Producer 的 Flume kafka sink,可配置

    基于新 Kafka Producer 的 Flume kafka sink,高性能且可配置。 它依赖于很少的项目/库,只有 Flume 1.5.2 kafka-clients-0.8.2.1 或更高版本,slf4j。 类似于 Flume 1.6 KafkaSink,但这里有一些不同: Flume 1.6 ...

    Python-kafka集群搭建PythonAPI调用Producer和Consumer

    在本教程中,我们将探讨如何搭建一个支持SASL(Simple Authentication and Security Layer)认证的Kafka集群,并使用Python API来创建Producer和Consumer。 ### 1. 安装Kafka 首先,我们需要在服务器上安装...

    kafka-pub-sub:带有Rest URL的Kafka Producer和Consumer API的Spring Boot应用程序

    带有Rest URL的Kafka Producer和Consumer API的Spring Boot应用程序 生产者:将数据或消息发送到kafka服务器的应用程序 消息:一小段数据,即kafka的字节数组 使用者:数据的接收者,即从kafka服务器读取数据 Kafka...

    kafka生产者连接池

    在分布式消息系统中,Kafka是一个广泛使用的高吞吐量、低延迟的开源消息队列。为了优化性能和提高效率,开发人员常常会利用连接池技术来管理Kafka生产者的连接。本文将深入探讨"Kafka生产者连接池"的概念、实现原理...

    kafka-producer-stub:一个简单的Kafka Producer存根

    @ hitmands / kafka-producer-stub 简单的Kafka堆栈可促进消费者的本地发展 入门 docker run -tid \ -e ' HKPS_BROKERS=localhost:9092 ' \ -v $( pwd ) /examples:/data \ hitmands/kafka-producer-stub:latest...

    kafka-java-demo 基于java的kafka生产消费者示例

    【Kafka Producer】 Kafka生产者是负责将数据发布到Kafka主题的组件。在Java中,我们可以创建一个Producer实例,配置相关的生产者属性(如bootstrap servers、key-value序列化类等),然后使用`send()`方法将消息...

    kettle整合kafka生产者消费者插件

    kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看我博文。

    阿里云消息队列kafka demo

    1. `Producer.scala`:包含Kafka Producer的实现,用于发布消息到Topic。 2. `Consumer.scala`:包含Kafka Consumer的实现,用于订阅Topic并消费消息。 3. `Config.scala`或`application.conf`:可能包含了连接阿里...

    kafkaProducerDemo.zip

    【Kafka Producer 消息发送】 在大数据处理和实时流计算领域,Apache Kafka 是一个不可或缺的组件。它是一个分布式消息中间件,用于构建实时数据管道和流应用。本示例将详细介绍如何使用 Java 1.8 和 Kafka 2.4.1 ...

    rust-kafka-producer:用锈写的kafka生产者的简单例子

    ./producer -b &lt;broker&gt; -t 您也可以使用vscode上的目标“调试可执行文件'生产者'”来调试可执行文件。 如何使用数据 要使用此生产者生成的数据,可以使用。 待办事项: 使用仅一次语义(EOS) 推送图像(二进制...

    kafka 知识要点,基于0.9、 0.10版本,很全面

    Kafka Producer 配置中的一些重要参数包括: - **metadata.broker.list**: 用于指定 Broker 和分区的静态信息。可以只提供部分 Broker 的信息,Kafka 会自动发现其他 Broker。 - **topic.metadata.refresh.interval...

    Kafka的一些常用功能点

    你可以编写Java、Python或其他语言的程序,使用Kafka的Producer API来发送消息。消息通常是键值对形式,Producer会决定如何将消息分配到Topic的不同分区。 3. **安装与启动Kafka** 安装Kafka涉及下载Apache Kafka...

    kafka_producer.zip

    在这个项目中,`kafka_producer.zip`包含了实现这一功能的所有必要文件:一个配置文件`config.conf`和一个Python脚本`producer.py`。 首先,让我们了解`config.conf`文件的作用。这个配置文件通常包含Kafka生产者...

Global site tag (gtag.js) - Google Analytics