`
maosheng
  • 浏览: 568116 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Linux 下 Kafka Cluster 搭建

 
阅读更多
概述

http://kafka.apachecn.org/quickstart.html
1.高吞吐量、低延迟:Kafka每秒可以处理几十万条消息,他的延迟最低只有几毫秒
2.每个topic可以分多个partition,consumer group 对partition进行consume操作
3,可扩展性:kafka集群支持热扩展
4.持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
5.容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
6.高并发:支持数千个客户端同时读写

Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群
Topic:一类消息,消息存放的目录即主题,对消息进行划分唯一的逻辑单元
message:Kafka中最基本的传递对象
Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
Segment:partition物理上有多个segment组成,每个segment存着message信息

Kafka存储策略:
1.kafka以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成
2,每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可以直接定位到消息的存储位置,避免id到位置的额外映射


-------------------------------------------------------------------------------------------

服务器:

192.101.11.162: zookeeper0  kafka0
192.101.11.163: zookeeper1  kafka1
192.101.11.164: zookeeper2  kafka2

一.准备工作

1.下载、安装JDK1.8,Zookeeper,kafka
下载URL:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
jdk-8u251-linux-x64.tar.gz

下载URL:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
apache-zookeeper-3.6.1-bin.tar.gz

下载URL:http://kafka.apache.org/downloads
kafka_2.12-2.5.0.tgz

3.解压JDK1.8,Zookeeper,kafka
#tar -zxvf jdk-8u251-linux-x64.tar.gz
#tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
#tar -zxvf kafka_2.12-2.5.0.tgz

4.创建运行目录
#mkdir /usr/local/components/
#mv jdk1.8.0_251 /usr/local/components/
#mv apache-zookeeper-3.6.1-bin /usr/local/components/
#mv kafka_2.12-2.5.0 /usr/local/components/

5.创建数据目录
#mkdir -p /data/zookeeper/
#mkdir -p /data/kafka/

二.安装JDK

1.编辑/etc/profile,增加如下环境变量
#vi /etc/profile

  export JAVA_HOME=/usr/local/components/jdk1.8.0_251
  export PATH=$JAVA_HOME/bin:$PATH

2.让/etc/profile文件修改后立即生效
#source /etc/profile

3.显示PATH环境变量,检验修改后的结果
#echo $PATH

  printenv

4.验证JDK安装完成,显示版本号
#java -version


三.安装zookeeper

1.分发到其他节点
#scp  ...............

2.复制并修改配置
#cp zoo_sample.cfg zoo.cfg

#vi zoo.cfg

修改:
dataDir=/data/zookeeper/

增加:
server.0=192.101.11.162:8880:7770
server.1=192.101.11.163:8880:7770
server.2=192.101.11.164:8880:7770

创建日志目录
#mkdir -p /data/zookeeper/logs/

#cd ./bin
#vi zkEnv.sh

增加:
ZOO_LOG_DIR=/data/zookeeper/logs/

3.分发到其他节点
#scp  ...............

4.配置实例ID,其他节点同理,(三个节点分别为:0,1,2) 
#cd /data/zookeeper/
#echo "0" >myid

5.启动运行
#cd bin
#./zkServer.sh start

四。安装kafka

1.分发到其他节点
#scp  ...............

2.修改配置
#cd config
#vi server.properties

修改broker.id,修改为不同的值(三个节点分别为:0,1,2)
broker.id=0

修改log.dirs,这是Kafka的数据目录
log.dirs=/data/kafka/

修改listeners(192.101.11.162:9092,192.101.11.163:9092,192.101.11.164:9092)
listeners=PLAINTEXT://192.101.11.162:9092
    
socket.send.buffer.bytes=1048576   
socket.receive.buffer.bytes=1048576   
socket.request.max.bytes=104857600   
num.partitions=10 
zookeeper.connect=192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181

注意:内存生产建议配置 4G

3.启动运行
#cd bin
#./kafka-server-start.sh ../config/server.properties &

#./kafka-server-start.sh -daemon ../config/server.properties

4.创建Topic

# ./kafka-topics.sh --create --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181 --topic test --partitions 1 --replication-factor 3  ##一个分区,三个副本

注意:replication-factor 用于设置主题副本数,每个副本分布在不同节点,不能超过总节点数。

5.查看Topic列表

# ./kafka-topics.sh --list --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181

6.查看Topic状态

# ./kafka-topics.sh --describe --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181  --topic test


删除Topic
# ./kafka-topics.sh --delete --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181  --topic test


7.producer发送消息

#./kafka-console-producer.sh --broker-list 192.101.11.162:9092,192.101.11.163:9092,192.101.11.164:9092 --topic test

8.consumer接收消息

#./kafka-console-consumer.sh --bootstrap-server 192.101.11.162:9092,192.101.11.163:9092,192.101.11.164:9092 --topic test --from-beginning

9.修改分区数
# ./kafka-topics.sh --alter --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181  --topic test --pattitions 3

注意:修改分区数时,只能增加分区个数,不能减少分区个数,否则会报错

10、分区重新分配
vim reassign.json
{
  "topics":[{"topic":"test"}],
  "version":1
}

# ./kafka-reassign-partitions.sh --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181 --topics-to-move-json-file reassign.json --broker-list 192.101.11.162:9092,192.101.11.163:9092,192.101.11.164:9092 --generate

## 命令输出两个json字符串,第一个json内容为当前的分区副本分配情况,第二个为重新分配的候选方案(注意:这是只是生成一份可行性的方案,并没有真正执行重分配的动作)。
## 保存生成的第二个json(Proposed partition reassignment configuration)到名为result.json的文件里。
vim result.json

执行分配策略:
# ./kafka-reassign-partitions.sh --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181 --reassignment-json-file result.json --execute

查看分区重新分配的进度:
# ./kafka-reassign-partitions.sh --zookeeper 192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181 --reassignment-json-file result.json --verify

分区分配策略:
RangeAssignor(默认策略),RoundRobinAssignor,StickyAssignor

RangeAssignor(默认策略)策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。
RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。

partition中segment文件存储结构:
segment file 组成:由两大部分组成,分别为index file和data file。此两个文件一一对应,成对出现,后缀.index和.log分别表示为segment的索引文件和数据文件

数据文件的分段:
kafka解决查询效率的手段之一是将数据文件分段。
一句话:kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到高效性。

日志删除:
kafka日志管理器允许定制删除策略。目前的策略是删除修改时间在N天之前的日志(按时间顺序),也可以使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除
时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取曹组的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteAtrayList.
kafka消费日志删除思想:kafka把topic中一个partition大小文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
vi config/server.properties
log.cleanup.plicy=delete ##启用删除策略
##直接删除,删除后的消息不可恢复,可配置以下两个策略
log.retention.hours=16  ##清理超过指定时间message
log.retention.bytes=1073741824  ##清理指定大小后,删除旧的消息

磁盘存储优势:
kafka在设计的时候,采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且不允许修改已经写入的消息,
这种方式属于典型的顺序写入的操作,所以就算是kafka使用磁盘作为存储介质,所能实现的吞吐量也非常可观。

kafka大量使用页面缓存,这也是kafka实现高吞吐的重要因素之一。
kafka还使用了零拷贝技术来进一步提升性能。

五、kafka-eagle安装:

0、kafka需要开启JMX端口

    找到kafka安装路径,进入到bin文件夹,修改下面的地方。
    vi kafka-server-start.sh
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi
...

1、下载kafka-eagle:
https://codeload.github.com/smartloli/kafka-eagle-bin/tar.gz/v2.0.0

tar -zvxf kafka-eagle-bin-2.0.0.tar.gz

cd kafka-eagle-bin-2.0.0/

tar -zvxf kafka-eagle-web-2.0.0-bin.tar.gz

ln -s /opt/kong/kafka-eagle-bin-2.0.0/kafka-eagle-web-2.0.0 /usr/local/kafka-eagle

2、环境配置
vi /etc/profile
export KE_HOME=/usr/local/kafka-eagle
export PATH=$PATH:$KE_HOME/bin

3、配置修改

cd /usr/local/kafka-eagle/conf
vi system-config.properties

# multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here
#如果只有一个集群的话,就写一个cluster1就行了
#kafka.eagle.zk.cluster.alias=cluster1,cluster2  
kafka.eagle.zk.cluster.alias=cluster1
#这里填上刚才上准备工作中的zookeeper.connect地址
cluster1.zk.list=192.101.11.162:2181,192.101.11.163:2181,192.101.11.164:2181
#如果多个集群,继续写,如果没有注释掉
#cluster2.zk.list=192.168.18.21:2181,192.168.18.22:2181,192.168.18.23:2181

# zk limit -- Zookeeper cluster allows the number of clients to connect to
kafka.zk.limit.size=25

# kafka eagel webui port -- WebConsole port access address
kafka.eagle.webui.port=8048     ###web界面地址端口

# kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option
kafka.eagle.offset.storage=kafka

# delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete
kafka.eagle.topic.token=keadmin

# kafka sasl authenticate, current support SASL_PLAINTEXT
#如果kafka开启了sasl认证,需要在这个地方配置sasl认证文件
kafka.eagle.sasl.enable=false
kafka.eagle.sasl.protocol=SASL_PLAINTEXT
kafka.eagle.sasl.mechanism=PLAIN
kafka.eagle.sasl.client=/data/kafka-eagle/conf/kafka_client_jaas.conf

#下面两项是配置数据库的,默认使用sqlite,如果量大,建议使用mysql,这里我使用的是sqlit
#如果想使用mysql建议在文末查看官方文档
# Default use sqlite to store data
kafka.eagle.driver=org.sqlite.JDBC
# It is important to note that the '/hadoop/kafka-eagle/db' path must exist.
kafka.eagle.url=jdbc:sqlite:/data/app/kafka-eagle/db/ke.db   #这个地址,按照安装目录进行配置
kafka.eagle.username=root
kafka.eagle.password=smartloli

# <Optional> set mysql address
#kafka.eagle.driver=com.mysql.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=smartloli



如果开启了sasl认证,需要自己去修改kafka-eagle目录下的conf/kafka_client_jaas.conf

4、启动kafka-eagle

  cd ${KE_HOME}/bin
  chmod +x ke.sh
  ./ke.sh start
 
  [2020-08-27 21:49:07] INFO: Status Code[0]
[2020-08-27 21:49:07] INFO: [Job done!]
Welcome to
    __ __    ___     ____    __ __    ___            ______    ___    ______                               __     ______
   / //_/   /   |   / __/   / //_/   /   |          / ____/   /   |  / ____/   /                            /    / ____/
  / ,<     / /| |  / /_    / ,<     / /| |         / __/     / /| | / / __    /                            /    / __/
/ /| |   / ___ | / __/   / /| |   / ___ |        / /___    / ___ |/ /_/ /   / /                           ___ / /___
/_/ |_|  /_/  |_|/_/     /_/ |_|  /_/  |_|       /_____/   /_/  |_|\____/   /___                           __//_____/
                                                                                                          

Version 2.0.0 -- Copyright 2016-2020
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://192.101.11.152:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************

  查看日志是否出问题
  tailf ../log/log.log
  如果没问题,则直接登录
  http://192.101.11.152:8048/ke
  默认用户名:admin
  默认密码:123456
  如果进入到一下界面,就说明你安装成功了!


ke.sh start     ##启动kafka Eagle
ke.sh stop      ##停止kafka-eagle
ke.sh restart   ##重启kafka-eagle
ke.sh status    ##查看kafka-eagle系统运行状态
ke.sh stats     ##统计kafka-eagle系统占用资源情况









分享到:
评论

相关推荐

    kafka集群搭建文档

    本篇文档将详细介绍如何在Linux环境中搭建Kafka集群,同时结合Hadoop和Spark的分布式安装,构建一个完整的数据处理平台。 首先,搭建Kafka集群的基础条件包括: 1. 至少一台Linux服务器,但为了高可用性,推荐多台...

    linux安装kafka教程

    Linux 安装 Kafka 教程 Kafka 是一种流行的分布式流处理平台,广泛应用于数据处理、实时数据处理和事件驱动架构等领域。本教程将指导您在 Linux 环境中安装和配置 Kafka。 一、Kafka 安装 Kafka 可以通过两种方式...

    kafka环境搭建

    本文将详细介绍如何在Windows和Linux环境下搭建Kafka环境,并解释其中涉及的关键概念。 #### 二、Kafka基础知识 在开始环境搭建之前,我们先了解一些Kafka的基本术语: - **Broker**:Kafka 集群中的服务节点被...

    linux Redhat Kafka集群部署

    redhat linux 部署Kafka集群

    Kafka Tool linux版本,适用于kafka0.11及以上

    总的来说,Kafka Tool是一款强大的Kafka管理工具,尤其适合Linux环境下的开发者和运维人员,它提供了丰富的功能,使得与Kafka集群的交互变得直观和简单。通过熟练掌握Kafka Tool的使用,可以提高工作效率,更好地...

    kafka集群搭建及测试.docx

    - jdk-8u162-linux-x64.tar:Java开发环境,用于运行Kafka和Zookeeper。 - kafka_2.11-2.0.0.tgz:Apache Kafka的二进制包。 - zookeeper-3.4.12.tar:Apache ZooKeeper,Kafka的依赖,用于集群协调。 **2. 搭建...

    kafka集群搭建.pdf

    kafka集群搭建方案 kafka集群搭建是大数据处理和实时数据处理的重要组件。下面是kafka集群搭建的详细方案: 一、准备工作 1. 关闭防火墙 关闭防火墙是kafka集群搭建的前提条件。可以使用systemctl disable ...

    kafka分布式集群搭建

    ### Kafka分布式集群搭建详解 #### 一、概述 Kafka是一种高性能、分布式的消息发布与订阅系统,被广泛应用于日志收集、流处理、消息传递等多个领域。为了提高系统的可用性与扩展性,通常会采用分布式集群的方式...

    kafka搭建单机windows_单机linux_集群linux操作.rar

    本文将详细介绍如何在Windows单机环境、Linux单机环境以及Linux集群环境下搭建Kafka,旨在帮助读者深入理解Kafka的部署与配置,以便更好地运用在实际项目中。 ### Windows单机环境搭建 1. **下载安装Java运行环境...

    Kafka Tool 2.0.7(linux系统)

    本文将详细介绍如何在 Linux 环境下安装和使用 Kafka Tool 2.0.7。 **一、下载与安装** 1. **获取 Kafka Tool**: 首先,你需要从官方或者第三方源下载 Kafka Tool 2.0.7 的压缩包。通常,它会是一个 `.tar.gz` 或 ...

    kafka消息监控(linux运行_window查看)

    对于Windows用户,虽然Kafka Offset Monitor通常在Linux环境下运行,但也可以通过安装Java并使用相同命令在Windows命令提示符中运行。确保在运行命令前,系统已经正确配置了Java环境变量。 这个监控工具对于Kafka的...

    kafka集群搭建与使用

    Kafka 集群搭建与使用 Kafka 是一种高吞吐量的分布式发布订阅消息系统,使用 Scala 编写。Kafka 拥有作为一个消息系统应该具备的功能,但是确有着独特的设计。Kafka 集群的搭建和使用是基于 Kafka 的设计理念和架构...

    在linux中搭建kafka集群

    在Linux环境中搭建Apache Kafka集群是一项重要的任务,尤其对于那些需要处理大量实时数据流的应用场景。Kafka是一个分布式消息系统,它允许高效地发布、订阅和存储数据流。Zookeeper是Apache的一个子项目,用于...

    linux kafka+elk7.9.1搭建所需要的的包.rar

    Linux环境下,ELK(Elasticsearch、Logstash、Kibana)和Kafka以及Zookeeper是构建高效日志管理和分析系统的必备组件。这个压缩包包含了这些工具的最新稳定版本,旨在简化安装过程,提高效率。 首先,让我们详细...

    linux下kafka

    在Linux环境下部署Apache Kafka是一项常见的任务,特别是在大数据处理和实时流计算中。Kafka是一个分布式流处理平台,它被设计为可扩展、高吞吐量和低延迟的系统。在这个场景中,我们使用的版本是`kafka_2.11-2.0.1....

    支持windows和linux的kafka安装包

    无论是在Windows还是Linux环境下,安装和使用Kafka都需要理解其核心概念,如主题(topics)、分区(partitions)和副本(replicas),以及生产者、消费者和集群配置。通过熟练掌握这些知识点,开发者能够有效地利用...

    Linux搭建Kafka开发环境

    首先,从标题我们可以得知,本文将介绍如何在Linux操作系统上搭建Kafka开发环境。Kafka是由LinkedIn公司开发,是一个高性能分布式消息系统。它使用Scala编写,并且支持分布式数据的发布和订阅模式。其特点包括高吞吐...

    linux安装kafka监控EFAK安装.docx

    在Linux环境中,对Apache Kafka进行监控是确保系统稳定运行的关键步骤。EFAK(原名为Kafka Eagle)是一个专门用于Kafka集群监控的开源工具,它提供了丰富的可视化界面,可以帮助管理员轻松地监控Kafka节点的状态、...

    kafka搭建与使用.doc

    Kafka 集群搭建与使用详解 Kafka 是一种分布式流媒体平台,由 Apache 开源项目提供。它主要用来构建实时数据管道和流媒体处理系统。本文档将详细介绍 Kafka 集群的搭建和使用,包括创建、删除、生产者、消费者等...

Global site tag (gtag.js) - Google Analytics