这一节我们看下怎么创建一个多实例的集群(以三个节点为例)。
一、 创建配置文件
进入到 Kafka 主目录,以 config/server.properties 为原型,创建两个新的节点配置文件
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
按如下方式修改这两个配置文件的相关属性
config/server-1.properties:
listeners=PLAINTEXT://:9093
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
listeners=PLAINTEXT://:9094
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
上述属性中 broker.id 在集群中必须是唯一且永久的。我们打算在同一台机器上启动另两个节点实例,为避免冲突,端口和日志文件路径也做了相应的修改。
二、启动服务
输入如下命令启动服务:
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
jps 一下,一共可以看到三个 Kafka 进程。
三、 创建主题
现在我们创建一个具有三份副本、两个划分的主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic2
我们可以通过 describe 参数查看集群中对刚刚创建的主题的配置情况:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic2
Topic:my-replicated-topic2 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: my-replicated-topic2 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-replicated-topic2 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
这里解释一下上述输出:第一行给出了所有划分的摘要信息,接下来逐行显示每个划分的详细信息。
- leader 节点响应对应划分的全部读写请求。
- replicas 是一个节点列表,他们产生相应划分的副本信息,里面的节点不一定都是存活的。
- isr 是一组非同步(in-sync)的备份,它是 replicas 的子集,里面的节点当前都是存活的。
四、 验证可靠性
接下来我们验证一下多副本的可靠性。先灌两条消息到之前创建的主题上:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic2
test message 1
test message 2
读一下看看:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic2
test message 1
test message 2
(注意因为我们申请了两个划分,上述命令的结果有可能顺序不一致,也就是 test message 2 在前面)
我们通过这个命令找到 id 为 1 的节点实例:
ps -ef | grep server-1.properties
work 15053 47875 0 17:25 pts/1
干掉这个节点进程:
这时候我们再看一下主题的状态描述:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic2
Topic:my-replicated-topic2 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: my-replicated-topic2 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,2
Topic: my-replicated-topic2 Partition: 1 Leader: 2 Replicas: 1,2,0 Isr: 2,0
可以看出来节点 1 已经被停掉了。我们再去主题中读一下消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic eplicated-topic2
test message 2
test message 1
谢天谢地,数据还在。
相关推荐
Kafka支持集群部署,可以添加更多的Broker节点以提高可用性和容错性。同时,还可以配置诸如offset存储、acks策略、网络缓冲区大小等参数,以适应不同的性能和持久化需求。 总结 Kafka 2.12-3.6.1在Linux环境中的...
在`kafka-python`中,可以通过创建一个`KafkaProducer`实例并调用其`send()`方法来发送消息: ```python from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') ...
- `KafkaConsumer`类用于创建消费者实例,配置包括bootstrap servers、group id、offset管理策略等。 - `subscribe()`方法用于订阅一个或多个主题,`poll()`方法用于轮询获取新消息。 - 消费者通过自动或手动提交...
5. **Partitions**: 主题被分成多个分区,分区内的消息按照顺序存储,且每个分区只有一个消费者实例在消费,确保了消息的顺序性。 6. **Replication**: 分区可以有副本,以实现容错性。如果主分区失败,一个副本将...
8. **Kafka Streams**:Kafka的Java库,用于在Kafka集群内进行流处理,构建复杂的数据管道和应用程序。 9. **Zookeeper**:Kafka依赖Zookeeper来协调集群,管理元数据,例如首领选举和分区分配。 10. **性能**:...
Kafka 0.11 引入了多项重要改进,包括更好的消息压缩、增强的事务支持和元数据优化。Kafka Manager 的更新意味着它可以无缝地与这些新特性配合,为管理员提供全面的管理能力。例如: 1. **幂等性生产者**:0.11 ...
此配置文件中,我们启动了一个Zookeeper实例(zookeeper1)和一个Kafka实例(kafka1),并暴露了它们的默认端口。Kafka实例依赖于Zookeeper,并配置了连接到Zookeeper的环境变量。如果需要更大的集群,只需复制kafka...
Kafka支持多消费者模式,允许多个消费者实例分组,形成消费者组(Consumer Group),这样可以实现负载均衡和容错。 - ** broker **:Kafka集群中的节点称为broker,负责存储、读取和复制消息。 2. **Kafka_2.11-...
1. 创建`Properties`对象以设置配置参数,如`bootstrap.servers`(Kafka集群地址)、`key.serializer`和`value.serializer`(消息序列化类)。 2. 使用`Properties`创建`KafkaProducer`实例。 3. 调用`KafkaProducer...
- **消费者(Consumer)**:从Kafka集群中读取消息的应用程序,可以是单个实例或消费组的一部分。 - **消费者组(Consumer Group)**:一组消费者实例,它们共享主题中的消息,每个分区只能被组内一个消费者实例...
首先,它支持多集群配置,这意味着用户可以在一个统一的界面上管理多个Kafka实例,大大提升了工作效率。其次,它的实时监控功能可以直观展示集群的状态,包括节点健康状况、主题分区分布、消费者组的消费进度等,这...
《Kafka-Manager 2.0.0.2:高效管理Kafka集群的利器》 Kafka-Manager 2.0.0.2是一款专为Apache Kafka设计的管理工具,由社区成员自行编译,旨在简化Kafka集群的日常管理和监控工作。这款工具的出现,使得管理员能够...
- **Broker**:Kafka集群中的节点,负责存储和转发消息。 2. **Kafka工作流程** - 生产者将消息发送到一个或多个主题的分区。 - 消息被持久化到磁盘,确保即使服务器故障也能恢复。 - 消费者从 broker 拉取数据...
通过这个配置实例,我们可以了解到Kafka如何利用SASL/PLAIN进行简单的用户名和密码认证,以及如何在Kafka集群和客户端之间建立安全的连接。这为保障数据的安全传输提供了基础,同时,了解和实践这些配置将有助于我们...
- 消费者:创建消费者实例,订阅一个或多个主题,然后通过调用Consumer的Consume方法获取消息。处理完消息后,通常需要调用Commit来确认已处理的消息,以便Kafka可以释放相应的存储空间。 - 错误处理:客户端库会...
3. **可扩展性**:Kafka支持水平扩展,只需添加更多broker即可提高集群容量。 4. **容错性**:通过复制机制,Kafka可以在broker故障时自动恢复服务,保持高可用。 5. **多语言支持**:Kafka提供了多种语言的客户端...
`Testcontainers`库可以用来启动临时的Kafka实例进行测试。 7. **并发与消费策略**: - Spring Boot允许配置消费者实例的数量,以并行处理消息。每个实例(消费者组内的成员)可以负责处理一部分分区,提高整体...
### Kafka集群部署与运维知识点详解 #### 一、Kafka概览 Kafka是一种高性能的分布式消息系统,具有以下特点: - **数据持久性**:Kafka通过高效的磁盘数据结构来实现消息的持久化存储,即使面对大量数据也能保持...
- **Broker**:Kafka 集群中的每一个服务器实例被称为 Broker。 - **Topic**:消息分类存储的逻辑单元。 - **Partition**:为了提高吞吐量和实现水平扩展,每个 Topic 可以分为多个 Partition。 - **Replication**:...
Kafka主要设计用于处理实时数据流,提供高吞吐量的数据传输能力,并能够持久化数据,支持多消费者和多生产者模型。 **PHP与Kafka的结合** 在PHP中使用Kafka可以方便地集成到Web应用中,实现消息队列、日志收集、...