`

kafka Consumer

 
阅读更多

 

   Kafka Consumer详细说明

  (一)如何使用Kafka Consumer

 

public void test() {
		Properties props = new Properties();
		// 集群中某个(或几个)地址,最好配置多个,防止单台失败
		props.put("bootstrap.servers", "localhost:9092");
		// 同一个group.id可以有多个实例,默认情况下,多个实例会被balance的去处理订阅的topic下的全部消息
		props.put("group.id", "test");
		// 自动commit,如果为true,consumer会自动向broker确认offset;
		props.put("enable.auto.commit", "true");
		// 自动commit的时间间隔
		props.put("auto.commit.interval.ms", "1000");
		props.put("key.deserializer", 
				"org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", 
				"org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("foo", "bar"));
		// 每个consumer实例都可以动态订阅不同topic
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records)
				System.out.printf("offset = %d, key = %s, value = %s%n", 
						record.offset(), record.key(), record.value());
		}
	}

 

 


(二)负载均衡和失败处理    

Kafka的consumer是线程非安全的,同一个group.id下,推荐使用多个实例(可以是在多个线程,也可以是多机服务器)来进行业务处理

       那么问题来了,既然是多个实例,会存在以下两个问题:

(1)负载均衡

(2)consumer失败的处理;

下面我们来看看kafka如何处理这两个问题:

(1)负载均衡:kafka通过平均的分配partition给同一个gropu.id下的多个consumer来达到负载均衡;比如,如果某个topic下有6个partition,2个consumer,那么每个实例将各自拥有三个partition;

(2)如果某个consumer失败(超时),那么原本分配给它的partition将会被重新分配给其他存活的consumer; 同样,新的consumer加入或者增加partition数量也会引起重新分配。

另外,通过ConsumerRebalanceListener,可以监听重新分配的动作,来进行相应的业务处理;

 

当然,我们也可以通过调用assign(Collection)方法手动来为consumer分配partition, 这样就不会发生动态分配partition的情况:     

 

String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

 

 

注意:assign和subscribe两种模式不能混用。 

 

 

    (三)Kafka是如何发现Consumer失败?

Consumer在调用pool(timeout)方法后会自动加入group,consumer会在后台发送心跳,如要在session.timeout.ms时间内,集群没有收到心跳,则判断consumer挂掉;

在某些情况下,Conusmer有可能碰到一种“livelock”的情况,就是后台一直在发送心跳,但实际上poll()方法没有执行;

这种情况下,单从心跳已经无法判断consumer是否真的挂掉了,所以kafka提供了  max.poll.interval.ms 这个设置来定义两次poll()调用的最大间隔。 如果超过这个时间,也会判断consumer挂掉,当前consumer会出现offset commit failure;

 

 max.poll.interval.ms :需要根据业务流程长度来定义合适的时间;         

 max.poll.records : 控制每次poll的消息数据,防止业务处理时间过长;

 

 

 

    (四)手工管理offset

(1)确认全部消息:

     

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");    //关闭自动commit
     props.put("key.deserializer", 
               "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", 
               "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();              // 手动commit全部消息
             buffer.clear();
         }
     }

 

 

在业务完成后,手动调用commitSync()来commit offset,可以达到每条消息“至少一次投递”的效果;如果业务操作失败,offset不会更新,消费失败的消息可以再次获取到;

 

note: 在enable.auto.commit设置为true的情况下,如果想要做到“至少一次投递”,必须在每次调用poll()方法前消费掉所有已获取的消息,或者保证在consumer.close()前消费掉所有已获取的消息,否则会出现消息丢失。

 

 

  (2)按partition进行确认:

 

 ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); //获取每个partition最后一条消息的offset
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); //commit的offset+1,指向下一条未被消费的消息
             }

 

 

   (五)外部存储offset

外部存储的好外:如果要达到“有且只有一次投递”的效果,可以使用外部存储offset的方式。比如,在同一个事务里完成业务和offset的记录;(2)或者在存储数据文件的同时存储offset;

如何使用存储:

(1)将enable.auto.commit设置为false;

(2)通过调用方法:seek(TopicPartition, long),让offset指向最新位置;供下一次调用poll()方法时使用;

  (3)   seek()方法也可以用来跳过一段消息,或者重复消费旧的消息。

自动分配partition时的offset处理;

对于手工管理partition分配置的consumer来说,上面的做是最简单的实现。但对于自动分配partition的consume来说,就需要通过 subscribe(Collection, ConsumerRebalanceListener) 来实现了:

  (六)流速控制 

通过pause(Collection)  、resume(Collection)暂停或者继续某些partition的消费动作。

(七)事务消息     

事务消息可以跨topic和partition; 后续再补充

 

(八)多线程消费消息

public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;

     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
                 ConsumerRecords records = consumer.poll(10000);
                 // Handle new records
             }

         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }

     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup(); //调用wakeup后,poll()方法会抛出WakeupException
     }
 }

 

    

 (九)消费者数量与partition数量的关系

一个partition只能被同一个group内的一个consumer消费,所以一个group内消费者的数量,不要大于partition的数量,否则多余的consumer不会收到消息;

 

          (1)消费者数量 > partition数量

 

 

        

 

           (2)partition数量> consumer数量

 



  

 

 

  • 大小: 55.9 KB
  • 大小: 86.3 KB
分享到:
评论

相关推荐

    RdKafka::KafkaConsumer使用实例

    在本文中,我们将深入探讨如何使用C++库RdKafka中的`KafkaConsumer`类来消费Apache Kafka消息。RdKafka是一个高效的C/C++ Kafka客户端,它提供了生产者和消费者API,使得与Kafka集群进行交互变得更加简单。在这个...

    kafkaConsumerDemo.zip

    1. **创建Consumer实例**:首先,我们需要配置一个`Properties`对象,设置必要的参数如bootstrap servers、group id等,然后使用`KafkaConsumer`类的构造函数创建消费者实例。 2. **订阅主题**:消费者通过调用`...

    Go-Go-consumergroup采用golang编写的kafkaconsumer库

    《Go-consumergroup:构建基于Golang的Kafka消费者库》 在现代软件开发中,消息队列系统如Apache Kafka扮演着至关重要的角色,它提供了高效、可靠的异步通信能力。而Go语言以其简洁的语法和高性能特性,成为了编写...

    KafkaConsumerDemo.java

    KafkaConsumerDemo.java

    kafka demo ,两种线程消费方式

    Kafka是一个发布/订阅模型的消息队列,它包含生产者(Producer)、消费者(Consumer)和主题(Topic)。生产者负责发布消息到主题,而消费者则订阅这些主题并消费消息。消费者通过消费者组(Consumer Group)进行...

    kafka java 下载的jar

    KafkaConsumer, String&gt; consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords, String&gt; records = consumer.poll(Duration.ofMillis(100)); ...

    kafkaconsumer

    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} import java.util.Properties val props = new Properties() props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...

    1号店电商实时数据分析系统-14.项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试.pptx

    项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试15.项目1-地区销售额-Bolt业务逻辑处理一16.项目1-地区销售额-优化Bolt支持重启及结果数据核查17.项目1-地区销售额-HighCharts图表开发一及Web端架构设计18....

    Kafka C++客户端库librdkafka笔记

    - KafkaConsumer:是创建消费者实例的静态接口。 - Consumer:是消费者结构,处理消费相关的逻辑。 - KafkaConsumerImpl:是消费者实现的具体封装。 librdkafka还提供了消费者回调机制,如ConsumeCb和EventCb,它们...

    Kafka设计解析(五)-KafkaConsumer设计解析

    很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费(广播)。因此,KafkaHightL

    phpkafkaconsumer是一个kafkaconsumer库支持group和rebalance

    【标题】"phpkafkaconsumer"是一个专门为PHP设计的Kafka消费者库,它不仅提供了基本的Kafka消息消费功能,还特别支持了消费者组(Consumer Group)和再平衡(Rebalance)机制。Kafka是一种分布式流处理平台,常用于...

    kettle整合kafka生产者消费者插件

    kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看...

    kafka 插件kafka 插件kafka 插件

    **Kafka插件详解** Kafka插件是Apache Kafka与各种工具集成的重要组成部分,它使得开发者和运维人员能够更方便地在不同的系统中利用Kafka的功能。Kafka是一款分布式流处理平台,常用于构建实时数据管道和流应用,...

    springboot 基于spring-kafka动态创建kafka消费者

    5. **运行与测试**:启动Spring Boot应用,当`kafka.consumer.enabled`设置为`true`时,消费者将开始监听指定的Kafka主题。你可以通过发送消息到该主题来测试消费者的运行情况。 以上就是基于Spring Boot和Spring ...

    kafka_hdfs_consumer

    【标题】"kafka_hdfs_consumer"涉及到的关键技术是将数据从Kafka消费并存储到HDFS(Hadoop Distributed File System)中。这个过程通常在大数据处理和流处理场景下非常常见,它允许实时或近实时的数据从消息队列流向...

    KafkaConsumer:Spring Boot Kafka消费者

    在本文中,我们将深入探讨如何在Spring Boot应用中使用KafkaConsumer来消费Apache Kafka主题中的消息。Apache Kafka是一个分布式流处理平台,常用于构建实时数据管道和流应用程序。Spring Boot简化了Kafka消费者的...

    kafka-consumer:Kafka消费者示例

    KafkaConsumer, String&gt; consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList("myTopic")); // 订阅主题 while (true) { ConsumerRecords, String&gt; records = consumer.poll...

    Kafka-MySQL-Avro:Kafka Consumer将avro记录插入mysql

    Kafka Consumer将avro记录插入mysql git clone https://github.com/cahuja1992/Kafka-MySQL-Avro.git python kafka-mysql-avro/setup.py install #!/usr/bin/env python from divoltemysql.kafkamysql import ...

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    在IT行业中,网络通信和大数据处理是两个至关重要的领域,Netty和Kafka分别是这两个领域的佼佼者。Netty是一个高性能、异步事件驱动的网络应用程序框架,常用于开发高并发、低延迟的网络应用,如TCP服务器。而Kafka...

Global site tag (gtag.js) - Google Analytics