`
amazon10
  • 浏览: 29469 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

Kafka的三种消费模式

阅读更多
  • 自动提交offset
     以下实例代码展示了如何自动提交topic的offset:
public void autoOffsetCommit() {
    Properties props = new Properties();
    props.put("bootstrap.servers""localhost:9092");
    props.put("group.id""test");
    props.put("enable.auto.commit""true");
    props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<StringString> consumer = new KafkaConsumer<StringString>(props);
    consumer.subscribe(Arrays.asList("foo""bar"));
    while (true) {
        ConsumerRecords<StringString> records = consumer.poll(100);
        for (ConsumerRecord<StringString> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n"record.offset()record.key()record.value());
        }
    }
}
     Properties的实例props中存放的key意义:
     1)bootstrap.servers表示要连接的Kafka集群中的节点,其中9092表示端口号;
     2)enable.auto.commit为true,表示在auto.commit.interval.ms时间后会自动提交topic的offset,其中auto.commit.interval.ms默认值为5000ms;
     3)其中foo和bar为要消费的topic名称,由group.id为test作为consumer group统一进行管理;
     4)key.deserializer和value.deserializer表示指定将字节序列化为对象。
  • 手动提交offset
      生产环境中,需要在数据消费完全后再提交offset,也就是说在数据从kafka的topic取出来后并被逻辑处理后,才算是数据被消费掉,此时需要手动去提交topic的offset。
      以下实例代码展示了如何手动提交topic的offset:
     
public void manualOffsetCommit() {
    Properties props = new Properties();
    props.put("bootstrap.servers""localhost:9092");
    props.put("group.id""test");
    props.put("enable.auto.commit""false");
    props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer""org.apache.kafka.common.serialization.Stirng.Deserializer");

    KafkaConsumer<StringString> consumer = new KafkaConsumer<StringString>(props);
    consumer.subscribe(Arrays.asList("foo""bar"));
    final int minBatchSize = 200;
    List<ConsumerRecord<StringString>> buffer = new ArrayList<>();
    while (true) {
        ConsumerRecords<StringString> records = consumer.poll(100);
        for (ConsumerRecord<StringString> record : records) {
            buffer.add(record);
        }

        if (buffer.size() >= minBatchSize) {
            // operation to handle data
            consumer.commitSync();
            buffer.clear();
        }
    }
}
 
      本方案的缺点是必须保证所有数据被处理后,才提交topic的offset。为避免数据的重复消费,可以用第三种方案,根据每个partition的数据消费情况进行提交,称之为“at-least-once”。
  • 手动提交partition的offset
     以下实例代码展示了如何手动提交topic中每一partition的offset:
     
public void manualOffsetCommitOfPartition() {
    Properties props = new Properties();
    props.put("bootstrap.servers""localhost:9092");
    props.put("group.id""test");
    props.put("enable.auto.commit""false");
    props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer""org.apache.kafka.common.serialization.Stirng.Deserializer");

    KafkaConsumer<StringString> consumer = new KafkaConsumer<StringString>(props);
    consumer.subscribe(Arrays.asList("foo""bar"));

    boolean running = true;
    try {
        while (running) {
            ConsumerRecords<StringString> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<StringString>> partitionRecords = records.records(partition);
                for (ConsumerRecord<StringString> record : partitionRecords) {
                    System.out.println(record.offset() + " : " + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }
}

 

 
分享到:
评论

相关推荐

    kafka demo 两种线程消费方式

    【标题】"Kafka Demo 两种线程消费方式"展示了在Kafka中处理消息的两种常见线程模型,这是理解Kafka消费者工作原理的关键部分。Kafka是一个分布式流处理平台,用于构建实时数据管道和应用,它允许生产者发布消息到...

    springboot 基于spring-kafka动态创建kafka消费者

    本文将详细探讨如何在Spring Boot项目中基于Spring Kafka动态创建Kafka消费者。 首先,了解Kafka基本概念:Kafka是一种分布式流处理平台,用于构建实时数据管道和流应用程序。它提供了高吞吐量、低延迟的消息传递,...

    kafka demo ,两种线程消费方式

    在本文中,我们将深入探讨Apache Kafka的两种线程消费方式,这是基于提供的标题"Kafka Demo,两种线程消费方式"。...记住,无论选择哪种方式,都需要确保代码的正确性和线程安全性,以及对Kafka消费者API的熟练掌握。

    kafka模拟生产者消费者(集群模式)实例

    在本文中,我们将深入探讨如何在集群模式下模拟Kafka的生产者和消费者。Kafka是一种分布式流处理平台,常用于大数据实时处理和消息传递。它由Apache开发,以其高吞吐量、低延迟和可扩展性而闻名。 首先,我们要理解...

    kafka生产和消费示例

    这里的`@KafkaListener`注解使得`listen`方法成为Kafka消费者,它会监听"testTopic"上的消息。当有新消息到达时,该方法会被调用,打印出接收到的消息。 在Spring MVC的环境中,你可以将消费者方法与业务逻辑结合...

    SpringBoot整合kafka,代码简洁,自动分配分区和指定分区消费(亲测可用)

    在本文中,我们将深入探讨如何将SpringBoot与Apache Kafka进行集成,实现消息生产和消费功能,同时涵盖自动分配分区和指定分区消费的策略。SpringBoot以其轻量级、快速开发的特性,深受开发者喜爱,而Kafka作为一个...

    kafka生产者消费者实例

    4. **offset管理**:消费者负责维护自己的消费位置,Kafka 提供自动提交和手动提交两种模式。 了解并熟练掌握这些核心概念,将有助于我们构建高效、可靠的数据处理系统。在实际应用中,根据业务需求选择合适的配置...

    大数据采集技术-Kafka的消费模式.pptx

    **大数据采集技术——Kafka消费模式详解** 在大数据领域,数据采集是整个流程的基础,而Apache Kafka作为一个高效、可扩展的分布式流处理平台,扮演着关键角色。Kafka以其高吞吐量、低延迟和持久化特性,在实时数据...

    kafka生产及消费示例代码,下到就是赚到

    5. **消息模型**:Kafka支持发布/订阅模式,生产者发送消息到主题,消费者订阅主题来接收消息。示例代码可能展示了如何定义和使用这种模型。 6. **Offset管理**:消费者组中的每个消费者都会维护一个offset,表示其...

    springboot整合kafka的发布/消费demo项目源码

    在本文中,我们将深入探讨如何将Spring Boot与Apache Kafka整合,以便实现发布/消费消息队列模式。Apache Kafka是一款高效、可扩展且分布式的消息中间件,而Spring Boot则是一个简化了Spring应用程序开发的框架。...

    kafka-java-demo 基于java的kafka生产消费者例子

    Kafka是一种高吞吐量、低延迟的消息队列系统,它允许应用程序以发布/订阅模式进行通信。在Kafka中,数据以主题(Topic)的形式存储,每个主题可以分为多个分区(Partition),确保数据的分布和负载均衡。此外,Kafka...

    java kafka 生产者/消费者demo

    二、Kafka消费者 1. 添加依赖:同样需要添加Kafka的Java客户端库。 2. 创建消费者实例:配置消费者的属性,如bootstrap servers,group.id(消费组ID),key.deserializer和value.deserializer。 3. 注册监听器:...

    kafka安装包-2.13-3.6.2

    Kafka支持两种消费模式:单消费者模式(每个分区只能有一个消费者)和消费者组模式(多个消费者可以组成一个组,共享主题的分区)。 5. **Partitions**: 主题被分成多个分区,分区内的消息按照顺序存储,且每个分区...

    Kafka the Definitive Guide 2nd Edition

    * 消费者组管理:Kafka 的消费者组管理机制,用于管理消费者组和消费者。 * 集群元数据管理:Kafka 的集群元数据管理机制,用于管理集群和代理节点。 6. Kafka 的应用 Kafka 的应用包括: * 数据分析平台:Kafka ...

    代码:kafka数据接入到mysql中

    - 为了确保数据的一致性,可以设置Kafka Connect为幂等模式,这样即使同一消息被多次处理,MySQL中的数据也不会重复。 - 另外,可以设置适当的Kafka保留策略,避免数据丢失。 7. **异常处理和故障恢复** - 如果...

    java操作kafka

    ### 三、Java Kafka消费者 消费者从Kafka主题中读取和处理消息。消费者的知识点包括: 1. **配置**: 与生产者类似,消费者也需要配置,如group.id(消费者组标识)和enable.auto.commit(是否自动提交偏移量)等。...

    kafka-2.13-3.4.0.tgz

    Kafka支持发布订阅模式,生产者发布消息到主题(Topics),消费者订阅感兴趣的主题并消费消息。这种模式灵活且易于扩展,适合处理多种场景的数据交换。 5. **消费者组**: Kafka引入了消费者组的概念,每个组内的...

    kafkacs_kafka消费_

    2. **消费模式**:有两种消费模式——拉取(Pull)和推送(Push)。默认情况下,消费者主动向服务器请求数据(拉取)。Kafka Connect提供了一种自动推送模式,通过持续轮询来获取新数据。 3. **偏移量管理**:消费...

    Flume采集数据到Kafka,然后从kafka取数据存储到HDFS的方法思路和完整步骤

    #### 三、Flume采集数据至Kafka **3.1 配置Flume Agent** 1. **创建Flume配置文件**: - 编写Flume配置文件,例如`a1.sources = r1`用于定义数据源。 2. **配置Flume Source**: - 使用`exec`或`spooling_...

    kafka可视化工具--kafkatool

    7. **性能测试**:Kafkatool包含一个简单的生产者模式,可以用于测试Kafka集群的吞吐量和延迟,帮助优化集群性能。 **安装与使用Kafkatool**: 在Windows环境下,你可以下载名为“kafkatool_64bit.exe”的可执行...

Global site tag (gtag.js) - Google Analytics