环境:
CentOS 5.5 Server版 32位
虚拟机内存4096M(最开始没注意,默认1024M,结果开启多个kafka时,就不断GC,导致应用无法使用)
CPU 1*1
硬盘 20G
1.下载软件
zookeeper: http://www.apache.org/dyn/closer.cgi/zookeeper/
wget http://apache.fayea.com/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
kafka:http://kafka.apache.org/downloads
wget http://mirrors.hust.edu.cn/apache/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
JDK:jdk-8u121-linux-i586.rpm
wget:http://download.oracle.com/otn-pub/java/jdk/8u121-b13/e9e7ea248e2c4826b92b3f075a80e441/jdk-8u121-linux-i586.rpm?AuthParam=1484725145_218b02b9ed050daba89d99daced369e0
2.创建用户
groupadd dev
useradd -G dev zookeper
passwd zookeeper
useradd -G dev afka
passwd kafka
3.安装软件
JDK
rpm -install jdk-8u121-linux-i586.rpm
zookeeper
tar -xvf zookeeper-3.4.9.tar.gz
kafka
tar -xvf kafka_2.11-0.10.1.1.tgz
4.更新配置
zookeeper
cd zookeeper/conf/
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg
修改如下行
dataDir=/home/zookeeper/data/zookeeper/z01
clientPort=2181 #因为是部署在同一个虚拟机上的伪集群,所以端口不能冲突,多个节点分别修改为不同端口
#localhost可以使用,最好修改为虚拟机所分配的IP。前一个端口是多个节点之间相互通讯用,后一个端口是用于leader选举通讯。由于是伪集群,所以要避免端口冲突。server.x,x表示当前服务节点
server.1=localhost:2280:2281
server.2=localhost:2282:2283
server.3=localhost:2284:2285
拷贝zookeeper分别到三个不同目录,形成三个节点的伪集群
进入dataDir所配置的目录,心中myid文件,文件内容分别为1和2和3,即标识当前服务是哪个节点
kafka
cd kafka/config
#需要几个broker,就复制几份
cp service.properties service-1.properties
cp service.properties service-2.properties
cp service.properties service-3.properties
cp service.properties service-4.properties
修改service-x.properties
broker.id=1 #按照顺序分别是1/2/3/4
listeners=PLAINTEXT://192.168.88.129:9091 #修改为虚拟机所分配的IP,由于是伪集群,端口需要调整,避免冲突
log.dirs=/home/kafka/data/k01/logs #由于是伪集群,避免目录冲突
zookeeper.connect=localhost:2181 #zookeeper的地址和端口,根据前面zookeeper的配置进行调整(三个节点任意一个即可)由于伪集群,zookeeper和kafla在同一个虚拟机上,所以可以使用localhost,建议修改为虚拟机所分配的IP
5.服务器console验证
#启动zookeeper 进入zookeeper三个节点的bin目录
cd /home/zookeeper/app/zookeeper/z01/bin
./zkServer.sh start
cd /home/zookeeper/app/zookeeper/z02/bin
./zkServer.sh start
cd /home/zookeeper/app/zookeeper/z03/bin
./zkServer.sh start
#创建topics 5个分区 每个分区分别复制到3个broker(不能大于broker的总数量) 进入kafka/bin zookeeper的地址任选一个
./kafka-topics.sh --create --zookeeper 192.168.88.129:2181 --replication-factor 3 --partitions 5 --topic test
#查看topic信息 进入kafka/bin zookeeper的地址任选一个
./kafka-topics.sh --describe --zookeeper 192.168.88.129:2181 --topic test
#列出topic zookeeper的地址任选一个
./kafka-topics.sh --list --zookeeper 192.168.88.129:2181
#生产者 kafka的地址任选一个
./kafka-console-producer.sh --broker-list 192.168.88.129:9091 --topic test
#消费者 kafka的地址任选一个 bootstrap-server意味这个地址是用于发现所有broker地址的,并不表示仅针对这个broker
./kafka-console-consumer.sh --bootstrap-server 192.168.88.129:9091 --topic test --from-beginning
6.java客户端访问
pom.xml文件内容(切记增加logback/log4j,日志对问题的发现很有帮助)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.study</groupId>
<artifactId>kafka-client</artifactId>
<version>0.1.0</version>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.8</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.22</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Producer代码示例
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.129:9092");//用于发现其他broker地址的初始地址
props.put("acks", "all");//表示需要等待所有follower的确认后,消息才commit
props.put("retries", 0);//表示不重试
props.put("batch.size", 16384);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String topic = "test";
Producer<String, String> producer = new KafkaProducer<String,String>(props);
for(int i = 0; i < 10; i++){
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, "Key88&:"+i, "Value88&:"+i);//key用来计算partition,默认是用key.hash后计算出来
System.out.println(rec);
producer.send(rec, new Callback(){
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null){
exception.printStackTrace();
}
System.out.println("发送到服务器 -----Offset: " + metadata.offset() + "-----Topic:" + metadata.topic() + "-----partition:" + metadata.partition());
}
});
}
producer.close();
}
Consumer代码示例
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.129:9091");//用于发现其他broker地址的初始地址
props.put("group.id", "consumer-for-test");//consumer的分组
props.put("enable.auto.commit", "true");//自动提交
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "10000");//默认心跳是2000,如果要设置session.timeout.ms,则务必>2000,否则不设置
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("test"));//注册哪些topic
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
实验过程中kill掉部分kafka节点,当某个partition出现没有leader时,producer和consumer均无法正常响应,随后手工启动部分kafka节点,使得所有partition均有leader后,producer和consumer再次可正常响应
切记防火墙打开相应端口
#开启防火墙端口
/sbin/iptables -I INPUT -p tcp --dport 2181 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 2182 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 2183 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 9091 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 9093 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 9094 -j ACCEPT
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart
#查看防火墙
/etc/init.d/iptables status
Producer和Consumer的其他示例,参考
http://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html
http://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
分享到:
相关推荐
最近在学习Kafka,准备测试集群状态的时候感觉无论是开三台虚拟机或者在一台虚拟机开辟三个不同的端口号都太麻烦了(嗯。。主要是懒)。 环境准备 一台可以上网且有CentOS7虚拟机的电脑 为什么使用虚拟机?因为使用...
自己整理的Hadoop环境的一些安装,和一些简单的使用,其中包括Hadoop、hbase、hive、mysql、zookeeper、Kafka、flume。都是一些简单的安装步骤和使用,只在自己的虚拟机(Linux centOS7)上使用测试过。按照步骤一步...
大数据综合实验环境搭建(3个集群、Zookeeper、Hive、HBase) 本资源摘要信息主要对大数据综合实验环境搭建进行了详细的介绍,涵盖了 JDK 的安装和配置、免密码登录的设置、Zookeeper 的安装、Hive 和 HBase 的安装...
### Kafka分布式集群搭建详解 #### 一、概述 Kafka是一种高性能、分布式的消息发布与订阅系统,被广泛应用于日志收集、流处理、消息传递等多个领域。为了提高系统的可用性与扩展性,通常会采用分布式集群的方式...
搭建Kafka集群涉及到对虚拟机的安装配置、JDK环境的搭建、Zookeeper的安装配置等关键步骤。下面详细介绍各个知识点。 首先,虚拟机的安装是搭建Kafka集群的基础。文中提到了使用VMWare来安装三台虚拟机,并分配了...
【Kafka集群搭建及测试】 Kafka是一种分布式流处理平台,常用于实时数据处理和大数据管道。本文档将详细介绍如何在三台Ubuntu 16虚拟机上搭建Kafka集群,并进行基本的测试,确保其正常运行。 **1. 准备工作** 在...
Zookeeper 是 Kafka 的一个依赖组件,用于维护 Kafka 集群的元数据。下载 Zookeeper 3.4.5,解压到 /usr/local,编辑 zoo.cfg 文件,设置 initLimit、syncLimit、dataDir 和 clientPort 等参数。 2. 配置 Zookeeper...
通过VirtualBox安装多台虚拟机,实现集群环境搭建。 优势:一台电脑即可。 应用场景:测试,学习。 注意事项:请严格按照文档操作,作者已经按照文档操作实现环境搭建。 内附百度网盘下载地址,有hadoop+zookeeper+...
安装HBase时,需要考虑集群的Zookeeper配置,因为Zookeeper用于协调HBase的各个组件。 Oozie是Hadoop的工作流调度器,用于管理Hadoop作业(包括MapReduce、Pig、Hive、Sqoop等)和Spark作业的调度。配置Oozie时,...
3. 启动Zookeeper:Kafka依赖Zookeeper进行集群管理和协调,确保启动Zookeeper服务。 4. 启动Kafka服务器:在Kafka安装目录下运行启动脚本,配置服务器设置如端口、日志位置等。 五、使用Kafka 1. 创建主题:使用...
1、三个节点,含多种环境快照 2、环境包含Hadoop、Hive、Zookeeper、Spark、Kafka、Hbase、ES、scala、jdk、mysql 3、最大资源占用:6核cpu、7G内存、90G磁盘容量
在这个文档中,我们关注的是在Fabric 1.4.0版本中如何搭建一个包含多个orderer节点的集群,以及与之相关的Kafka和ZooKeeper组件。多orderer集群能够提高系统的容错性和性能,因为它允许交易在多个orderer之间负载...
接下来,我们进入Kafka集群的搭建环节。首先,解压缩Kafka软件包并上传到每个节点的指定目录。然后,我们需要修改每个节点的配置文件`server.properties`。在配置文件中,`broker.id`是用来标识每个Kafka节点的唯一...
**一、Kafka集群搭建** 1. **安装与解压** - 首先,将Kafka的安装包(如`kafka_2.12-2.4.1.tgz`)上传到你的虚拟机,例如 `/export/software/` 目录。 - 使用 `tar` 命令解压安装包:`cd /export/software/; tar ...
1. **完全分布式部署**:在5台虚拟机上设置Kafka集群,我们需要配置每台机器上的`server.properties`文件,确保正确的broker ID(节点标识)和zookeeper连接字符串。同时,为了实现高可用性,需要配置副本因子和分区...
1. **搭建环境**:首先在本地或者虚拟机上部署一套Kafka集群。 2. **编写生产者程序**:创建一个简单的Java应用,向`kafka-demo`主题发送消息。 3. **编写消费者程序**:创建另一个Java应用,订阅`kafka-demo`主题...
Storm 集群的安装是分布式集群技术的基础,linux 环境准备、zookeeper 集群搭建、Storm 集群搭建、Storm 配置文件配置项讲解、集群搭建常见问题解决等内容将为读者提供了 Storm 集群的安装基础知识。 Storm 常用...
在分布式集群的应用部署中,ZooKeeper 和 Kafka 的集群部署同样重要。ZooKeeper 作为协调服务,负责管理分布式环境下的配置信息、命名和同步等任务,它是保证分布式系统正常运行的关键组件。Kafka 集群部署则主要...
在本文中,我们将深入探讨如何使用给定的压缩包"tarPackage.zip"来搭建一个完整的Apache Kafka环境。这个压缩包包含了三个重要的组件:JDK 8u251 for Linux,Kafka 2.2.2,以及Zookeeper 3.4.14。这三个组件是运行...
在这个“vagrant-apache-cluster”项目中,它被用来配置一个包含多种组件的集群环境,这些组件包括Apache Kafka、Apache Zookeeper、Apache Cassandra以及Apache Ignite。 Apache Kafka 是一个分布式流处理平台,常...