`
y806839048
  • 浏览: 1120747 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Clickhouse Kafka Engine 使用

阅读更多

在使用ClickHouse的过程中,数据接入的方式有很多种,最近在尝试使用kafka的方式进行数据的入库,目前大概有两种方案:

  1. 内部kafka 引擎方式接入
  2. clickhouse_sinker

尝试第一种方式,既kafka引擎方式进行接入。

具体操作步骤如下:

    • 搭建kafka服务器 可以参照 spring cloud stream Kafka 示例 里,kafka搭建方式
    • 创建kafka 引擎的表
    • CREATE TABLE tkafka (
      timestamp UInt64,
      level String,
      message String
      ) ENGINE = Kafka SETTINGS kafka_broker_list = ‘192.168.1.198:9092’,
      kafka_topic_list = ‘test2’,
      kafka_group_name = ‘group1’,
      kafka_format = ‘JSONEachRow’,
      kafka_row_delimiter = ‘\n’,
      kafka_num_consumers = 1;

    • 创建一个结构表
    • CREATE TABLE daily (
      day Date,
      level String,
      total UInt64
      ) ENGINE = SummingMergeTree(day, (day, level), 8192);

    • 创建物化视图
    • CREATE MATERIALIZED VIEW consumer TO daily AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total FROM tkafka GROUP BY day, level;

整个过程就完毕了,其中需要消息发送主要是JSONEachRow,也就是JSON格式的数据,那么往topic 里面写入JSON数据即可。

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test2
>{"timestamp":"1562209583","level":"2","message":"hello2"}

发送数据后需要关闭通道,不然无法查询到数据。

SELECT level, sum(total) FROM daily GROUP BY level;

问题解答

1. Cannot parse input: expected { before: \0: (at row 2)

问题出在引擎版本上,我使用的是19.3.4 版本。19.1 版本没有问题, 19.5.2.6 版本解决了此问题,也就是中间版本存在这个问题。

原因: 消息中数据之间的分割符号未指定,导致无法处理。

解决办法: 添加 kafka_row_delimiter = ‘\n’,也就是上文键标红的部分。

参考解决地址: https://github.com/yandex/ClickHouse/issues/4442

2. 消息发送后,数据无法查询。

原因:kafka 引擎默认消费根据条数与时间进行入库,不然肯定是没效率的。

解决办法:其中对应的参数有两个。 max_insert_block_size ,stream_flush_interval_ms。

这两个参数都是全局性的。

max_insert_block_size 默认值为: Default value: 1,048,576.

参考地址: https://clickhouse.yandex/docs/zh/operations/settings/settings/#settings-max_insert_block_size

<iframe id="aswift_2" style="border-width: 0px; border-style: initial; font-family: inherit; font-style: inherit; font-weight: inherit; margin: 0px; outline: 0px; padding: 0px; vertical-align: baseline; max-width: 100%; left: 0px; position: absolute; top: 0px; width: 700px; height: 175px;" name="aswift_2" frameborder="0" marginwidth="0" marginheight="0" scrolling="no" width="700" height="175"></iframe>

stream-flush-interval-ms 默认值为: The default value is 7500.

 

实战:

 格式错误,会导致不再可以接受消息,此时可以重建物化表有关

 

 注意发送后关闭通道

     严格的物化顺序,不允许中间删除再补

             user.xml文件中:有效

            <max_memory_usage>120000000000</max_memory_usage>  解决查询峰值问题,查询异常不稳定

            <max_insert_block_size>2048576</max_insert_block_size>  防止溢出

 

            <stream_flush_interval_ms>750</stream_flush_interval_ms>  解决快速看效果

 

可用脚本:

 

 

 

例子:

创建队列:

./kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic log_test1

发送消息:

./kafka-console-consumer.sh --bootstrap-server node01:9092  --topic log_test

接受消息:

 

sh bin/kafka-console-producer.sh --broker-list node01:9092 --topic log_test1

 

 

 

{"timestamp":"1562209583","level":"2","message":"hello2"}

 

 

 

CREATE TABLE default.tkafka (

timestamp UInt64,

level String,

message String

) ENGINE = Kafka SETTINGS kafka_broker_list = '192.168.202.135:9092,192.168.202.136:9092,192.168.202.185:9092',

kafka_topic_list = 'log_test55',

kafka_group_name = 'group1',

kafka_format = 'JSONEachRow',

kafka_row_delimiter = '\n',

kafka_num_consumers = 1;

 

 

CREATE TABLE default.daily (

day Date,

level String,

total UInt64

) ENGINE = SummingMergeTree(day, (day, level), 8192);

 

 

CREATE MATERIALIZED VIEW default.consumer TO daily AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total FROM default.tkafka GROUP BY day, level;

 

 

 

 

参考地址:

https://clickhouse.yandex/docs/zh/operations/settings/settings/#stream-flush-interval-ms

这个参数改小时影响整个数据库的,所以如果不好调整请采用方案2。clickhouse_sinker.

github 地址: https://github.com/housepower/clickhouse_sinker

 

参考:https://www.cqmaple.com/201907/clickhouse-kafka-engine.html

 

 

 

 

分享到:
评论

相关推荐

    clickhouse--kafka引擎接入同步表.docx

    ClickHouse-Kafka引擎接入同步表是指使用ClickHouse和Kafka两种大数据处理技术来实现数据同步的解决方案。这种解决方案可以将Kafka中的数据实时同步到ClickHouse中,实现数据的实时同步和分析。 ClickHouse-Kafka...

    spark+clickhouse+hive+kafka+vue+hbase大型分析系统

    基于Flink+ClickHouse构建的分析平台,涉及 Flink1.9.0 、ClickHouse、Hadoop、Hbase、Kafka、Hive、Jmeter、Docker 、HDFS、MapReduce 、Zookeeper 等技术

    golang接受 kafka 日志数据 格式转化后 保存到clickhouse 批量 高速 结构化日志保存

    后端服务把json日志保存到文件 filebeat服务收集起来并发送到 kafka消息队列 golang服务 从kafka接收日志数据 ...保存到clickhouse数据库 供检索分析使用 这是一个完整的日志收集循环 适合一个小型分布式或单体服务使用

    word源码java-SpringBoot-kafka-clickhouse:灯塔-繁星开发团队的第一个项目

    word源码java Lighthouse-Stars-FirstProject 灯塔-繁星开发团队的第一个学习任务项目 学习任务 虚拟化方向 ...整合springboot,kafka,clickhouse做一个小项目, 通过springboot模拟数据发送到kafka, Spri

    Spring Boot集群管理工具KafkaAdminClient使用方法解析

    Spring Boot 集群管理工具 KafkaAdminClient 使用方法解析 KafkaAdminClient 是 Spring Boot 集群管理工具中的一部分,主要用于管理和检视 Kafka 集群中的Topic、Broker、ACL 等对象。下面将详细介绍 Kafka...

    filebeat+kafka+clickhouse+springboot框架搭建及应用

    本教程将深入探讨如何使用`Filebeat`、`Kafka`、`ClickHouse`以及`SpringBoot`框架搭建一个高效的数据流处理和存储平台。这四个组件各自扮演着关键的角色,共同构建了一个实时、可扩展且高效率的数据管道。 **...

    RdKafka::KafkaConsumer使用实例

    在本文中,我们将深入探讨如何使用C++库RdKafka中的`KafkaConsumer`类来消费Apache Kafka消息。RdKafka是一个高效的C/C++ Kafka客户端,它提供了生产者和消费者API,使得与Kafka集群进行交互变得更加简单。在这个...

    使用seatunnel同步Kafka的数据到clickhouse(保姆级)

    本教程将详细介绍如何使用Seatunnel实现从Kafka到ClickHouse的数据同步,这是一份保姆级别的指南,旨在帮助初学者轻松上手。 **Apache Kafka** Kafka是一种分布式流处理平台,它被广泛用于构建实时数据管道和流应用...

    [] - 2022-07-30 替代ELK:ClickHouse+Kafka+FlieBeat.pdf

    互联网资讯,技术简介,IT、AI技术,人工智能互联网资讯,技术简介,IT、AI技术,人工智能互联网资讯,技术简介,IT、AI技术,人工智能互联网资讯,技术简介,IT、AI技术,人工智能互联网资讯,技术简介,IT、AI技术...

    kafka搭建与使用.doc

    Kafka 集群搭建与使用详解 Kafka 是一种分布式流媒体平台,由 Apache 开源项目提供。它主要用来构建实时数据管道和流媒体处理系统。本文档将详细介绍 Kafka 集群的搭建和使用,包括创建、删除、生产者、消费者等...

    kafka 简单使用(附阿里云kafka使用)

    (阿里云kafka使用时需要结合阿里云提供的认证部分,另外上传文件时,阿里云kafka还在公测期间);如果你够厉害,而且数据规模足够,可以使用storm+kafka,彼时kafka当然就不会这么简单的使用了。

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    标题"使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据"揭示了这个项目的核心内容:通过Netty接收TCP长连接的数据,并将这些数据存储到Kafka中,同时利用Kafka的批量消费功能对数据进行处理。下面我们将...

    行为日志采集kafka接入数据格式(终端、web,手机端)(1)(1).docx

    行为日志采集 Kafka 接入数据格式 _behavior 日志采集是指在客户端或服务器端采集用户行为数据,以便进行数据分析和应用... ClickHouse 对接 Kafka 消息可以实时处理和分析用户行为数据,以便进行数据分析和应用优化。

    使用flink的standalone模式同步Kafka的数据到clickhouse的flink代码和jar包

    本项目主要涉及的是如何使用Flink的Standalone模式从Kafka源获取数据,并将这些数据同步到ClickHouse存储系统中。下面我们将详细介绍这一过程的关键步骤和涉及的技术知识点。 1. **Flink Standalone模式**:Flink ...

    kafka部署和使用详尽PDF

    《Kafka部署与使用详解》 Kafka是一种分布式流处理平台,广泛应用于大数据实时处理、日志收集、消息系统等领域。这份详尽的PDF文档详细介绍了如何在Linux环境下部署和使用Kafka,包括单机部署和集群部署。 一、...

    5、kafka监控工具Kafka-Eagle介绍及使用

    使用 Kafka-Eagle 可以帮助运维人员更有效地管理 Kafka 集群,及时发现和解决问题,确保 Kafka 的稳定运行。对于大数据系统来说,这样的监控工具至关重要,因为它能够提高系统的可用性和性能,降低故障排查的成本。

    kafka的使用场景.docx

    【Kafka的使用场景】 Kafka是一款高效、可扩展、分布式的流处理平台,它最初由LinkedIn开发,后来成为Apache软件基金会的顶级项目。Kafka主要用于构建实时数据管道和流应用,它在处理大规模实时数据流方面表现出色...

    kafka-manager 最新版本 已经编译好,可直接使用

    在本压缩包中,你获得的是kafka-manager的最新版本——1.3.3.18,这个版本已经预先使用Sbt(Scala Build Tool)完成了编译,用户可以直接进行部署,无需自行进行编译步骤,大大简化了使用流程。 Kafka是Apache软件...

    Kafka简介及使用PHP处理Kafka消息

    Kafka简介及使用PHP处理Kafka消息 Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息...

    kafkatool 连接kafka工具

    为了方便管理和操作 Kafka 集群,开发者通常会使用各种工具,其中 `kafkatool` 是一款广泛使用的命令行工具。本文将深入探讨 `kafkatool` 的功能、使用方法以及如何利用它来连接和管理 Kafka 集群。 **1. kafkatool...

Global site tag (gtag.js) - Google Analytics