前期测试准备:
zookeeper安装,kafka安装
kafka:
1.启动:
kafka-server-start.bat ./../../config/server.properties
2.创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
3.创建producer
kafka-console-producer.bat --broker-list localhost:9092 --topic demo
4.创建consumer
kafka-console-consumer.bat --zookeeper localhost:2181 --topic demo
此步创建完后,producer发消息,consumer就可以看到了
5.查看kafka topic
kafka-list-topic.bat --zookeeper localhost:2181
代码部分:
1.在项目中pom.xml添加
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.1</version>
</dependency>
</dependencies>
2.代码如例子中。
生产者代码:
Properties props = new Properties(); props.load(DemoKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
Producer<String, String> producer = new KafkaProducer(props,new StringSerializer(),new StringSerializer());
while(true) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("demo", "DemoKafkaProducer", "I am a message form DemoKafkaProducer!");
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println(recordMetadata.offset());
}
}
});
消费者关键代码:
KafkaConsumer consumer = new KafkaConsumer<String, String>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("demo"));
ConsumerRecords<String, String> records = consumer.poll(100);
3.启动,程序成功。
其他问题:
Q1:如果kafka服务器不可用,生产者和消费者现象?
A:kafka服务器不可用时,生产者和消费者都可以启动,生产者发送数据时报错,但是获取错误信息大约1分钟。消费者则在 consumer.poll(100)阻塞。当kafka可用时,不用重新启动代码,直接使用即可,消费者可以获取到信息
git地址:https://github.com/leaf-it/demo-kafka
发现一个比较好的kafka网站:http://orchome.com/
分享到:
相关推荐
Spring for Apache Kafka API。 Spring for Apache Kafka 开发文档。
在本文中,我们将深入探讨如何在Spring Boot应用中集成Apache Kafka。Kafka是一个高吞吐量、分布式的发布/订阅消息系统,常用于构建实时数据管道和流处理应用程序。Spring Boot是一个简化Spring应用开发的框架,它...
spring-kafka是Spring框架对Apache Kafka消息系统进行整合的一个项目,它简化了Kafka的使用,使其更容易与Spring应用程序集成。Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序。它具备...
在Spring Cloud框架中,整合RabbitMQ或Kafka作为消息驱动是常见的微服务间通信方式。这两种技术都是流行的消息中间件,用于实现异步处理、解耦和扩展性。下面将详细阐述它们在Spring Cloud中的应用。 首先,...
《深入剖析Spring Kafka源代码》 Spring Kafka是Spring框架的一部分,专为集成Apache Kafka而设计,提供了一套轻量级且强大的API,使得在Java应用中使用Kafka变得更加简单。本文将围绕Spring Kafka的源代码进行深度...
在本文中,我们将深入探讨如何将Spring框架与Apache Kafka集成,以便实现在Spring应用中发送和接收消息。Kafka是一个高吞吐量、分布式的发布/订阅消息系统,而Spring框架是Java开发中最广泛使用的应用框架之一。通过...
5. **运行与测试**:启动Spring Boot应用,当`kafka.consumer.enabled`设置为`true`时,消费者将开始监听指定的Kafka主题。你可以通过发送消息到该主题来测试消费者的运行情况。 以上就是基于Spring Boot和Spring ...
Spring for Apache Kafka整合概述 Spring for Apache Kafka是Apache Kafka的一个Spring整合实现,旨在提供一个简洁、灵活的消息队列解决方案。下面是Spring for Apache Kafka的概述和关键特性: 概述 Spring for ...
在本文中,我们将深入探讨如何使用Spring Boot与Apache Kafka进行集成,从而实现高效的消息传递功能。Spring Boot简化了Kafka的配置和使用,使得开发者能够快速地构建基于事件驱动的应用程序。 首先,让我们理解...
在本文中,我们将深入探讨如何在Spring Boot 2.7.3版本中集成Apache Kafka,以便在微服务架构中实现高效的数据流处理。首先,让我们理解Spring Boot和Kafka的基本概念,然后逐步介绍如何配置和使用它们。 **Spring ...
### Spring for Apache Kafka #### 一、概览与快速入门 **Spring for Apache Kafka** 是一个结合了 **Spring 框架** 和 **Apache Kafka** 的项目,它为开发基于Kafka的消息解决方案提供了核心的Spring概念支持。此...
《Spring Kafka 框架实战演示》 Spring Kafka 是 Spring 框架为集成 Apache Kafka 提供的一个模块,它使得在 Java 应用程序中使用 Kafka 变得简单且直观。本项目 "spring-kafka-demo" 提供了一个完整的示例,涵盖了...
5. **dm-kafka-client** "dm-kafka-client"可能是一个自定义的Kafka客户端库,用于封装Spring Cloud Kafka的相关操作,提供更方便的API供应用调用。这个客户端可能包含了生产者和消费者的实现,以及一些定制的配置...
当我们谈论"基于xml方式的Spring整合Kafka"时,意味着我们将使用XML配置文件来设置Spring与Kafka的集成。 Spring框架允许开发者通过XML配置来声明式地配置bean,包括数据源、事务管理器、以及与Kafka相关的消费者和...
Spring Boot 集群管理工具 KafkaAdminClient 使用方法解析 KafkaAdminClient 是 Spring Boot 集群管理工具中的一部分,主要用于管理和检视 Kafka 集群中的Topic、Broker、ACL 等对象。下面将详细介绍 Kafka...
5. **Kafka配置**: - 为了确保生产者和消费者的正常运行,需要正确配置Kafka集群的地址、端口以及安全设置(如果使用SSL/TLS)。 - `bootstrap.servers`配置项是连接Kafka集群的初始节点列表,`acks`设置决定何时...
《Spring Kafka 深度解析》 Spring Kafka是Spring框架的一部分,它为Java开发者提供了一种集成Apache Kafka的简便方式,使我们能够充分利用Kafka的分布式消息传递能力。本文将深入探讨Spring Kafka的核心概念、主要...
**Spring Kafka 深度解析** Spring Kafka 是一个由 Spring 社区开发的库,它为 Apache Kafka 提供了轻量级的、基于 Spring 的整合框架。Kafka 是一个分布式流处理平台,常用于构建实时数据管道和流应用。Spring ...
spring整合kafka集群,init.properties配置kafka集群信息(也可以单个kafka服务),kafka-consumer.xml配置消费者监听,kafka-producer.xml配置消息生产者。