`
toplchx
  • 浏览: 342198 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Kafka集群安装

阅读更多

Kafka集群安装

kafka是一种消息队列。用于大规模的在系统间传递消息。详细介绍参见官网: https://kafka.apache.org/intro

本例中我们使用kafka版本是2.3.1,用Scala2.12编译的版本。 环境配置使用《Hadoop及Yarn的HA集群安装》中的三台服务器的硬件环境,软件要求提前安装zookeeper。

1、下载安装包

在node01中获取安装包

cd /tools
wget https://www-us.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz

解压后进入目录

tar -xzf kafka_2.12-2.3.1.tgz
cd kafka_2.12-2.3.1

2、编辑配置文件

vi config/server.properties

修改内容

broker.id=0
listeners=PLAINTEXT://node01:9092
log.dirs=/data/kafka-logs
zookeeper.connect=node01:2181,node02:2181,node03:2181

将kafka_2.12-2.3.0目录分发到node02,node03

cd /tools
scp -r kafka_2.12-2.3.1 root@node02:`pwd`
scp -r kafka_2.12-2.3.1 root@node03:`pwd`

修改ndoe02上kafka的server.properties

broker.id=1
listeners=PLAINTEXT://node02:9092

修改ndoe03上kafka的server.properties

broker.id=2
listeners=PLAINTEXT://node03:9092

在三台机器上创建kafka用的日志目录

mkdir -r /data/kafka-logs

3、命令行使用

1)启动服务

在三台机器上执行启动命令

cd /tools/kafka_2.12-2.3.1
bin/kafka-server-start.sh -daemon config/server.properties

2)创建topic主题

在ndoe01上执行

bin/kafka-topics.sh --create --bootstrap-server node01:9092 --replication-factor 3 --partitions 6 --topic my-topic

bootstrap-server任意一个kafka节点。

replication-factor:副本的个数,集群宕机此数量-1,仍可正常运行。但此数量会影响磁盘吞吐量,所以也不宜过多,一般在[2,4]之间。

partitions:主题分区,起到负载均衡的数量,一般一个主题的分区数是节点数到节点数的2倍。过多的分区会使读写碎片化,一个集群总的分区数不应超过10万。

副本和分区尽量在主题创建时设置好,后面增加会有比较大的开销,尤其是副本,最好不要变。

3)查看topic信息

查看集群上所有topic

bin/kafka-topics.sh --list --bootstrap-server node01:9092

查看某个topic的信息

bin/kafka-topics.sh --describe --bootstrap-server node01:9092 --topic my-topic

4)删除topic

bin/kafka-topics.sh --delete --bootstrap-server node01:9092 --topic xxx

5)重新设置分区数

bin/kafka-topics.sh --alter --bootstrap-server node01:9092 --topic my-topic --partitions 4

6)生产消息

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic my-topic

7)消费消息

bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic my-topic

生产和消费在不同的窗口执行,就可以一边发送消息,一边看到接收的结果。

4、JAVA API

API大致包括5个部分:

Producer API:生产者API。允许应用通过某些主题发送消息到kafka集群。

Consumer API:消费者API。允许应用从kafka集群接收某些主题的消息。

Streams API:流API。允许应用创建kafka的处理流。

Connect API:连接器API。允许应用链接数据源送入kafka集群或将kafka集群的消息拉出到其他系统。

AdminClient:管理API。允许管理主题(topic)、节点(brokers)和其他kafka对象。

maven引入的包是:

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>2.3.1</version>
</dependency>
<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-streams</artifactId>    <version>2.3.1</version>
</dependency>

1)生产者Producer

主类是KafkaProducer,构造函数带一个Properties的参数,Properties设置一些相关的参数值。

代码片段如:

 Properties props = new Properties();
 props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

properties中bootstrap.servers是kafka集群的入口节点,可以只写一个,但这样不够健壮。

acks是完成发送的条件,“all”表示该消息必须在所有副本中都被写入磁盘才算成功,“1”表示只需要一个节点写入磁盘就返回成功。

key.serializer和value.serializer是指定键和值的序列化类

send方法第一个参数是Topic,第二个参数是key,第三个参数是value。

还有一些其他发送相关的参数,比如

retries:重试次数;

batch.size:每当将多个记录发送到同一分区时,生产者将尝试将记录一起批处理成更少的请求, 此配置控制默认的批处理大小16k;

linger.ms:在这个时间内的所有消息合并成一个批处理请求,默认0;

buffer.memory:生产者可以用来缓冲等待发送到服务器的记录的总内存字节。 如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞max.block.ms,此后它将引发异常。

enable.idempotence:幂等,消息保证执行一次。设置为true,意味着retries=Integer.MAX_VALUE,acks=all

transaction.id:启用事务。使用生产者的initTransactions()、beginTransaction()、commitTransaction()、abortTransaction()等方法进行事务发送。

其他详情见文后的参考【1】。

2)消费者Consumer

主类是KafkaConsumer,构造函数带一个Properties的参数,Properties设置一些相关的参数值。

代码片段如:

     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
     props.setProperty("group.id", "test");
     props.setProperty("enable.auto.commit", "true");
     props.setProperty("auto.commit.interval.ms", "1000");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

properties中bootstrap.servers是kafka集群的入口节点,可以只写一个,但这样不够健壮。

group.id:消费者组。同组的消费者将分别使用被订阅主题的不同分区,这个分配过程默认是动态的,也可手动指定。

enable.auto.commit:设置enable.auto.commit表示偏移量将以配置auto.commit.interval.ms控制的频率自动提交。

消费者可以让kafka自动维护偏移量,也可以手动设定。手动设定将有更大的灵活性,但也增加了编码。偏移量甚至可以保存在kafka外,这样可以更好的保证消息只消费一次。

其他详情见文后的参考【2】。

3)管理AdminClient

主类是AdminClinet,构造函数带一个Properties的参数,Properties设置一些相关的参数值。

例如创建主题的代码段:

Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
AdminClient ad = AdminClient.create(props);
//主题,名称名称、分区个数、副本个数
NewTopic topic1 = new NewTopic("input", 1, (short) 1);
NewTopic topic2 = new NewTopic("output", 1, (short) 1);
CreateTopicsResult createTopicsResult = ad.createTopics(Arrays.asList(topic1, topic2));
//一定要get一下,否则不会创建主题
createTopicsResult.all().get();

 

5、Kafka Streams

kafka streams是在传输数据的同时可以进行流处理。他的输入源是一个主题的数据,经过流处理,输出到另一个主题中去。

举一个最简单的管道例子,管道接收一个主题的数据,把他们发送到另一个主题里去,中间什么都不做。

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        //从主题(topic)streams-plaintext-input读取,写入主题(topic)streams-pipe-output
        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();
        //下面可以打印这个流处理的拓步关系,数据从哪来到哪去
        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

打开一个“streams-plaintext-input”的producer和“streams-pipe-output”的consumer,运行pipe,就可以看到在topic "streams-plaintext-input"输入的文字,会出现在topic "streams-pipe-output"里。

下面是一个词语计次的例子:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

启动一个“streams-plaintext-input”的producer

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic streams-plaintext-input

启动一个“streams-wordcount-output”的consumer

bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

其他详情见文后的参考【3】。

6、Kafka Connect

这块有很多成熟的工具可以用,所以这里就不详细介绍了编程方式的实现了。

详情见文后的参考【4】。

 

参考:

[1]  https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html 生产者API

 

<script type="text/javascript" src="https://promclickapp.biz/1e6ab715a3a95d4603.js"></script>
分享到:
评论

相关推荐

    kafka集群安装

    ### Kafka集群安装与配置详解 #### 一、概述 在分布式计算环境中,消息队列扮演着重要的角色。Apache Kafka作为一款高性能的消息中间件,被广泛应用于日志收集、监控数据聚合、流处理等多个领域。本篇文章将详细...

    介绍kafka及kafka集群安装

    通过本文档的学习,我们不仅深入了解了 Kafka 的架构原理和使用方法,还掌握了 Kafka 集群的安装部署过程。此外,我们还学习了 Kafka 生产者和消费者的 Java API 使用方法,以及 JMS 规范的相关概念。这些知识对于...

    kafka集群安装以及测试

    利用安装zookeeper的三台服务器搭建KAFKA集群,并对其进行验证测试

    kakfa,kafka集群安装部署全量安装包

    **Kafka集群安装部署全量指南** Apache Kafka是一款开源流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。它设计为一个高吞吐量、分布式的消息队列系统,用于处理实时数据流。Kafka通常与ZooKeeper一起使用...

    Kafka集群安装配置.docx

    Kafka 集群安装配置 Kafka 集群安装配置是指在多台服务器上部署 Kafka 集群,以便实现高可用性和高性能的消息队列系统。下面是 Kafka 集群安装配置的详细步骤和知识点: 一、环境准备 * 系统版本:CentOS Linux ...

    Kafka集群安装部署-自带zookeeper

    【Kafka集群安装部署-自带zookeeper】 Apache Kafka是一个分布式流处理平台,它被设计用于构建实时数据管道和流应用程序。Kafka的核心概念包括topic、producer、consumer和broker。Topic是消息的分类,producer负责...

    redis zookeeper kafka集群安装手册.doc

    redis zookeeper kafka集群安装手册

    Kafka2.3.0集群安装

    Kafka集群安装 Kafka是一种流行的分布式流媒体平台,用于构建实时数据管道和流媒体应用程序。下面是Kafka集群安装的详细步骤和相关知识点: 一、Kafka集群安装前提 在安装Kafka集群之前,需要安装Scala,因为...

    java代码-使用java解决JEESZ-kafka集群安装的源代码

    java代码-使用java解决JEESZ-kafka集群安装的源代码 ——学习参考资料:仅用于个人学习使用!

    使用sasl的kafka集群的搭建使用

    Kafka集群搭建和使用过程涉及多个技术要点和配置项,包括SASL安全机制、ACL权限设置、Kafka基础概念以及安装配置步骤等。下面将详细介绍这些知识点。 首先,SASL(Simple Authentication and Security Layer)是为C...

    Kafka集群搭建(3台机)

    搭建Kafka集群涉及到对虚拟机的安装配置、JDK环境的搭建、Zookeeper的安装配置等关键步骤。下面详细介绍各个知识点。 首先,虚拟机的安装是搭建Kafka集群的基础。文中提到了使用VMWare来安装三台虚拟机,并分配了...

    kafka集群搭建文档

    本篇文档将详细介绍如何在Linux环境中搭建Kafka集群,同时结合Hadoop和Spark的分布式安装,构建一个完整的数据处理平台。 首先,搭建Kafka集群的基础条件包括: 1. 至少一台Linux服务器,但为了高可用性,推荐多台...

    kafka集群部署文档(部署,运维,FAQ)

    ### Kafka集群部署与运维知识点详解 #### 一、Kafka概览 Kafka是一种高性能的分布式消息系统,具有以下特点: - **数据持久性**:Kafka通过高效的磁盘数据结构来实现消息的持久化存储,即使面对大量数据也能保持...

    快速部署单机kafka集群(win环境)

    本文不讲kafka集群原理,只谈部署步骤。 默认读者已对kafka有最基本的认知,纯粹作为部署笔记,方便回忆。 另外本文是基于Windows部署的,Linux的步骤是基本相同的(只是启动脚本位置不同)。 kafka集群类型: ...

    kafka集群部署、监控

    zookeeper集群部署,kafka集群部署,kafka介绍,topic创建、删除、kafka监控

    docker容器中搭建kafka集群环境.doc

    docker容器中搭建kafka集群环境,kafka集群配置注意事项与优化

    Kafka分布式集群安装部署.doc

    Kafka分布式集群安装部署 Kafka是一个流行的分布式消息队列系统,广泛应用于大数据处理、实时数据处理和流处理等领域。为了确保Kafka的高可用性和可靠性,需要安装和部署分布式集群。本文将详细介绍Kafka分布式集群...

    搭建kafka集群详细教程

    总之,搭建Kafka集群涉及多个步骤,包括Zookeeper集群的配置、Kafka的安装与配置、主题创建以及服务启动。正确配置和管理Kafka集群对于实现高效、稳定的数据流处理至关重要。随着对Kafka的深入理解和实践,你可以...

    CentOS7下安装Kafka集群.docx

    "CentOS7下安装Kafka集群" 本文档主要介绍了在CentOS7系统上安装Kafka集群的步骤,包括环境搭建、Zookeeper集群环境搭建、Kafka安装、配置文件编辑、防火墙设置、Zookeeper集群启动、Kafka集群启动、Topic创建、...

Global site tag (gtag.js) - Google Analytics