Kafka集群安装
kafka是一种消息队列。用于大规模的在系统间传递消息。详细介绍参见官网: https://kafka.apache.org/intro
本例中我们使用kafka版本是2.3.1,用Scala2.12编译的版本。 环境配置使用《Hadoop及Yarn的HA集群安装》中的三台服务器的硬件环境,软件要求提前安装zookeeper。
1、下载安装包
在node01中获取安装包
cd /tools wget https://www-us.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz
解压后进入目录
tar -xzf kafka_2.12-2.3.1.tgz cd kafka_2.12-2.3.1
2、编辑配置文件
vi config/server.properties
修改内容
broker.id=0 listeners=PLAINTEXT://node01:9092 log.dirs=/data/kafka-logs zookeeper.connect=node01:2181,node02:2181,node03:2181
将kafka_2.12-2.3.0目录分发到node02,node03
cd /tools scp -r kafka_2.12-2.3.1 root@node02:`pwd` scp -r kafka_2.12-2.3.1 root@node03:`pwd`
修改ndoe02上kafka的server.properties
broker.id=1 listeners=PLAINTEXT://node02:9092
修改ndoe03上kafka的server.properties
broker.id=2 listeners=PLAINTEXT://node03:9092
在三台机器上创建kafka用的日志目录
mkdir -r /data/kafka-logs
3、命令行使用
1)启动服务
在三台机器上执行启动命令
cd /tools/kafka_2.12-2.3.1 bin/kafka-server-start.sh -daemon config/server.properties
2)创建topic主题
在ndoe01上执行
bin/kafka-topics.sh --create --bootstrap-server node01:9092 --replication-factor 3 --partitions 6 --topic my-topic
bootstrap-server任意一个kafka节点。
replication-factor:副本的个数,集群宕机此数量-1,仍可正常运行。但此数量会影响磁盘吞吐量,所以也不宜过多,一般在[2,4]之间。
partitions:主题分区,起到负载均衡的数量,一般一个主题的分区数是节点数到节点数的2倍。过多的分区会使读写碎片化,一个集群总的分区数不应超过10万。
副本和分区尽量在主题创建时设置好,后面增加会有比较大的开销,尤其是副本,最好不要变。
3)查看topic信息
查看集群上所有topic
bin/kafka-topics.sh --list --bootstrap-server node01:9092
查看某个topic的信息
bin/kafka-topics.sh --describe --bootstrap-server node01:9092 --topic my-topic
4)删除topic
bin/kafka-topics.sh --delete --bootstrap-server node01:9092 --topic xxx
5)重新设置分区数
bin/kafka-topics.sh --alter --bootstrap-server node01:9092 --topic my-topic --partitions 4
6)生产消息
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic my-topic
7)消费消息
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic my-topic
生产和消费在不同的窗口执行,就可以一边发送消息,一边看到接收的结果。
4、JAVA API
API大致包括5个部分:
Producer API:生产者API。允许应用通过某些主题发送消息到kafka集群。
Consumer API:消费者API。允许应用从kafka集群接收某些主题的消息。
Streams API:流API。允许应用创建kafka的处理流。
Connect API:连接器API。允许应用链接数据源送入kafka集群或将kafka集群的消息拉出到其他系统。
AdminClient:管理API。允许管理主题(topic)、节点(brokers)和其他kafka对象。
maven引入的包是:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.3.1</version> </dependency>
1)生产者Producer
主类是KafkaProducer,构造函数带一个Properties的参数,Properties设置一些相关的参数值。
代码片段如:
Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
properties中bootstrap.servers是kafka集群的入口节点,可以只写一个,但这样不够健壮。
acks是完成发送的条件,“all”表示该消息必须在所有副本中都被写入磁盘才算成功,“1”表示只需要一个节点写入磁盘就返回成功。
key.serializer和value.serializer是指定键和值的序列化类
send方法第一个参数是Topic,第二个参数是key,第三个参数是value。
还有一些其他发送相关的参数,比如
retries:重试次数;
batch.size:每当将多个记录发送到同一分区时,生产者将尝试将记录一起批处理成更少的请求, 此配置控制默认的批处理大小16k;
linger.ms:在这个时间内的所有消息合并成一个批处理请求,默认0;
buffer.memory:生产者可以用来缓冲等待发送到服务器的记录的总内存字节。 如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞max.block.ms,此后它将引发异常。
enable.idempotence:幂等,消息保证执行一次。设置为true,意味着retries=Integer.MAX_VALUE,acks=all
transaction.id:启用事务。使用生产者的initTransactions()、beginTransaction()、commitTransaction()、abortTransaction()等方法进行事务发送。
其他详情见文后的参考【1】。
2)消费者Consumer
主类是KafkaConsumer,构造函数带一个Properties的参数,Properties设置一些相关的参数值。
代码片段如:
Properties props = new Properties(); props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
properties中bootstrap.servers是kafka集群的入口节点,可以只写一个,但这样不够健壮。
group.id:消费者组。同组的消费者将分别使用被订阅主题的不同分区,这个分配过程默认是动态的,也可手动指定。
enable.auto.commit:设置enable.auto.commit表示偏移量将以配置auto.commit.interval.ms控制的频率自动提交。
消费者可以让kafka自动维护偏移量,也可以手动设定。手动设定将有更大的灵活性,但也增加了编码。偏移量甚至可以保存在kafka外,这样可以更好的保证消息只消费一次。
其他详情见文后的参考【2】。
3)管理AdminClient
主类是AdminClinet,构造函数带一个Properties的参数,Properties设置一些相关的参数值。
例如创建主题的代码段:
Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); AdminClient ad = AdminClient.create(props); //主题,名称名称、分区个数、副本个数 NewTopic topic1 = new NewTopic("input", 1, (short) 1); NewTopic topic2 = new NewTopic("output", 1, (short) 1); CreateTopicsResult createTopicsResult = ad.createTopics(Arrays.asList(topic1, topic2)); //一定要get一下,否则不会创建主题 createTopicsResult.all().get();
5、Kafka Streams
kafka streams是在传输数据的同时可以进行流处理。他的输入源是一个主题的数据,经过流处理,输出到另一个主题中去。
举一个最简单的管道例子,管道接收一个主题的数据,把他们发送到另一个主题里去,中间什么都不做。
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class Pipe { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); //从主题(topic)streams-plaintext-input读取,写入主题(topic)streams-pipe-output builder.stream("streams-plaintext-input").to("streams-pipe-output"); final Topology topology = builder.build(); //下面可以打印这个流处理的拓步关系,数据从哪来到哪去 System.out.println(topology.describe()); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
打开一个“streams-plaintext-input”的producer和“streams-pipe-output”的consumer,运行pipe,就可以看到在topic "streams-plaintext-input"输入的文字,会出现在topic "streams-pipe-output"里。
下面是一个词语计次的例子:
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class WordCount { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
启动一个“streams-plaintext-input”的producer
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic streams-plaintext-input
启动一个“streams-wordcount-output”的consumer
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
其他详情见文后的参考【3】。
6、Kafka Connect
这块有很多成熟的工具可以用,所以这里就不详细介绍了编程方式的实现了。
详情见文后的参考【4】。
参考:
[1] https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html 生产者API
相关推荐
### Kafka集群安装与配置详解 #### 一、概述 在分布式计算环境中,消息队列扮演着重要的角色。Apache Kafka作为一款高性能的消息中间件,被广泛应用于日志收集、监控数据聚合、流处理等多个领域。本篇文章将详细...
通过本文档的学习,我们不仅深入了解了 Kafka 的架构原理和使用方法,还掌握了 Kafka 集群的安装部署过程。此外,我们还学习了 Kafka 生产者和消费者的 Java API 使用方法,以及 JMS 规范的相关概念。这些知识对于...
利用安装zookeeper的三台服务器搭建KAFKA集群,并对其进行验证测试
**Kafka集群安装部署全量指南** Apache Kafka是一款开源流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。它设计为一个高吞吐量、分布式的消息队列系统,用于处理实时数据流。Kafka通常与ZooKeeper一起使用...
Kafka 集群安装配置 Kafka 集群安装配置是指在多台服务器上部署 Kafka 集群,以便实现高可用性和高性能的消息队列系统。下面是 Kafka 集群安装配置的详细步骤和知识点: 一、环境准备 * 系统版本:CentOS Linux ...
【Kafka集群安装部署-自带zookeeper】 Apache Kafka是一个分布式流处理平台,它被设计用于构建实时数据管道和流应用程序。Kafka的核心概念包括topic、producer、consumer和broker。Topic是消息的分类,producer负责...
redis zookeeper kafka集群安装手册
Kafka集群安装 Kafka是一种流行的分布式流媒体平台,用于构建实时数据管道和流媒体应用程序。下面是Kafka集群安装的详细步骤和相关知识点: 一、Kafka集群安装前提 在安装Kafka集群之前,需要安装Scala,因为...
java代码-使用java解决JEESZ-kafka集群安装的源代码 ——学习参考资料:仅用于个人学习使用!
Kafka集群搭建和使用过程涉及多个技术要点和配置项,包括SASL安全机制、ACL权限设置、Kafka基础概念以及安装配置步骤等。下面将详细介绍这些知识点。 首先,SASL(Simple Authentication and Security Layer)是为C...
搭建Kafka集群涉及到对虚拟机的安装配置、JDK环境的搭建、Zookeeper的安装配置等关键步骤。下面详细介绍各个知识点。 首先,虚拟机的安装是搭建Kafka集群的基础。文中提到了使用VMWare来安装三台虚拟机,并分配了...
本篇文档将详细介绍如何在Linux环境中搭建Kafka集群,同时结合Hadoop和Spark的分布式安装,构建一个完整的数据处理平台。 首先,搭建Kafka集群的基础条件包括: 1. 至少一台Linux服务器,但为了高可用性,推荐多台...
### Kafka集群部署与运维知识点详解 #### 一、Kafka概览 Kafka是一种高性能的分布式消息系统,具有以下特点: - **数据持久性**:Kafka通过高效的磁盘数据结构来实现消息的持久化存储,即使面对大量数据也能保持...
本文不讲kafka集群原理,只谈部署步骤。 默认读者已对kafka有最基本的认知,纯粹作为部署笔记,方便回忆。 另外本文是基于Windows部署的,Linux的步骤是基本相同的(只是启动脚本位置不同)。 kafka集群类型: ...
zookeeper集群部署,kafka集群部署,kafka介绍,topic创建、删除、kafka监控
docker容器中搭建kafka集群环境,kafka集群配置注意事项与优化
Kafka分布式集群安装部署 Kafka是一个流行的分布式消息队列系统,广泛应用于大数据处理、实时数据处理和流处理等领域。为了确保Kafka的高可用性和可靠性,需要安装和部署分布式集群。本文将详细介绍Kafka分布式集群...
总之,搭建Kafka集群涉及多个步骤,包括Zookeeper集群的配置、Kafka的安装与配置、主题创建以及服务启动。正确配置和管理Kafka集群对于实现高效、稳定的数据流处理至关重要。随着对Kafka的深入理解和实践,你可以...