`

漫游Kafka之过期数据清理

 
阅读更多

Kafka将数据持久化到了硬盘上,允许你配置一定的策略对数据清理,清理的策略有两个,删除和压缩。

数据清理的方式

删除

log.cleanup.policy=delete启用删除策略
直接删除,删除后的消息不可恢复。可配置以下两个策略:
清理超过指定时间清理:  
log.retention.hours=16
超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824
为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。

压缩

将数据压缩,只保留每个key最后一个版本的数据。
首先在broker的配置中设置log.cleaner.enable=true启用cleaner,这个默认是关闭的。
在topic的配置中设置log.cleanup.policy=compact启用压缩策略。

压缩策略的细节

如上图,在整个数据流中,每个Key都有可能出现多次,压缩时将根据Key将消息聚合,只保留最后一次出现时的数据。这样,无论什么时候消费消息,都能拿到每个Key的最新版本的数据。
压缩后的offset可能是不连续的,比如上图中没有5和7,因为这些offset的消息被merge了,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,比如,当试图获取offset为5的消息时,实际上会拿到offset为6的消息,并从这个位置开始消费。
这种策略只适合特俗场景,比如消息的key是用户ID,消息体是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
压缩策略支持删除,当某个Key的最新版本的消息没有内容时,这个Key将被删除,这也符合以上逻辑。

分享到:
评论

相关推荐

    StormStorm集成Kafka 从Kafka中读取数据

    本文将深入探讨如何实现Storm与Kafka的集成,重点在于如何从Kafka中读取数据。 **一、整合说明** Apache Storm是一个开源的分布式实时计算系统,它能够持续处理无限的数据流,确保每个事件都得到精确一次(Exactly...

    向kafka插入数据测试

    这个场景中,我们关注的是“向Kafka插入数据”的测试。这涉及到多个知识点,包括Kafka的基本概念、生产者API、数据模型、以及如何进行测试。 1. **Kafka基本概念**:Kafka是一个高吞吐量、低延迟的消息队列系统,它...

    kafka读取写入数据

    在分布式消息系统领域,Apache Kafka 是一个非常关键的组件,被广泛用于实时数据流处理和构建数据管道。本文将深入探讨“Kafka 读取写入数据”的核心知识点,包括 Kafka 的基本架构、数据模型、生产者与消费者原理...

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    标题"使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据"揭示了这个项目的核心内容:通过Netty接收TCP长连接的数据,并将这些数据存储到Kafka中,同时利用Kafka的批量消费功能对数据进行处理。下面我们将...

    .NET CORE 代码使用kafka推送数据

    .NET CORE 代码使用 Kafka 推送数据涉及到一系列关键知识点,主要涵盖 .NET Core 开发环境、Kafka 概念、Confluent.Kafka .NET 客户端库以及消息生产和消费的实现。以下是对这些主题的详细解释: 1. **.NET Core**...

    代码:kafka数据接入到mysql中

    在大数据处理领域,将Kafka数据接入到MySQL中是一个常见的需求。Kafka作为一个高吞吐量、分布式的实时消息发布订阅系统,常用于日志收集、流式数据处理等场景。而MySQL则作为广泛应用的关系型数据库,用于持久化和...

    Flume采集数据到Kafka,然后从kafka取数据存储到HDFS的方法思路和完整步骤

    ### Flume采集数据到Kafka,然后从Kafka取数据存储到HDFS的方法思路和完整步骤 #### 一、概述 随着大数据技术的发展,高效的数据采集、处理与存储变得尤为重要。本文将详细介绍如何利用Flume采集日志数据,并将其...

    flume-kafka之日志数据模拟

    本篇文章将深入探讨如何利用Flume和Kafka来模拟并处理日志数据,特别是针对Java应用程序生成的日志。 Flume是一个分布式、可靠且可用于有效收集、聚合和移动大量日志数据的系统。它设计得非常灵活,易于配置,并...

    Flink消费Kafka数据1

    Flink消费Kafka数据 Flink 是一种大数据处理引擎,可以消费 Kafka 数据。本文将介绍如何使用 Flink 消费 Kafka 数据,并提供了实验环境的配置和资源购买的详细步骤。 一、实验环境配置 在本实验中,我们将使用...

    图解 Kafka 之实战指南

    ### 图解Kafka之实战指南知识点详述 #### 一、Kafka简介 **Kafka** 起初由LinkedIn采用Scala语言开发,后捐赠给Apache基金会,现已成为一款广泛应用于分布式流处理平台的成熟软件。它凭借高吞吐量、可持久化存储、...

    图解 Kafka 之实战指南.7z

    《图解 Kafka 之实战指南》是一本深入解析Apache Kafka的实用书籍,旨在帮助读者理解和掌握这个分布式消息系统的精髓。Kafka是一个高吞吐、低延迟的开源流处理平台,常用于实时数据管道和流应用的构建。在这个实战...

    Flink实时同步Kafka数据到Doris

    Flink实时同步Kafka数据到Doris

    对python操作kafka写入json数据的简单demo分享

    安装kafka支持库pip install kafka-python from kafka import KafkaProducer import json ''' 生产者demo 向test_lyl2主题中循环写入10条json数据 注意事项:要写入json数据需加上value_serializer参数,如下...

    Kafka读数工具

    本工具是专为方便用户从Kafka中读取数据而设计的,下面将详细介绍其工作原理、使用方法以及与Kafka数据序列化相关的知识。 首先,了解Kafka的基本概念。Kafka是一个发布/订阅模型的消息系统,其中生产者负责发布...

    Flink实时读取Kafka数据批量聚合(定时按数量)写入Mysql.rar

    标题中的"Flink实时读取Kafka数据批量聚合(定时/按数量)写入Mysql"是一个典型的实时数据处理场景,涉及到大数据技术栈中的三个关键组件:Apache Flink、Apache Kafka和MySQL。以下是对这些技术及其在该场景下应用...

    flink 实现将kafka的数据读取出来,并经过xml配置和重写算子方式实现数据处理

    本篇文章将深入探讨如何使用 Flink 从 Kafka 消费数据,然后进行定制化处理,最后将处理结果回写到 Kafka。我们将重点讨论两个主要方面:通过 XML 配置方式设置数据源和转换,以及自定义 Flink 算子来实现数据处理。...

    行为日志采集kafka接入数据格式(终端、web,手机端)(1)(1).docx

    行为日志采集 Kafka 接入数据格式 _behavior 日志采集是指在客户端或服务器端采集用户行为数据,以便进行数据分析和应用优化。Kafka 是一个流行的分布式流媒体平台,常用于构建实时数据处理系统。在本文中,我们将...

    unity 利用kafka 接收数据

    unity利用kafka接收数据,只需填写ip端口,topic 即可接收消息;适用范围,unity编辑器,发布PC应用 说明:如果发布PC不可用,请手动将Plugins\X64文件夹里的dll 文件拷贝到发布文件kafka-Test_Data\Managed 路径下...

    java语言kafka数据批量导入到Elasticsearch实例

    消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 <groupId>org.elasticsearch <artifactId>elastic...

    3、Druid的load data 示例(实时kafka数据和离线-本地或hdfs数据)

    Apache Druid 是一个高性能的列式数据存储系统,常用于实时分析和大数据处理。在本篇文章中,我们将深入探讨如何使用 Druid 进行数据加载,特别是...这种强大的数据摄取机制是 Druid 成为实时分析领域的重要工具之一。

Global site tag (gtag.js) - Google Analytics