`
sunbin
  • 浏览: 352438 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Kafka集群部署

 
阅读更多

 

集群规划

 

Zookeeper集群共三台服务器,分别为:sto1、sto2、sto3。

 

Kafka集群共三台服务器,分别为:sto1、sto2、sto3。

 

1、Zookeeper集群准备

 

kafka是一个分布式消息队列,需要依赖ZooKeeper,请先安装好zk集群。

 

Zookeeper集群安装步骤略。

 

 2、安装Kafka

 

下载压缩包(官网地址:http://kafka.apache.org/downloads.html

 解压:

 

tar zxvf kafka_2.10-0.9.0.1.tgz -C /opt/sgb

mv kafka_2.10-0.9.0.1/ kafka

 

 

修改配置文件:config/server.properties

 

broker.id=0   #参考zookeeper的myid  ,每个id不能一样,0,1,2集群中唯一标识id,0、1、2、3依次增长(broker即Kafka集群中的一台服务器)
zookeeper.connect=sto1:2181,sto2:2181,sto3:2181  #zookeeper集群地址

 

代码分发
scp -r /opt/sgb/kafka/ sto2:/opt
scp -r /opt/sgb/kafka/ sto3:/opt

 

修改sto2、sto3上Kafka配置文件中的broker.id(分别在sto2、3服务器上执行以下命令修改broker.id)
sed -i -e 's/broker.id=.*/broker.id=1/' /opt/sgb/kafka/config/server.properties
sed -i -e 's/broker.id=.*/broker.id=2/' /opt/sgb/kafka/config/server.properties

 

3、启动Kafka集群

A、启动Zookeeper集群。

B、启动Kafka集群。

分别在三台服务器上执行以下命令启动:

bin/kafka-server-start.sh config/server.properties

 

 

4、测试

创建topic:
bin/kafka-topics.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --create --replication-factor 2 --partitions 3 --topic test
查看topic列表
bin/kafka-topics.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --list
查看“test”topic描述
bin/kafka-topics.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --describe --topic test
创建生产者
bin/kafka-console-producer.sh --broker-list sto1:9092,sto2:9092,sto3:9092 --topic test
创建消费者
bin/kafka-console-consumer.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --from-beginning --topic test

 

 

 ----------------------------------------------------------------------------------------------------------

消費者
package bhz.kafka.example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {
	
	public static final String topic = "test";

	public static void main(String[] args) {
    	
        Properties props = new Properties();
        props.put("zookeeper.connect", "sto1:2181,sto2:2181,sto3:2181");
        //group 代表一个消费组
        props.put("group.id", "group1");
        //zk连接超时
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

        //设置订阅主题。
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        
        while (it.hasNext())
            System.out.println(it.next().message());
		}
}

 

 

生產者
package bhz.kafka.example;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class KafkaProducer {  
	  
	public static final String topic = "test";
	
    public static void main(String[] args) throws Exception {  
        Properties properties = new Properties();  
        properties.put("zookeeper.connect", "sto1:2181,sto2:2181,sto3:2181");	//声明zk  
        properties.put("serializer.class", StringEncoder.class.getName());  
        properties.put("metadata.broker.list", "sto1:9092");	// 声明kafka broker 
        properties.put("request.required.acks", "1");
        Producer producer = new Producer<Integer, String>(new ProducerConfig(properties));
        for(int i=0; i < 10; i++){
        	producer.send(new KeyedMessage<Integer, String>(topic, "hello kafka" + i)); 
        	System.out.println("send message: " + "hello kafka" + i);
        	TimeUnit.SECONDS.sleep(1);  
        }
        producer.close();
    }  
       
}  

 

 --------------------------------------------------------------------------------------------------------------------------------

基于docker环境搭建kafka集群(单机版)

1.kafka依赖于zookeeper ,所以需要拉取kafka和zookeeper镜像。

 

  docker pull wurstmeister/kafka
  docker pull wurstmeister/zookeeper
 

2.启动zookeeper容器(wurstmeister/zookeeper镜像拥有默认命令“/usr/sbin/sshd && bash /usr/bin/start-zk.sh”,所以只需启动一个守护式容器即可)

 docker run --name zookeeper -p 12181:2181 -d wurstmeister/zookeeper:latest

 

3.启动三个kafka容器

docker run -p 19092:9092 --name kafka1 -d -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=宿主机IP:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://宿主机IP:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

docker run -p 19093:9093 --name kafka2 -d -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=宿主机IP:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://宿主机IP:19093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

docker run -p 19094:9094 --name kafka3 -d -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=宿主机IP:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://宿主机IP:19094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

 

4.分别进入容器kafka1、kafka2、kafka3创建主题test1、test2、test3。下面以kafka3为例。

#在守护式容器中启动一个交互式进程
docker exec -i -t kafka3 /bin/bash

cd /opt/kafka_2.12-2.5.0/bin
#创建主题test3
./kafka-topics.sh --zookeeper 192.168.181.163:12181 --create --topic test3 --replication-factor 1 --partitions 3 Created topic "test3"
#查看主题test3
./kafka-topics.sh --zookeeper 192.168.181.163:12181 --describe --topic test3

 可以看到,已经是集群环境,可以看到leader机器、副本在分区上的保存情况,和ISR列表成员

 

5.测试集群,在kafka3上向test1发送消息,在kafka2上消费test1

kafka3

./kafka-console-producer.sh --broker-list 192.168.181.163:19092,192.168.181.163:19093,192.168.181.163:19094 --topic test1
>testword
>test

 kafka1

./kafka-console-consumer.sh --bootstrap-server 192.168.181.163:19092,192.168.181.163:19093,192.168.181.163:19094 --topic test1 --from-beginning

 

6.依次关闭kafka2、test1后查看集群状态

./kafka-topics.sh --zookeeper 192.168.181.163:12181 --describe --topic test3

./kafka-topics.sh --zookeeper 192.168.181.163:12181 --describe --topic test3

./kafka-topics.sh --zookeeper 192.168.181.163:12181 --describe --topic test3

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    kafka集群部署文档(部署,运维,FAQ)

    ### Kafka集群部署与运维知识点详解 #### 一、Kafka概览 Kafka是一种高性能的分布式消息系统,具有以下特点: - **数据持久性**:Kafka通过高效的磁盘数据结构来实现消息的持久化存储,即使面对大量数据也能保持...

    kafka集群部署、监控

    zookeeper集群部署,kafka集群部署,kafka介绍,topic创建、删除、kafka监控

    kafka集群部署步骤

    ### Kafka集群部署步骤详解 #### 一、安装Java环境(JDK 1.8) Kafka作为基于Java语言开发的消息中间件,其运行环境需要Java支持。为了确保Kafka能够正常运行,首先需要在每台服务器上安装Java环境。推荐使用JDK ...

    kafka集群部署文档.docx

    kafka集群部署文档 本文档详细介绍了Kafka集群的部署过程,涵盖了环境信息、JDK安装、Kafka安装、配置文件修改、集群启动等步骤。通过阅读本文档,读者可以了解Kafka集群的部署过程,并掌握相关的技术要点。 一、...

    Kafka集群部署手册1

    【Kafka集群部署手册1】 本手册主要涵盖了Kafka集群的部署步骤,包括Zookeeper和Kafka组件的配置。Kafka是一种高吞吐量的分布式发布订阅消息系统,而Zookeeper是Apache的一个分布式协调服务,它在Kafka集群中扮演着...

    云计算基础架构-Kafka集群部署.pptx

    《云计算基础架构:Kafka集群部署详解》 在当今的云计算环境中,高效的数据处理和传输是关键。Apache Kafka作为一个分布式流处理平台,被广泛应用于实时数据管道和消息系统,尤其在大数据处理领域,它的性能和可...

    zookeeper、kafka集群部署

    zookeeper配置、集群部署 kafka配置、集群部署 Window平台下

    《Kafka集群部署》配置文件

    现在,我们来深入探讨《Kafka集群部署》配置文件中的关键知识点。 1. **Kafka集群**: Kafka集群由一个或多个服务器(称为Brokers)组成,它们负责存储和转发消息。为了实现高可用性和容错性,通常会设置多个副本。...

    第10单元 Kafka集群部署1

    【Kafka集群部署详解】 Apache Kafka是一个分布式流处理平台,用于构建实时数据管道和流应用。在本单元中,我们将探讨如何部署一个Kafka集群,包括集群规划、环境准备、Kafka与Zookeeper的配置以及集群的启动和管理...

    kafka集群部署说明-wubin-200720.doc

    【Kafka集群部署详解】 Kafka是一个开源的消息中间件,由Apache软件基金会开发,它能够高效地处理大量的实时数据流。在大规模分布式系统中,Kafka通常被用于构建实时数据管道和流应用。以下是Kafka集群部署的详细...

    linux Redhat Kafka集群部署

    redhat linux 部署Kafka集群

    kafka集群部署资料包,包含对接jdk版本

    kafka集群不部署的时候,jdk版本不对报错,kafka启动不了,整合集群搭建打包完整工具。亲测可用。值得收藏使用

    HyperLedger Fabric开发实战 -Kafka集群部署

    在Hyperledger Fabric开发中,Kafka集群的部署是构建基于排序服务网络的关键步骤。Kafka是一种高可用、高性能的消息中间件,常用于处理大规模实时数据流。在Fabric中,Kafka作为ordering service(排序服务)的一...

    kafka集群.pdf

    Kafka 集群部署是实现高可用性和容错性的关键步骤,确保数据的可靠传输和存储。以下是对 Kafka 集群部署的详细解释: 1. **环境准备**: - 系统版本:在本例中,使用的操作系统是 CentOS Linux release 7.5.1804 ...

    介绍kafka及kafka集群安装

    #### Kafka 集群部署步骤 1. **准备多台机器**:确保每台机器上都已安装好 Java 环境。 2. **配置每个 Broker 的 server.properties 文件**:包括 Broker ID、监听地址、ZooKeeper 地址等。 3. **启动 ZooKeeper ...

    liunx安装kafka及集群部署

    #### 三、Kafka集群部署 Kafka集群通常包括多个Kafka Broker实例,这些实例通过ZooKeeper协调工作。为了构建一个Kafka集群,你需要: 1. **准备多个服务器**:至少三个节点,分别作为Broker。 2. **安装Kafka**:在...

    使用sasl的kafka集群的搭建使用

    Kafka集群搭建和使用过程涉及多个技术要点和配置项,包括SASL安全机制、ACL权限设置、Kafka基础概念以及安装配置步骤等。下面将详细介绍这些知识点。 首先,SASL(Simple Authentication and Security Layer)是为C...

    快速部署单机kafka集群(win环境)

    本文不讲kafka集群原理,只谈部署步骤。 默认读者已对kafka有最基本的认知,纯粹作为部署笔记,方便回忆。 另外本文是基于Windows部署的,Linux的步骤是基本相同的(只是启动脚本位置不同)。 kafka集群类型: ...

    kafka、Zk 集群部署技术手册v0.1

    3. **Kafka集群部署**: - **服务器配置**:设置broker节点,包括端口、日志目录、zk连接等。 - **Zookeeper集群**:部署多个Zookeeper节点以实现高可用性。 - **主题创建**:手动或通过API创建主题,并设置分区...

Global site tag (gtag.js) - Google Analytics