一、环境准备:
下载kafka官方安装包:http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz
获取最新的下载包: http://kafka.apache.org/downloads.html
服务器使用:centos7 jdk1.8或者jdk1.7
kafka版本:kafka_2.10-0.9.0.1.tgz
二、安装JDK
解压配置环境变量即可(已安装openjdk,需要先卸载)
rpm包安装参照:http://wkm.iteye.com/blog/1249553
三、安装kafka:
# tar -xvf kafka_2.10-0.9.0.1.tgz
kafka解压即可使用
四、启动kafka
kafka依赖 ZooKeeper服务,故先要启动ZooKeeper服务。
kafka中带有ZooKeeper服务,启动方式如下:
# cd kafka_2.10-0.9.0.1
# ./bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper服务默认端口为 2181
接下来启动kafka服务
# ./bin/kafka-server-start.sh config/server.properties
kafka服务默认端口为 9092
五、测试kafka
1、创建名称为my-topic的topic:
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
2、查询topic列表
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
my-topic
3、启动消息生产者并发布消息:
# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
message1
4、启动消息消费者订阅接收消息:
# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic --from-beginning
message1
到这里kafka单击服务已经成功启动,并可以发送和接收消息了。
六、关闭kafka
1、关闭kafka
# ./bin/kafka-server-stop.sh
2、关闭ZooKeeper服务
# ./bin/zookeeper-server-stop.sh
-------到这里,单机版的kafka服务已成功完成安装配置了,下面我们用kafka-client来发布订阅消息---------
生产者代码样例详见官方文档:http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
消费者代码样例详见官方文档:http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
注意:由于上面的kafka服务配置的是单机版的,下面的代码运行需要与kafka服务在同一台机器上。
如果你的kafka-client和kafka服务不在同一台机器上,修改kafka服务配置文件
修改配置文件 kafka_2.10-0.9.0.1/config/server.properties
1、将
#advertised.host.name=<hostname routable by clients>
改为
advertised.host.name=kafka机器的IP地址
2、将
#host.name=localhost
改为
host.name=kafka机器的IP地址
添加依赖包:kafka-client
gradle 项目:
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.9.0.1'
maven项目:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
参照官方样例(生产者):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
for(Object key:props.keySet()){
LOG.info("key:" + key + ";value:" + props.get(key));
}
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < 10000; i++) {
LOG.info("send ..." + i);
ProducerRecord record = new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i));
LOG.info("p:" + record.partition());
LOG.info("t:" + record.topic());
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(metadata != null) {
LOG.info("metadata:" + metadata.topic() + "" + metadata.partition() + "" + metadata.offset());
} else {
LOG.info("metadata:" + metadata);
}
if(exception != null) {
LOG.info("exception:" + exception.getMessage(), exception);
} else {
LOG.info("exception:" + exception);
}
}
});
}
producer.close();
参照官方样例(消费者):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
for(Object key:props.keySet()){
LOG.info("key:" + key + ";value:" + props.get(key));
}
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
int count = 0;
while (count < 1000) {
LOG.info("receive message...");
ConsumerRecords<String, String> records = consumer.poll(1000);
LOG.info("received");
for (ConsumerRecord<String, String> record : records) {
LOG.info("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
}
count ++;
}
consumer.close();
已经过测试,希望本文能帮到您。
(已迁移)
分享到:
相关推荐
依赖方式 <groupId>org.apache.kafka <artifactId>kafka_2.10 <version>0.10.0.0 但是没有被中央仓库的任何jar包依赖!您可以在这里点击下载
<groupId>org.apache.kafka <artifactId>kafka-clients <version>0.10.1.1 </dependency>
《Apache Kafka实战》这本书深入浅出地介绍了Apache Kafka这一分布式流处理平台的各个方面,旨在帮助读者掌握Kafka的实际应用和核心概念。Kafka是一个高吞吐量、低延迟的消息发布订阅系统,常用于构建实时数据管道和...
<groupId>org.apache.kafka <artifactId>kafka-log4j-appender <version>0.10.2.0 </dependency>
【Apache Jmeter Kafka Jar包】是Apache JMeter的扩展组件,专为测试和性能评估Apache Kafka消息系统设计。这个jar包允许用户在JMeter测试计划中直接发送和接收Kafka消息,从而对Kafka集群的性能进行深入分析。 ...
Apache Kafka 是一个分布式流处理平台,常用于构建实时数据管道和流应用。在这个"apache-kafka-1.0.0 java Demo"中,我们将探讨如何使用Java API与Kafka进行交互,实现基本的消息生产与消费功能。这个Demo虽然没有...
Spring for Apache Kafka API。 Spring for Apache Kafka 开发文档。
This book is here to help you get familiar with Apache Kafka and to solve your challenges related to the consumption of millions of messages in publisher-subscriber architectures. It is aimed at ...
Kafka是一个对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。 1.Kafka集群包含一个或多个服务器,这种服务器被称为broker 2.Partition是物理上的概念,每个Topic...
Apache Kafka是一款分布式流处理平台,它被设计用来处理大量数据,并且提供一个统一、高吞吐量、低延迟的处理数据管道。由于Kafka的这些特性,它在大数据处理和实时数据管道领域得到了广泛的应用。Kafka作为一个开源...
《Apache Kafka Cookbook》是由PACKT在2015年出版的一本专著,专注于介绍Apache Kafka这一分布式流处理平台的实战技巧和最佳实践。Apache Kafka是一个高性能、可扩展且容错性强的消息中间件,它被广泛应用于大数据...
Apache Kafka is a popular distributed streaming platform that acts as a messaging queue or an enterprise messaging system. It lets you publish and subscribe to a stream of records and process them in ...
Kafka的核心特性包括高性能、可扩展性、持久性和可靠性,使其成为现代大数据生态系统中的关键组件之一。 #### 设置Apache Kafka集群 在设置Apache Kafka集群之前,需要了解其架构和组成部分: - **Broker**:Kafka...
本文将深入探讨"Streaming Architecture New Designs Using Apache Kafka and MapR Streams"这一主题,阐述如何利用这两种强大的工具构建高效、可扩展的流处理系统。 Apache Kafka是一种分布式流处理平台,由...
Apache Kafka 是一个分布式流处理平台,由LinkedIn 开发并捐赠给了Apache 软件基金会,现在已成为大数据领域的重要组件。Kafka 主要用于构建实时数据管道和流应用,它能够高效地处理大量的实时数据,同时提供了消息...
Apache Kafka 3.1.0 (Scala 2.12 :kafka_2.12-3.1.0.tgz) 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。) 是一个开源分布式事件流平台,被数千家公司用于高...
### Apache Kafka 知识点概览 #### 一、引言 Apache Kafka 是一款开源的流处理平台,由 LinkedIn 开发并捐赠给 Apache 软件基金会。它以一种高吞吐量、低延迟的方式处理实时数据流,并支持可扩展性、持久性和容错性...
Building Data Streaming Applications with Apache Kafka 英文azw3 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除
Learning Apache Kafka Second Edition provides you with step-by-step, practical examples that help you take advantage of the real power of Kafka and handle hundreds of megabytes of messages per second ...