话题和日志 (Topic和Log)
分布式(Distribution)
生产者(Producers)
消费者(Consumers)
Kafka的保证(Guarantees)
1
2
|
tar xf kafka_2.11-0.10.0.1.tgz
cd kafka_2.11-0.10.0.1
|
1
2
3
|
bin /zookeeper-server-start .sh config /zookeeper .properties # 启动zookeeper服务端实例
bin /kafka-server-start .sh config /server .properties # 启动kafka服务端实例
|
1
|
bin /kafka-topics .sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
|
1
|
bin /kafka-topics .sh --list --zookeeper localhost:2181
|
1
2
3
|
bin /kafka-console-producer .sh --broker-list localhost:9092 --topic test This is a message This is another message |
1
2
3
|
bin /kafka-console-consumer .sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message This is another message |
1
|
bin /kafka-topics .sh --describe --zookeeper localhost:2181 --topic peiyinlog
|
1
|
bin /kafka-topics .sh --zookeeper 192.168.90.201:2181 --alter --topic test2 --partitions 20
|
1
|
bin /kafka-topics .sh --zookeeper zk_host:port /chroot --delete --topic my_topic_name
|
1
|
delete.topic. enable = true
|
broker_server ip | 主机名 | zookeeper ip | 客户端 ip |
192.168.1.2 | 默认 localhost | 192.168.1.4 | 192.168.1.5 |
1
2
3
|
# 此时客户端向broker发起一些消费: bin /kafka-console-consumer .sh --zookeeper 192.168.1.4:2181 --topic test2 --from-beginning
|
1
2
3
4
5
6
7
8
|
# server.properties broker. id =0 # broker节点的唯一标识 ID 不能重复。
host.name=10.10.4.1 # 监听的地址,如果不设置默认返回主机名给zk_server
log. dirs = /u01/kafka/kafka_2 .11-0.10.0.1 /data # 消息数据存放路径
num.partitions=6 # 默认主题(Topic)分片数
log.retention.hours=24 # 消息数据的最大保留时长
zookeeper.connect=10.160.4.225:2181 # zookeeper server 连接地址和端口
|
1
2
3
4
5
6
7
8
9
|
output { kafka {
workers => 2
bootstrap_servers => "10.160.4.25:9092,10.160.4.26:9092,10.160.4.27:9092"
topic_id => "xuexilog"
} } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
input{ kafka {
zk_connect => "112.100.6.1:2181,112.100.6.2:2181,112.100.6.3:2181"
group_id => "logstash"
topic_id => "xuexilog"
reset_beginning => false
consumer_threads => 5
decorate_events => true
} } # 这里group_id 需要解释一下,在Kafka中,相同group的Consumer可以同时消费一个topic,不同group的Consumer工作则互不干扰。 # 补充: 在同一个topic中的同一个partition同时只能由一个Consumer消费,当同一个topic同时需要有多个Consumer消费时,则可以创建更多的partition。 output { if [ type ] == "nginxacclog" {
elasticsearch {
hosts => [ "10.10.1.90:9200" ]
index => "logstash-nginxacclog-%{+YYYY.MM.dd}"
manage_template => true
flush_size => 50000
idle_flush_time => 10
workers => 2
} } } |
1
|
bin /kafka-consumer-groups .sh --group logstash --describe --zookeeper 127.0.0.1:2181
|
GROUP | TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG |
消费者组 | 话题id | 分区id | 当前已消费的条数 | 总条数 | 未消费的条数 |
相关推荐
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
Filebeat+Kafka+Logstash+ElasticSearch 日志监控解决方案 本文档介绍了一种基于Filebeat、Kafka、Logstash和ElasticSearch的日志监控解决方案。该解决方案可以实时收集、处理和存储日志数据,提供了一个完整的日志...
kibana-7-12-1.tar.zip
本篇文章我们来解决第三个问题: kibana又如何用直观的显示我们希望看到的日志报表? 根据数据显示看板,大致三步, 第一步是设置数据源,根据我们之前推送给elasticsearch的日志数据,使用management标签创建索引...
Logstash 是一个强大的数据处理管道,它允许用户从各种数据源采集数据,转换数据,并将其发送到各种目标,如 Elasticsearch 或 Kafka。在标题和描述中提到的 "elasticsearch或kafka的数据抽取工具:logstash-5.6.1...
本篇将详细阐述如何利用一系列工具,包括Filebeat、Syslog、Flume、Kafka、Logstash以及Elasticsearch(简称ES),来构建一个高效、可扩展的日志收集系统。 首先,Filebeat是Elastic公司出品的一款轻量级日志收集...
最简单的是Elasticsearch+Logstash+Kibana,但生产环境通常推荐使用Elasticsearch+Logstash+Filebeat+Kibana或Elasticsearch+Logstash+Filebeat+Kafka+Kibana的架构,以保证日志处理的稳定性和数据完整性。...
【标题】基于SpringBoot + Logstash的在线教育系统 在线教育系统已经成为现代教育技术的重要组成部分,而使用SpringBoot和Logstash进行开发可以实现高效、稳定且具有可扩展性的平台。SpringBoot是Java领域广泛使用...
2019年elaticsearch6.6.0的安装教程,kibana6.6.0安装教程,logstash6.6.0安装教程,ik分词器,head插件,bigdesk等插件安装教程,x-pack使用等。
SkyWalking 是一个强大的分布式追踪和应用性能管理(APM)系统,尤其适合微服务架构。本文将详细介绍如何在环境中集成 SkyWalking 8.4.0、Kafka 和 Elasticsearch 7.12.0 的配置和搭建过程。 首先,确保你已下载了 ...
Logstash支持各种输入插件(如文件、HTTP、TCP等)、输出插件(如Elasticsearch、Kafka等)以及过滤插件,允许开发者根据需要对数据进行清洗和预处理。 在Logstash中,我们可以通过配置文件定义输入源,例如读取PHP...
3. **输出插件**: 包括 Elasticsearch 输出,将处理好的数据写入 Elasticsearch,还有其他如 Redis、Kafka 等。 4. **弹性伸缩**: 可以轻松地增加 Logstash 实例以处理更多数据。 5. **易于配置**: 使用 JSON 格式的...
**ELK(Elasticsearch + Logstash + Kibana)** 是一个强大的日志管理和分析解决方案,广泛应用于大数据场景,特别是对于实时日志收集、处理、存储和可视化。这个组合提供了从日志生成到可视化的全套流程,帮助企业更...
然而,由于Logstash的性能限制和资源消耗,对于大规模日志处理,可能需要引入消息队列如Kafka来缓冲数据流,避免数据丢失并减轻Logstash的压力。 【Kibana的使用场景】 Kibana的界面直观易用,可以快速进行日志...
ELK(Elasticsearch, Logstash, Kibana)是一个流行的开源日志分析...在实际应用中,可以根据业务需求定制Logstash的过滤规则,利用Elasticsearch的索引管理和查询性能,以及Kibana的可视化能力,实现高效的数据洞察。
es:elasticsearch 对数据进行存储,分类,搜索 logstash: 日志收集,filter(过滤),日志输出到(reids,kafka,es)中 kibana:日志展示(查询es中保存的数据)
该系统通过整合SpringBoot应用的日志记录、Kafka的消息中间件、ELK(Elasticsearch、Logstash、Kibana)组件,实现了日志的实时流处理和可视化分析。 1. **可视化展示日志**:Kibana作为ELK堆栈的一部分,提供了一个...