`
fly_ever
  • 浏览: 153122 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论
阅读更多

 

kafka下载:http://kafka.apache.org/downloads

解压下载下来的文件,bin目录下是常用命令,config目录下是配置文件。

kafka已经内置了一个zookeeper环境,可以直接使用。

建议kafka在linux环境下使用。

 

Kafka的简单测试

以下是在windows环境下的一个kafka测试:

配置则直接使用默认的配置即可。

1,启动zookeeper:

进入bin/windows目录,运行命令:

>zookeeper-server-start.bat  ../../config/zookeeper.properties

2,启动kafka:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-server-start.bat ../../config/server.properties

3,创建一个Topic:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-topics.bat  -create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 -topic test

创建Topic完成后,可通过命令查看Topic:

>kafka-topics.bat -list --zookeeper localhost:2181

4,创建一个消息消费者:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-console-consumer.bat  --bootstrap-server localhost:9092 --topic test --from-beginning

执行命令后,暂时不会有任何信息返回,消费者在等待信息。

5,创建一个消息生产者:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-console-producer.bat  --broker-list localhost:9092 --topic test

此时输入信息,然后按回车键。则完成了一条消息的生产,此时可看到在消息消费者窗口,会打印出刚刚生产的消息。

此时一个Kafka的使用实例完成。

 

Java代码完成Kafka测试

也可以直接通过Java代码来完成上述过程:

如上面步骤3:创建一个Topic,Java代码如下:

public static void main(String[] args) {
		// TODO Auto-generated method stub

		Properties prop = new Properties();
		prop.put("bootstrap.servers", "localhost:9092");
		
		AdminClient ac = AdminClient.create(prop);
		ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
		NewTopic topic = new NewTopic("topic-test2",1,(short)1);
		topics.add(topic);
		
		CreateTopicsResult result = ac.createTopics(topics);
		try {
			System.out.println("show create topics result:");
			result.all().get();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

 然后通过命令

>kafka-topics.bat -list --zookeeper localhost:2181

可查看刚刚通过程序新增的Topic。

 

创建一个消息生产者,

public void createMsg(){
		prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		Producer<String,String> p = new KafkaProducer<String,String>(prop);
		for(int i=0;i<50;i++){
			p.send(new ProducerRecord<String,String>("topic-test1","topictest message" + i));
		}
		p.close();
	}

 

生产的消息能够通过命令查看:

>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic-test1  --from-beginning

 

创建一个消息消费者,把收到的消息信息展示出来:

public void doconsume(){
		prop.put("group.id", "test");
		prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		final KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(prop);
		consumer.subscribe(Arrays.asList("topic-test1"),new ConsumerRebalanceListener(){

			public void onPartitionsRevoked(
					Collection<TopicPartition> partitions) {
				// TODO Auto-generated method stub
				
			}

			public void onPartitionsAssigned(
					Collection<TopicPartition> partitions) {
				// TODO Auto-generated method stub
				consumer.seekToBeginning(partitions);
				}
		
		});
		
		while(true){
			ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
			for(ConsumerRecord<String,String> record: records)
				System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());
		}
	}

 

0
0
分享到:
评论

相关推荐

    Kafka入门、介绍、使用及部署

    ### Kafka入门、介绍、使用及部署 #### 一、Kafka简介 Kafka是由LinkedIn于2010年12月开源的一款强大的消息系统,它主要用于处理活跃的流式数据,如网站的PV(页面浏览量)、用户行为数据等。传统日志分析系统虽然...

    Apache Kafka入门介绍.zip

    **Apache Kafka 入门介绍** Apache Kafka 是一个分布式流处理平台,由 LinkedIn 开发并贡献给了 Apache 软件基金会。Kafka 设计用于构建实时数据管道和流应用,能够高效地处理大量实时数据。它结合了消息队列和日志...

    大数据Kafka入门--理论+实践

    在大数据Kafka入门中,首先要了解的理论知识包括了Kafka的基本概念和架构设计。Kafka作为一个分布式的消息系统,其核心组件包括了Producer(生产者)、Consumer(消费者)、Broker(节点)和Topic(主题)。生产者...

    kafka入门必备手册

    下面将详细介绍Kafka入门的相关知识点。 **Kafka简介** Kafka是一个分布式流处理平台,它的核心功能是消息队列系统,用于处理大量实时数据的发布和订阅。它被设计为在分布式环境中运行,可以处理高并发的读写请求,...

    kafka入门到精通.txt

    ### Kafka入门到精通知识点概述 #### 一、Kafka简介 Kafka是由Apache软件基金会开发的一款开源流处理平台,主要用于构建实时数据管道以及基于流的数据处理应用。它以极高的吞吐量、低延迟和可扩展性而著称,被广泛...

    Kafka 入门基础篇.pptx

    Kafka 入门基础篇 Kafka 是 LinkedIn 公司开发的一种分布式消息队列系统,支持离线和在线日志处理。它可以实时处理大量数据,满足各种需求场景,如基于 Hadoop 的批处理系统、低延迟的实时系统、Storm/Spark 流式...

    kafka入门简介.docx

    ### Kafka入门简介 #### 一、Kafka概述 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。它是一个高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据,如网页...

    21.消息中间件之Kafka入门讲解

    **Kafka入门详解** Kafka是一款高性能、分布式的消息中间件,最初由LinkedIn开发,后成为Apache顶级项目。它主要用于处理实时数据流,提供高吞吐量的发布订阅服务,同时也支持离线数据处理。在本篇文章中,我们将...

    21.消息中间件之Kafka入门讲解(更新)

    **Kafka入门详解** Kafka是一款高性能的消息中间件,由LinkedIn开发并开源,现在是Apache软件基金会的顶级项目。它最初设计的目标是处理实时的流数据,但随着时间的发展,Kafka已经成为广泛应用于日志收集、监控...

    Kafka入门教程与详解

    Kafka入门教程与详解 Kafka是分布式发布-订阅消息系统,由 LinkedIn 公司开发,使用 Scala 语言编写,后成为 Apache 项目的一部分。Kafka 集群中没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,...

    kafka入门学习资料

    大数据组件-kafka入门学习,通过该部分学习,可以了解掌握kafka的基本常识

    kafka入门资料

    ### Kafka入门知识点详解 #### 一、概述与系统环境 Kafka是一款强大的分布式消息系统,主要应用于实时数据处理场景。其高效的数据传输能力和高吞吐量特性使其在大数据领域受到广泛青睐。 - **系统环境**:本文档...

    读书笔记:kafka入门与实践.zip

    读书笔记:kafka入门与实践

    读书笔记:kafka入门与实践代码实现.zip

    读书笔记:kafka入门与实践代码实现

    Kafka技术内幕-图文详解Kafka源码设计与实现

    Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...

    kafka入门demo

    Kafka入门Demo主要介绍了Apache Kafka的基本概念、工作原理以及如何进行简单操作,是初学者了解和使用Kafka的一个基础教程。Kafka是一个分布式流处理平台,由LinkedIn开发并开源,后来成为了Apache软件基金会的顶级...

    Kafka入门.pdf

    搭建单节点的Kafka环境涉及几个步骤,首先需要搭建单节点的ZooKeeper,因为Kafka的运行依赖于ZooKeeper的协调功能。搭建ZooKeeper后,需要下载并解压Apache Kafka的二进制包。Kafka的配置和启动步骤会在相关文档中...

    Kafka技术内幕图文详解源码设计与实现

    Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...

Global site tag (gtag.js) - Google Analytics