`
distantlight1
  • 浏览: 44269 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

kafka producer线程与吞吐量

阅读更多

1.问题背景

kafka是以高吞吐量著称的,但日前解决一个实际问题中,发现使用不当仍会无法充分利用起吞吐量。我们的场景如下:

有两个kafka集群,需要从上游kafka读一个topic的消息,做一些自定义处理,再写到下游kafka的特定topic(有人说用flume,确实可以,不过自定义处理比较复杂的时候用flume就有点麻烦了)

这里集中在写这一端(读没有问题),开始使用最简单的方式,配一个Producer的bean,然后Producer.send()写下游。压测的时候发现写出去的流量很低,单进程出口流量大概只有1-2Mbps,低的难以接受

 

2.配置项

开始以为是配置有问题,所以尝试修改一些Producer的配置项。我们用的是异步模式(producer.sync=false),设置了一些提高吞吐量的配置项(包括有些可能牺牲数据一致性的选项),主要包括下面这些项

queue.enqueue.timeout.ms=0 #异步队列满后不阻塞

batch.num.messages=500 #加大异步发送批次大小(减少连接次数)

compression.codec=snappy #使用消息压缩

request.required.acks=-1 #不要求接收端回复ack

修改配置后吞吐量确实有一些提升,出口流量到5Mbps左右,但是仍然远低于预期,说明配置不是主要问题

 

3.源码排查

配置无法解决,只好去查一下源码。看到异步模式下Producer实际发送是在一个独立的线程类ProducerSendThread中进行,然后关键来了:一个Producer实例只包含一个ProducerSendThread线程(1对1,相关源码如下)

class Producer[K,V](val config: ProducerConfig,
                    private val eventHandler: EventHandler[K,V])  // only for unit testing
  extends Logging {

  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

  private var sync: Boolean = true
  private var producerSendThread: ProducerSendThread[K,V] = null
  private val lock = new Object()

  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,queue,  eventHandler, config.queueBufferingMaxMs, config.batchNumMessages, config.clientId)
      producerSendThread.start()  // 初始化发送线程
  }

  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)

  KafkaMetricsReporter.startReporters(config.props)
  AppInfo.registerInfo()

  def this(config: ProducerConfig) =
    this(config,
         new DefaultEventHandler[K,V](config,
                                      CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),
                                      CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),
                                      CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                      new ProducerPool(config)))

  /**
   * Sends the data, partitioned by key to the topic using either the
   * synchronous or the asynchronous producer
   * @param messages the producer data object that encapsulates the topic, key and message data
   */
  def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      recordStats(messages)
      sync match {
        case true => eventHandler.handle(messages)
        case false => asyncSend(messages)
      }
    }
  }

  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0  =>
          queue.offer(message)
        case _  =>
          try {
            config.queueEnqueueTimeoutMs < 0 match {
            case true =>
              queue.put(message)
              true
            case _ =>
              queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case e: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      }else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }

}

  

然后Spring的bean默认又是单例的,所以实际上每个进程只有一个线程在写kafka,而单线程的吞吐量显然是有限的(并没有完全利用kafka集群的高吞吐量)

 

4.解决方法

既然kafka Producer是单线程的,那么就在上层封装一个Producer的实例池,进行并发写。优化以后,使用10个线程写,出口流量显著提升到了60Mbps左右

	<bean id="producerClient" class="com.halo.kafka.producer.client.ProducerClient" init-method="init"
		destroy-method="close" scope="prototype">
		<property name="brokerList" value="${borker.list}"/>
		<property name="sync" value="false"/>
                ......
	</bean>

	<bean id="producerPool" class="com.halo.dc.support.KafkaProducerPool">
		<property name="threadNum" value="${thread.num}"/>
	</bean>

 

public class KafkaProducerPool implements ApplicationContextAware {

	private ProducerClient[] pool;

	private int threadNum;
	private int index = 0; // 轮循id

	private static final Logger logger = LoggerFactory.getLogger(KafkaProducerPool.class);

	@Override
	public void setApplicationContext(ApplicationContext ctx) throws BeansException {
		logger.info("Init DCKafkaProducerPool: threadNum=" + threadNum);
		pool = new ProducerClient[threadNum];
		for (int i = 0; i < threadNum; i++) {
			pool[i] = ctx.getBean(ProducerClient.class);
		}
	}

	public void send(String topic, String message) {
		pool[index++ % threadNum].send(topic, message);
	}
}

 

 

5.总结

kafka的高吞吐量是针对服务端(集群)而言,并不是针对单客户端。具体到Producer端,需要自己创造多线程并发环境才能提高客户端的出口吞吐量,kafka并没有提供类似线程池的api(也许有?)。kafka设计上是针对分布式的,实际场景通常是有很多客户端(多进程),这也许是没有提供多线程api的原因。但是某些情况下仍然需要自己实现多线程写,比如压测的压力端,或者类似上面提到的处理转发场景

分享到:
评论

相关推荐

    单线程Kafka代码实例

    Kafka是一个高吞吐量、低延迟的消息队列系统,由LinkedIn开源并维护,现在是Apache软件基金会的一部分。它支持发布和订阅模式,允许生产者发布消息到主题(Topic),消费者则订阅这些主题并消费消息。 在单线程...

    kafka配置调优实践

    * 定期批量写文件可以大幅度提高 producer 写入吞吐量。 * 建议:每当 producer 写入 10000 条消息时,刷数据到磁盘log.flush.interval.messages=10000,每间隔 1 秒钟时间,刷数据到磁盘log.flush.interval.ms=1000...

    kafka集群方案选型

    从上表可以看到,Kafka的吞吐量最高,达到29w/s,而ActiveMQ和RabbitMQ的吞吐量相对较低。同时,Kafka的开发语言为Scala/Java,主要维护者为Apache,成熟度也较高。 二、Kafka集群方案选型 Kafka集群方案选型有...

    C#_Kafka_Demo.rar

    它被设计为高吞吐量、低延迟的消息系统,广泛应用于实时数据管道和流式应用。在.NET环境中,我们通常使用Kafka.Net库来与Kafka进行交互。以下将详细讲解如何利用C#和Kafka.Net实现在.NET Framework 4.5环境下构建一...

    kafka2.3.0 linux版本

    Kafka主要设计目标是提供高吞吐量、低延迟的消息发布与订阅服务,同时也支持数据持久化和实时数据处理。在Linux环境下部署Kafka 2.3.0,可以充分利用Linux的稳定性和高性能,使其成为大数据实时处理的核心组件。 **...

    kafka学习笔记.doc

    消费者实例的并行度取决于分区数量,每个分区只能在一个线程中消费,因此合理分配分区数量和消费者实例可以优化吞吐量。 7. Kafka控制器Kafka控制器是集群中的关键组件,负责管理和维护集群的元数据,包括分区的...

    kafka-2.12.0.11.01版本

    Kafka是一款高吞吐量的分布式消息系统,由LinkedIn开发并开源,现在已经成为Apache软件基金会的顶级项目。它设计目标是提供一个低延迟、高吞吐量、持久化的发布/订阅消息系统,支持大数据的实时处理。Kafka 2.12....

    kafka examples source code

    它以其高吞吐量、可扩展性和容错性而备受赞誉。在本文中,我们将深入探讨Kafka 2.10_0.10.1.0版本的代码示例,以帮助开发者更好地理解和应用这个强大的工具。 Kafka的核心组件包括生产者(Producer)、消费者...

    Kafka性能测试报告.pdf

    测试结果表明,work6的处理能力和吞吐量大致为hmaster的两倍,可以判断出producer的性能瓶颈不是磁盘IO,而可能是CPU。 五、异步API性能测试 使用Kafka提供的异步API性能测试工具kafka-run-class.sh org.apache....

    win32 C++ kafka 库

    它被设计为高吞吐量、低延迟的消息中间件,适用于构建实时数据管道和流应用。本资源专注于在Windows 32位环境下,利用C++语言与Kafka进行交互的库。 标题"win32 C++ kafka 库"指的是一个专门为32位Windows操作系统...

    kafka开发运维实战分享.pptx

    Kafka 是一个高吞吐量、分布式的消息发布与订阅系统,常用于大数据处理和实时流数据平台。本篇内容将深入探讨 Kafka 的开发和运维实践,帮助读者理解其核心特性和在生产环境中的最佳实践。 ### 1. Kafka 版本与特性...

    kafka大文件的代码

    - `buffer.memory`:定义生产者用于缓存未发送消息的内存大小,应根据系统资源和预期吞吐量调整。 - `compression.type`:指定压缩算法,如gzip或lz4,压缩可以减小网络传输的数据量,适合处理大文件。 2. **消费...

    springboot-kafka

    Kafka 提供高吞吐量、低延迟、持久化、可扩展性和容错性,是大数据实时处理的理想选择。 2. **Spring Boot 与 Kafka 结合** Spring Boot 提供了 `spring-kafka` 模块,使得开发者可以利用 Spring 的注解驱动和配置...

    Kafka技术参考手册.docx

    它由Apache软件基金会开发,使用Scala和Java编写,支持高吞吐量的消息传递,常用于处理网站用户行为数据、日志聚合、流式数据处理等场景。 **发布与订阅** Kafka的核心功能是发布和订阅消息。生产者(Publishers)...

    千锋-kafka11

    Kafka是由LinkedIn开发并贡献给Apache基金会的一个开源流处理平台,它最初设计的目标是作为一个高吞吐量、低延迟的消息中间件,用于构建实时数据管道和流应用。Kafka以其出色的可扩展性和容错性,以及对大规模数据...

    kafka_study.zip

    Kafka是一款高吞吐量、分布式的消息中间件,常用于实时数据流处理和大数据分析。以下是关于Kafka、SpringBoot以及它们之间整合的关键知识点: 1. **Kafka基础知识**: - **主题(Topic)**:Kafka中的基本单位,...

    Springboot整合kafka做消息通信_20200410.docx

    Kafka 是一个分布式流处理平台,由 Apache 软件基金会开发,它允许应用程序发布和订阅实时数据流,提供高吞吐量、低延迟的消息传递。 一、Kafka 原理: Kafka 的核心概念包括 Topic、Partition 和 Broker。Topic 是...

    最新最全kafka学习文档

    * 为了方便扩展和提高吞吐量,一个topic可以被分成多个partition。 * 配合分区的设计,提出消费者组的概念,组内每个消费者并行消费(注意:一个partition中的数据只能由消费者组中的一个group来消费)。 * 为了提高...

    Apache Kafka.pdf

    Kafka 的设计灵感来源于传统的消息队列系统,但它更侧重于处理流数据,并且能够为应用程序提供一个统一的、高吞吐量的数据处理模型。 #### 二、使用场景 Kafka 主要应用于以下几个领域: 1. **日志聚合**:Kafka ...

Global site tag (gtag.js) - Google Analytics