`
flylynne
  • 浏览: 375993 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Kafka几种消息方式

 
阅读更多

1.消费位移确认

Kafka消费者消费位移确认有自动提交与手动提交两种策略。在创建KafkaConsumer对象时,通过参数enable.auto.commit设定,true表示自动提交(默认)。自动提交策略由消费者协调器(ConsumerCoordinator)每隔${auto.commit.interval.ms}毫秒执行一次偏移量的提交。手动提交需要由客户端自己控制偏移量的提交。

(1)自动提交。在创建一个消费者时,默认是自动提交偏移量,当然我们也可以显示设置为自动。例如,我们创建一个消费者,该消费者自动提交偏移量

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("enable.auto.commit", true);// 显示设置偏移量自动提交
props.put("auto.commit.interval.ms", 1000);// 设置偏移量提交时间间隔
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 创建消费者
consumer.subscribe(Arrays.asList("test"));// 订阅主题

while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

Properties的实例props中存放的key意义:
1)bootstrap.servers表示要连接的Kafka集群中的节点,其中9092表示端口号;
2)enable.auto.commit为true,表示在auto.commit.interval.ms时间后会自动提交topic的offset,其中auto.commit.interval.ms默认值为5000ms;
3)其中foo和bar为要消费的topic名称,由group.id为test作为consumer group统一进行管理;
4)key.deserializer和value.deserializer表示指定将字节序列化为对象。

(2)手动提交offset。   生产环境中,需要在数据消费完全后再提交offset,也就是说在数据从kafka的topic取出来后并被逻辑处理后,才算是数据被消费掉,此时需要手动去提交topic的offset。

在有些场景我们可能对消费偏移量有更精确的管理,以保证消息不被重复消费以及消息不被丢失。假设我们对拉取到的消息需要进行写入数据库处理,或者用于其他网络访问请求等等复杂的业务处理,在这种场景下,所有的业务处理完成后才认为消息被成功消费,这种场景下,我们必须手动控制偏移量的提交。

Kafka 提供了异步提交(commitAsync)及同步提交(commitSync)两种手动提交的方式。两者的主要区别在于同步模式下提交失败时一直尝试提交,直到遇到无法重试的情况下才会结束,同时,同步方式下消费者线程在拉取消息时会被阻塞,直到偏移量提交操作成功或者在提交过程中发生错误。而异步方式下消费者线程不会被阻塞,可能在提交偏移量操作的结果还未返

回时就开始进行下一次的拉取操作,在提交失败时也不会尝试提交。

实现手动提交前需要在创建消费者时关闭自动提交,即设置enable.auto.commit=false。然后在业务处理成功后调用commitAsync()或commitSync()方法手动提交偏移量。由于同步提交会阻塞线程直到提交消费偏移量执行结果返回,而异步提交并不会等消费偏移量提交成功后再继续下一次拉取消息的操作,因此异步提交还提供了一个偏移量提交回调的方法commitAsync(OffsetCommitCallback callback)。当提交偏移量完成后会回调OffsetCommitCallback 接口的onComplete()方法,这样客户端根据回调结果执行不同的逻辑处理。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("fetch.max.bytes", 1024);// 为了便于测试,这里设置一次fetch 请求取得的数据最大值为1KB,默认是5MB
props.put("enable.auto.commit", false);// 设置手动提交偏移量
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Arrays.asList("test"));
try {
      int minCommitSize = 10;// 最少处理10 条消息后才进行提交
      int icount = 0 ;// 消息计算器
      while (true) {
// 等待拉取消息
           ConsumerRecords<String, String> records = consumer.poll(1000);
           for (ConsumerRecord<String, String> record : records) {
// 简单打印出消息内容,模拟业务处理
                System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record. partition(), record.offset(), record.key(),record.value());
                icount++;
           }
// 在业务逻辑处理成功后提交偏移量
      if (icount >= minCommitSize){
           consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                     if (null == exception) {
                     // TODO 表示偏移量成功提交
                          System.out.println("提交成功");
                   } else {
                     // TODO 表示提交偏移量发生了异常,根据业务进行相关处理
                          System.out.println("发生了异常");
                     }
                 }
                });
           icount=0; // 重置计数器
      }
}
} catch(Exception e){
// TODO 异常处理
      e.printStackTrace();
} finally {
      consumer.close();
}

本方案的缺点是必须保证所有数据被处理后,才提交topic的offset。为避免数据的重复消费,可以用第三种方案,根据每个partition的数据消费情况进行提交,称之为“at-least-once”。

3.手动提交partition的offset

Kafka 在0.10.1.1 版本增加了时间戳索引文件,因此我们除了直接根据偏移量索引文件查询消息之外,还可以根据时间戳来访问消息。consumer-API 提供了一个offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,该方法入参为一个Map 对象,Key 为待查询的分区,Value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量和时间戳。需要注意的是,若待查询的分区不存在,则该方法会被一直阻塞。

假设我们希望从某个时间段开始消费,那们就可以用offsetsForTimes()方法定位到离这个时间最近的第一条消息的偏移量,在查到偏移量之后调用seek(TopicPartition partition, long offset)方法将消费偏移量重置到所查询的偏移量位置,然后调用poll()方法长轮询拉取消息。例如,我们希望从主题“stock-quotation”第0 分区距离当前时间相差12 小时之前的位置开始拉取消息

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("enable.auto.commit", true);// 显示设置偏移量自动提交
props.put("auto.commit.interval.ms", 1000);// 设置偏移量提交时间间隔
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
try {
      Map<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition,Long>();
      // 构造待查询的分区
      TopicPartition partition = new TopicPartition("stock-quotation", 0);
      // 设置查询12 小时之前消息的偏移量
      timestampsToSearch.put(partition, (System.currentTimeMillis() - 12 * 3600 * 1000));
      // 会返回时间大于等于查找时间的第一个偏移量
      Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes (timestampsToSearch);
      OffsetAndTimestamp offsetTimestamp = null;
      // 这里依然用for 轮询,当然由于本例是查询的一个分区,因此也可以用if 处理
      for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
      // 若查询时间大于时间戳索引文件中最大记录索引时间,
      // 此时value 为空,即待查询时间点之后没有新消息生成
           offsetTimestamp = entry.getValue();
           if (null != offsetTimestamp) {
           // 重置消费起始偏移量
                consumer.seek(partition, entry.getValue().offset());
           }
           }
      while (true) {
      // 等待拉取消息
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records){
      // 简单打印出消息内容
           System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record.partition(), record.offset(), record.key(),record.value());
      }
      }
} catch (Exception e) {
      e.printStackTrace();
} finally {
      consumer.close();
}

4、消费速度控制

提供 pause(Collection<TopicPartition> partitions)和resume(Collection<TopicPartition>
partitions)方法,分别用来暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据操作。通过这两个方法可以对消费速度加以控制,结合业务使用。

分享到:
评论

相关推荐

    Apache Kafka 企业级消息队列

    Kafka的操作集群主要有两种方式:命令行方式和Kafka自带的管理界面工具kafka-manager。命令行方式使用 kafka-topics.sh、kafka-consumer-groups.sh 等脚本进行主题或消费者组的管理和查看;kafka-manager是一个基于...

    Kafka分布式消息系统

    Kafka 是一种高性能的分布式消息队列,最初由 LinkedIn 开发,现在已成为 Apache 软件基金会的顶级项目。Kafka 主要设计用于处理大规模的日志数据,它以高吞吐量、低延迟和容错性著称。LinkedIn 的日志数据主要包含...

    kafka报文模拟工具

    Kafka是一种分布式流处理平台,它允许我们发布和订阅数据流,就像一个消息队列一样。主题(Topic)是Kafka中消息的分类,每个主题可以被分成多个分区(Partition),分区是有序的,并且具有高可用性和可扩展性。消费...

    浅谈分布式消息技术:Kafka.docx

    Kafka中的几个关键术语值得深入理解:Broker是消息处理节点,一个Kafka集群由多个broker组成;Topic是消息类别,集群可以同时处理多个topic;Partition是topic的物理划分,每个topic可被分为多个分区,每个分区是一...

    kafka原理文档1

    kafka是一种高吞吐量、基于发布/订阅模式的消息队列系统。下面是kafka的原理文档,涵盖了kafka的架构、设计理念、消息模型、 Partition机制、日志策略、消息可靠性机制等方面。 一、kafka架构 kafka的架构主要包括...

    kafka细心原理与实战

    Kafka使用了一种高效的文件组织结构来存储消息,以实现高性能的读写操作。 **1. 顺序读写**:Kafka将消息按顺序追加到文件末尾,利用磁盘顺序读写的高效性。 2. **零拷贝**:通过映射文件到内存,避免了数据从内核...

    kafka安装包-2.13-3.6.2

    Kafka主要由以下几个核心组件构成: 1. **Brokers**: Kafka集群中的服务器节点,负责存储和转发消息。它们是无状态的,可以水平扩展以增加吞吐量和容错性。 2. **Topics**: 消息的主题,可以看作是消息的分类或...

    Kafka集群搭建(3台机)

    Kafka是一种分布式的流处理平台,它具备消息队列的特性,常用于构建实时数据管道和流应用程序。搭建Kafka集群涉及到对虚拟机的安装配置、JDK环境的搭建、Zookeeper的安装配置等关键步骤。下面详细介绍各个知识点。 ...

    SpringBoot整合kafka,代码简洁,自动分配分区和指定分区消费(亲测可用)

    在本文中,我们将深入探讨如何将SpringBoot与Apache Kafka进行集成,实现消息生产和消费功能,同时涵盖自动分配分区和指定分区消费的策略。SpringBoot以其轻量级、快速开发的特性,深受开发者喜爱,而Kafka作为一个...

    kafka流培训材料

    但是,为了实现高效并发,Kafka采用了追加写入的方式,在每个分区中,消息总是按顺序追加,因此同一个分区内的消息可以保持严格的顺序。如果应用需要跨分区保持消息的顺序,那么Kafka就无法保证了。 Kafka的连接器...

    向kafka插入数据测试

    在IT行业中,Kafka是一种广泛使用的分布式流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。这个场景中,我们关注的是“向Kafka插入数据”的测试。这涉及到多个知识点,包括Kafka的基本概念、生产者API、数据...

    kafka集群搭建与使用

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,使用 Scala 编写。Kafka 拥有作为一个消息系统应该具备的功能,但是确有着独特的设计。Kafka 集群的搭建和使用是基于 Kafka 的设计理念和架构。 Kafka 概念 * ...

    Kafka入门.pdf

    消息系统是一种允许独立的应用程序之间通过交换消息的方式进行通信的软件,它主要用于解耦应用程序,提高系统的灵活性和可伸缩性。消息系统的功能是将数据从一个应用程序传输到另一个应用程序,确保发送方和接收方的...

    kafka概述及原理.pdf

    鉴于Kafka的强大特性和优秀性能,其应用场景非常广泛,主要包括但不限于以下几种: - **日志收集**:Kafka可以作为集中式日志收集平台,将分散在各个服务器上的日志数据统一收集并转发至日志分析系统。 - **消息...

    自用kafka简单测试

    3. **消息模型**:Kafka支持两种消息模型——发布/订阅(pub/sub)和队列。发布/订阅模型中,所有订阅者都能收到消息;队列模型下,每条消息仅被一个消费者消费。 4. **副本与容错**:Kafka通过副本策略实现容错,...

    kafka简介与kafka深入浅出两个资料.rar

    2. **分区(Partition)**:每个主题可以被划分为多个分区,分区是有序的并且提供了一种水平扩展的方式,因为不同的分区可以分布在不同的服务器上。 3. **生产者(Producer)**:生产者负责将消息发送到Kafka的主题...

    消息队列-kafka1

    Kafka 是一种高性能、可扩展的消息队列系统,广泛应用于实时数据处理、日志处理、流数据处理等领域。下面,我们将深入探讨 Kafka 消息队列技术的相关知识点。 1. Kafka 概念 Kafka 是一种基于发布订阅模式的消息...

    kafka_2.11-1.1.1安装包

    在1.1.1版本中,Kafka为用户提供了一种高效、可扩展且持久化的方式来存储和传输消息。 首先,Kafka的核心概念包括生产者(Producer)、消费者(Consumer)和主题(Topic)。生产者负责生成数据并发布到特定的主题,...

    kafka需要的jar包集合

    1. **kafka-clients.jar**:这是Kafka Java客户端的核心库,包含了生产者、消费者以及其他客户端API,用于连接Kafka服务器,发送和接收消息。 2. **slf4j-api.jar**:简单日志门面(SLF4J)是一个用于各种日志框架...

Global site tag (gtag.js) - Google Analytics