`
qingwei201314
  • 浏览: 169102 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

kafka配置

 
阅读更多

1.安装zookeeper

2.安装kafka,kafka配置文件/usr/local/kafka_2.11-0.9.0.1/config/server.properties内容如下:


broker.id=0

listeners=PLAINTEXT://:9092

# The port the socket server listens on
#port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=192.168.152.130

advertised.host.name=192.168.152.130

num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1


log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000


zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

3.java提供方代码:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProvider {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.152.130:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 400; i < 500; i++)
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

        producer.close();
    }
}

4.java消费方代码:


import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.152.130:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
                System.out.println(record.value());
            }
        }
    }
}

分享到:
评论

相关推荐

    kafka配置调优实践

    Kafka 配置调优实践 Kafka 配置调优实践是指通过调整 Kafka 集群的参数配置来提高其吞吐性能。下面是 Kafka 配置调优实践的知识点总结: 一、存储优化 * 数据目录优先存储到 XFS 文件系统或者 EXT4,避免使用 EXT...

    CDH平台kafka配置文件以及相关操作

    CDH大数据平台kafka配置文件以及相关操作

    Kafka 配置文件及编程

    **Kafka配置文件详解** Kafka是一个分布式流处理平台,其核心组件包括生产者、消费者和代理(broker)。在Kafka的运行中,`server.properties`是每个Kafka broker节点的核心配置文件,它定义了服务器的行为和参数。...

    单机 kafka 配置 kerberos,设置 ACL 权限

    kafka 配置 kerberos,设置 ACL权限, java 客户端连接。

    Kafka 配置用户名密码例子

    在本文中,我们将深入探讨如何在Apache Kafka中配置SASL/PLAIN认证机制,并通过具体的密码验证实现安全的通信。Kafka是一个分布式流处理平台,它在数据传输中扮演着重要角色,而安全性是其核心考量之一。SASL...

    kafka配置文件

    2. **Kafka配置文件** Kafka的配置主要通过修改`config/server.properties`文件进行。这个文件包含了Kafka服务器运行所需的各种参数。例如: - `broker.id`: 每个Kafka节点的唯一标识,通常从0开始。 - `...

    Kafka配置信息.docx

    Kafka 配置信息总结 Kafka 是一个基于 Publish-Subscribe 模式的分布式消息队列系统,配置信息是 Kafka 集群的核心组件。本文将对 Kafka 配置信息进行详细的解析,帮助读者更好地理解 Kafka 的配置机制。 Broker ...

    Hyperledger Fabric kafka配置

    Hyperledger Fabric默认使用solo共识,实际上它早就已经支持kafka共识,只是配置相对复杂点儿。该资源就是使用kafka共识的多orderer集群环境下的网络所需要使用的配置文件。你也可以参考下文帮您理解:...

    kafka配置安装详解

    ### Kafka配置安装详解 #### 一、环境搭建与配置 Kafka是一款开源的消息队列中间件,被广泛应用于大数据处理领域。本篇文章将详细介绍如何在本地环境中安装并配置Kafka,以及进行基本的操作演示。 ##### 环境要求...

    kafka 配置kerberos安全认证

    ### Kafka配置Kerberos安全认证详解 #### 一、引言 Kafka 是一款高性能的消息队列服务,广泛应用于大数据处理领域。为了保障数据的安全性和完整性,Kafka 提供了多种安全认证机制,其中 Kerberos 认证是一种非常...

    Kafka配置参数详解 - 网络技术.rar_KAFKA broker_Kafka配置参数详解_kafka

    本篇将深入探讨Kafka配置参数,帮助你理解和优化Kafka集群的运行。 1. **broker.id**: 这个参数是每个Kafka broker的唯一标识,它必须在整个集群中是唯一的。值可以是任意整数,通常从0开始。 2. **zookeeper....

    kafka配置参数详解 - 网络技术

    在实际使用过程中,需要对Kafka的配置参数进行详细理解,以便根据具体业务需求调整参数,优化性能。以下对Kafka主要配置参数进行详细解读: 1. broker.id:这是Kafka broker的唯一标识符,它是一个整数,用于唯一...

    windows的kafka配置及c++工程.zip

    本资源是windows下kafka的环境配置及c++实现的kafka的producer相关代码,启动后可以测试c++的producer发送消息可以在windows下启动的kafka的customer接收消息。

    linux环境下kafka配置文档及安装包

    这个压缩包提供了在CentOS6.5系统上安装和配置Kafka的详细文档,以及对应的软件版本,包括JDK1.7、Zookeeper-3.4.5和Kafka_2.10-0.10.0.0。 首先,我们需要理解Kafka的核心概念。Kafka是一个高吞吐量、低延迟的消息...

    Kafka配置步骤.docx

    Kafka 配置步骤 Kafka 是一个流行的分布式流媒体平台,广泛应用于大数据处理、实时数据处理和日志处理等领域。为了成功地配置 Kafka 环境,需要按照某些步骤进行安装和配置。本文将详细介绍 Kafka 配置步骤,包括...

    kafka配置.pptx

    ### Kafka配置详解 #### 一、Kafka简介与应用场景 Kafka是一款由LinkedIn开发并开源的分布式消息系统,采用Scala语言编写。它最初被设计用于LinkedIn的活动流和运营数据处理管道,具备高度可扩展性和高吞吐量的...

    mysql+canal+kafka配置及python实现文档.docx

    MySQL+Canal+Kafka 配置及 Python 实现文档 本文档将介绍如何使用 MySQL、Canal 和 Kafka 实现数据实时同步的配置和 Python 实现。 MySQL 配置 MySQL 需要开启日志记录功能,以便 Canal 监听日志变化。首先,...

    kafka配置调优1

    【Kafka配置调优详解】 Kafka是一款高吞吐、分布式的流处理平台,它用于构建实时数据管道和流应用。在大型分布式系统中,为了保证高效稳定运行,对Kafka进行配置调优至关重要。本篇文章将深入解析Kafka的核心配置...

    大数据采集技术-Kafka配置.pptx

    总的来说,Kafka配置涉及的要点包括安装、配置、环境变量设置以及服务启动,这些步骤都是大数据采集系统中Kafka部署的关键环节。理解并熟练掌握这些步骤,将有助于构建稳定高效的实时数据采集平台。

    kafka配置文件zookeeper参数.md

    kafka配置文件zookeeper参数.md

Global site tag (gtag.js) - Google Analytics