Producer example code
Producer<Integer, String> producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
String messageStr = new String("Message_" + messageNo);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
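The example assumes a props object has already been populated. As a minimal sketch, the old 0.8.x producer configuration behind it might look like the fragment below; the broker address and ack setting are placeholders, not recommendations.

// Minimal configuration sketch for the 0.8.x producer used above (placeholder values).
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");      // placeholder broker address
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");                   // illustrative ack level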
The producer supports both synchronous and asynchronous sending; the mode is chosen when the Producer is constructed:
config.producerType match {
  case "sync" =>
  case "async" =>
    sync = false
    producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                     queue,
                                                     eventHandler,
                                                     config.queueBufferingMaxMs,
                                                     config.batchNumMessages,
                                                     config.clientId)
    producerSendThread.start()
}
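The producerType read here comes from the producer.type property. As an illustrative sketch (values are placeholders), the async path shown above would be selected like this:

// Illustrative settings that select the async producer; values are placeholders, not tuned defaults.
Properties props = new Properties();
props.put("producer.type", "async");            // "sync" or "async"
props.put("queue.buffering.max.ms", "5000");    // becomes config.queueBufferingMaxMs
props.put("batch.num.messages", "200");         // becomes config.batchNumMessages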
Client code calls the send method to send data. In sync mode, send invokes eventHandler.handle directly to issue the request synchronously. Let's first look at the asynchronous send path.
/**
 * Sends the data, partitioned by key to the topic using either the
 * synchronous or the asynchronous producer
 * @param messages the producer data object that encapsulates the topic, key and message data
 */
def send(messages: KeyedMessage[K,V]*) {
  lock synchronized {
    if (hasShutdown.get)
      throw new ProducerClosedException
    recordStats(messages)
    sync match {
      case true => eventHandler.handle(messages)
      case false => asyncSend(messages)
    }
  }
}
The asynchronous path enqueues the messages into the queue:
private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
  for (message <- messages) {
    val added = config.queueEnqueueTimeoutMs match {
      case 0 =>
        queue.offer(message)
      case _ =>
        try {
          config.queueEnqueueTimeoutMs < 0 match {
            case true =>
              queue.put(message)
              true
            case _ =>
              queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
          }
        } catch {
          case e: InterruptedException =>
            false
        }
    }
    if(!added) {
      producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
      producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
      throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
    } else {
      trace("Added to send queue an event: " + message.toString)
      trace("Remaining queue size: " + queue.remainingCapacity)
    }
  }
}
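The branching above is driven by queue.enqueue.timeout.ms: 0 means fail immediately when the queue is full, a negative value means block until space frees up, and a positive value means wait up to that many milliseconds. The following standalone sketch mirrors the same three strategies on a plain BlockingQueue; the queue type, capacity, and timeout value are illustrative, not the producer's actual internals.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Standalone sketch of the three enqueue strategies selected by queue.enqueue.timeout.ms.
public class EnqueueSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
        long enqueueTimeoutMs = 100;   // pretend value of queue.enqueue.timeout.ms
        String message = "event";

        boolean added;
        if (enqueueTimeoutMs == 0) {
            added = queue.offer(message);                                           // fail fast when full
        } else if (enqueueTimeoutMs < 0) {
            queue.put(message);                                                     // block until space is free
            added = true;
        } else {
            added = queue.offer(message, enqueueTimeoutMs, TimeUnit.MILLISECONDS);  // bounded wait
        }
        if (!added) {
            throw new IllegalStateException("Event queue is full, message dropped");
        }
    }
}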
ProducerSendThread is the thread that handles events asynchronously: it pulls events from the queue and, once the number of buffered events reaches the batch size or the queue time expires, calls handler.handle(events) to process the batch.
private def processEvents() {
  var lastSend = SystemTime.milliseconds
  var events = new ArrayBuffer[KeyedMessage[K,V]]
  var full: Boolean = false

  // drain the queue until you get a shutdown command
  Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
        .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
    currentQueueItem =>
      val elapsed = (SystemTime.milliseconds - lastSend)
      // check if the queue time is reached. This happens when the poll method above returns after a timeout and
      // returns a null object
      val expired = currentQueueItem == null
      if(currentQueueItem != null) {
        trace("Dequeued item for topic %s, partition key: %s, data: %s"
          .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
        events += currentQueueItem
      }

      // check if the batch size is reached
      full = events.size >= batchSize

      if(full || expired) {
        if(expired)
          debug(elapsed + " ms elapsed. Queue time reached. Sending..")
        if(full)
          debug("Batch full. Sending..")
        // if either queue time has reached or batch size has reached, dispatch to event handler
        tryToHandle(events)
        lastSend = SystemTime.milliseconds
        events = new ArrayBuffer[KeyedMessage[K,V]]
      }
  }
  // send the last batch of events
  tryToHandle(events)
  if(queue.size > 0)
    throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
      .format(queue.size))
}
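The key trick is the poll timeout: the thread waits at most until lastSend + queueTime, so a null return means the queue time elapsed and the partial batch is flushed. The runnable sketch below reproduces just this "flush on batch size or queue time" loop with illustrative stand-ins for the real config values.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Self-contained sketch of the flush trigger used by ProducerSendThread (illustrative values).
public class SendThreadSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        long queueTime = 100;   // stands in for queue.buffering.max.ms
        int  batchSize = 5;     // stands in for batch.num.messages

        queue.put("m1");
        queue.put("m2");

        long lastSend = System.currentTimeMillis();
        List<String> events = new ArrayList<String>();
        for (int i = 0; i < 10; i++) {                    // the real thread loops until shutdown
            long wait = Math.max(0, (lastSend + queueTime) - System.currentTimeMillis());
            String item = queue.poll(wait, TimeUnit.MILLISECONDS);
            boolean expired = (item == null);             // poll timed out -> queue time reached
            if (item != null) events.add(item);
            if (events.size() >= batchSize || expired) {
                System.out.println("flush " + events);    // handler.handle(events) in the real code
                events = new ArrayList<String>();
                lastSend = System.currentTimeMillis();
            }
        }
    }
}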
The handle method of DefaultEventHandler processes the events:
def handle(events: Seq[KeyedMessage[K,V]]) {
  val serializedData = serialize(events)
  serializedData.foreach {
    keyed =>
      val dataSize = keyed.message.payloadSize
      producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
      producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
  }
  var outstandingProduceRequests = serializedData
  var remainingRetries = config.messageSendMaxRetries + 1
  val correlationIdStart = correlationId.get()
  debug("Handling %d events".format(events.size))
  while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
    topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
    // refresh the topic metadata if the refresh interval has expired
    if (topicMetadataRefreshInterval >= 0 &&
        SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
      Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
      sendPartitionPerTopicCache.clear()
      topicMetadataToRefresh.clear
      lastTopicMetadataRefreshTime = SystemTime.milliseconds
    }
    // send the data
    outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
    // if some requests failed, refresh the metadata of the failed topics and retry
    if (outstandingProduceRequests.size > 0) {
      info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
      // back off and update the topic metadata cache before attempting another send operation
      Thread.sleep(config.retryBackoffMs)
      // get topics of the outstanding produce requests and refresh metadata for those
      Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
      sendPartitionPerTopicCache.clear()
      remainingRetries -= 1
      producerStats.resendRate.mark()
    }
  }
  if(outstandingProduceRequests.size > 0) {
    producerStats.failedSendRate.mark()
    val correlationIdEnd = correlationId.get()
    error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
      .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","), correlationIdStart, correlationIdEnd-1))
    throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
  }
}
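The retry loop above is governed by a few producer settings: the maximum retry count, the backoff between retries, and the periodic metadata refresh interval. As an illustrative fragment (values are placeholders, not tuned recommendations):

// Settings read by handle(); placeholder values.
Properties props = new Properties();
props.put("message.send.max.retries", "3");                  // config.messageSendMaxRetries
props.put("retry.backoff.ms", "100");                        // sleep before each retry
props.put("topic.metadata.refresh.interval.ms", "600000");   // refresh interval checked in handle()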
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
  // map the outgoing messages to the broker each should be sent to, along with the per-partition message map
  val partitionedDataOpt = partitionAndCollate(messages)
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
      try {
        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
          if (logger.isTraceEnabled)
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
          // send over Java NIO
          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition => {
            messagesPerBrokerMap.get(topicPartition) match {
              case Some(data) => failedProduceRequests.appendAll(data)
              case None => // nothing
            }
          })
        }
      } catch {
        case t: Throwable => error("Failed to send messages", t)
      }
      failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}
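The core idea of partitionAndCollate is to bucket the outgoing messages by the broker that leads their target partition, so the producer can issue one request per broker. The self-contained sketch below shows just that grouping step; the leader lookup is a stub, whereas the real code consults the cached topic metadata.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the "collate by broker" grouping step; the leader lookup is a stand-in for metadata.
public class CollateSketch {
    static int leaderBrokerFor(String message, int numBrokers) {
        return Math.abs(message.hashCode()) % numBrokers;   // stub for the real leader lookup
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("a", "b", "c", "d");
        Map<Integer, List<String>> perBroker = new HashMap<Integer, List<String>>();
        for (String m : messages) {
            int brokerId = leaderBrokerFor(m, 3);
            perBroker.computeIfAbsent(brokerId, k -> new ArrayList<String>()).add(m);
        }
        System.out.println(perBroker);   // one batch of messages per broker id
    }
}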