1.安装zookeeper
2.安装kafka,kafka配置文件/usr/local/kafka_2.11-0.9.0.1/config/server.properties内容如下:
broker.id=0
listeners=PLAINTEXT://:9092
# The port the socket server listens on
#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=192.168.152.130
advertised.host.name=192.168.152.130
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
3.java提供方代码:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProvider {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.152.130:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 400; i < 500; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
4.java消费方代码:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.152.130:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
System.out.println(record.value());
}
}
}
}
相关推荐
Kafka 配置调优实践 Kafka 配置调优实践是指通过调整 Kafka 集群的参数配置来提高其吞吐性能。下面是 Kafka 配置调优实践的知识点总结: 一、存储优化 * 数据目录优先存储到 XFS 文件系统或者 EXT4,避免使用 EXT...
CDH大数据平台kafka配置文件以及相关操作
**Kafka配置文件详解** Kafka是一个分布式流处理平台,其核心组件包括生产者、消费者和代理(broker)。在Kafka的运行中,`server.properties`是每个Kafka broker节点的核心配置文件,它定义了服务器的行为和参数。...
kafka 配置 kerberos,设置 ACL权限, java 客户端连接。
在本文中,我们将深入探讨如何在Apache Kafka中配置SASL/PLAIN认证机制,并通过具体的密码验证实现安全的通信。Kafka是一个分布式流处理平台,它在数据传输中扮演着重要角色,而安全性是其核心考量之一。SASL...
2. **Kafka配置文件** Kafka的配置主要通过修改`config/server.properties`文件进行。这个文件包含了Kafka服务器运行所需的各种参数。例如: - `broker.id`: 每个Kafka节点的唯一标识,通常从0开始。 - `...
Kafka 配置信息总结 Kafka 是一个基于 Publish-Subscribe 模式的分布式消息队列系统,配置信息是 Kafka 集群的核心组件。本文将对 Kafka 配置信息进行详细的解析,帮助读者更好地理解 Kafka 的配置机制。 Broker ...
Hyperledger Fabric默认使用solo共识,实际上它早就已经支持kafka共识,只是配置相对复杂点儿。该资源就是使用kafka共识的多orderer集群环境下的网络所需要使用的配置文件。你也可以参考下文帮您理解:...
### Kafka配置安装详解 #### 一、环境搭建与配置 Kafka是一款开源的消息队列中间件,被广泛应用于大数据处理领域。本篇文章将详细介绍如何在本地环境中安装并配置Kafka,以及进行基本的操作演示。 ##### 环境要求...
### Kafka配置Kerberos安全认证详解 #### 一、引言 Kafka 是一款高性能的消息队列服务,广泛应用于大数据处理领域。为了保障数据的安全性和完整性,Kafka 提供了多种安全认证机制,其中 Kerberos 认证是一种非常...
本篇将深入探讨Kafka配置参数,帮助你理解和优化Kafka集群的运行。 1. **broker.id**: 这个参数是每个Kafka broker的唯一标识,它必须在整个集群中是唯一的。值可以是任意整数,通常从0开始。 2. **zookeeper....
在实际使用过程中,需要对Kafka的配置参数进行详细理解,以便根据具体业务需求调整参数,优化性能。以下对Kafka主要配置参数进行详细解读: 1. broker.id:这是Kafka broker的唯一标识符,它是一个整数,用于唯一...
本资源是windows下kafka的环境配置及c++实现的kafka的producer相关代码,启动后可以测试c++的producer发送消息可以在windows下启动的kafka的customer接收消息。
这个压缩包提供了在CentOS6.5系统上安装和配置Kafka的详细文档,以及对应的软件版本,包括JDK1.7、Zookeeper-3.4.5和Kafka_2.10-0.10.0.0。 首先,我们需要理解Kafka的核心概念。Kafka是一个高吞吐量、低延迟的消息...
Kafka 配置步骤 Kafka 是一个流行的分布式流媒体平台,广泛应用于大数据处理、实时数据处理和日志处理等领域。为了成功地配置 Kafka 环境,需要按照某些步骤进行安装和配置。本文将详细介绍 Kafka 配置步骤,包括...
### Kafka配置详解 #### 一、Kafka简介与应用场景 Kafka是一款由LinkedIn开发并开源的分布式消息系统,采用Scala语言编写。它最初被设计用于LinkedIn的活动流和运营数据处理管道,具备高度可扩展性和高吞吐量的...
MySQL+Canal+Kafka 配置及 Python 实现文档 本文档将介绍如何使用 MySQL、Canal 和 Kafka 实现数据实时同步的配置和 Python 实现。 MySQL 配置 MySQL 需要开启日志记录功能,以便 Canal 监听日志变化。首先,...
【Kafka配置调优详解】 Kafka是一款高吞吐、分布式的流处理平台,它用于构建实时数据管道和流应用。在大型分布式系统中,为了保证高效稳定运行,对Kafka进行配置调优至关重要。本篇文章将深入解析Kafka的核心配置...
总的来说,Kafka配置涉及的要点包括安装、配置、环境变量设置以及服务启动,这些步骤都是大数据采集系统中Kafka部署的关键环节。理解并熟练掌握这些步骤,将有助于构建稳定高效的实时数据采集平台。
kafka配置文件zookeeper参数.md