`

Spring 初探(5) kafka

阅读更多
前期测试准备:
zookeeper安装,kafka安装
kafka:
1.启动:
kafka-server-start.bat ./../../config/server.properties
2.创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
3.创建producer
kafka-console-producer.bat --broker-list localhost:9092 --topic demo
4.创建consumer
kafka-console-consumer.bat --zookeeper localhost:2181 --topic demo
此步创建完后,producer发消息,consumer就可以看到了
5.查看kafka topic
kafka-list-topic.bat --zookeeper localhost:2181
代码部分:
1.在项目中pom.xml添加
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.11.0.1</version>
    </dependency>
</dependencies>
2.代码如例子中。
生产者代码:
Properties props = new Properties();   props.load(DemoKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));    
Producer<String, String> producer = new KafkaProducer(props,new StringSerializer(),new StringSerializer());
        while(true) {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("demo", "DemoKafkaProducer", "I am a message form DemoKafkaProducer!");
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println(recordMetadata.offset());
                     }
                }
            });

消费者关键代码:
KafkaConsumer consumer = new KafkaConsumer<String, String>(props, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("demo"));
ConsumerRecords<String, String> records = consumer.poll(100);
3.启动,程序成功。

其他问题:
Q1:如果kafka服务器不可用,生产者和消费者现象?
A:kafka服务器不可用时,生产者和消费者都可以启动,生产者发送数据时报错,但是获取错误信息大约1分钟。消费者则在 consumer.poll(100)阻塞。当kafka可用时,不用重新启动代码,直接使用即可,消费者可以获取到信息

git地址:https://github.com/leaf-it/demo-kafka

发现一个比较好的kafka网站:http://orchome.com/






0
1
分享到:
评论

相关推荐

    Spring for Apache Kafka API(Spring for Apache Kafka 开发文档).CHM

    Spring for Apache Kafka API。 Spring for Apache Kafka 开发文档。

    spring boot集成kafka

    在本文中,我们将深入探讨如何在Spring Boot应用中集成Apache Kafka。Kafka是一个高吞吐量、分布式的发布/订阅消息系统,常用于构建实时数据管道和流处理应用程序。Spring Boot是一个简化Spring应用开发的框架,它...

    spring-kafka 整合官方文档

    spring-kafka是Spring框架对Apache Kafka消息系统进行整合的一个项目,它简化了Kafka的使用,使其更容易与Spring应用程序集成。Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序。它具备...

    22.Spring Cloud整合RabbitMQ或Kafka消息驱动

    在Spring Cloud框架中,整合RabbitMQ或Kafka作为消息驱动是常见的微服务间通信方式。这两种技术都是流行的消息中间件,用于实现异步处理、解耦和扩展性。下面将详细阐述它们在Spring Cloud中的应用。 首先,...

    spring-kafka源代码

    《深入剖析Spring Kafka源代码》 Spring Kafka是Spring框架的一部分,专为集成Apache Kafka而设计,提供了一套轻量级且强大的API,使得在Java应用中使用Kafka变得更加简单。本文将围绕Spring Kafka的源代码进行深度...

    Spring 集成 Kafka的配置文件及代码讲解

    在本文中,我们将深入探讨如何将Spring框架与Apache Kafka集成,以便实现在Spring应用中发送和接收消息。Kafka是一个高吞吐量、分布式的发布/订阅消息系统,而Spring框架是Java开发中最广泛使用的应用框架之一。通过...

    spring-kafka整合.docx

    Spring for Apache Kafka整合概述 Spring for Apache Kafka是Apache Kafka的一个Spring整合实现,旨在提供一个简洁、灵活的消息队列解决方案。下面是Spring for Apache Kafka的概述和关键特性: 概述 Spring for ...

    spring boot操作kafka例子

    在本文中,我们将深入探讨如何使用Spring Boot与Apache Kafka进行集成,从而实现高效的消息传递功能。Spring Boot简化了Kafka的配置和使用,使得开发者能够快速地构建基于事件驱动的应用程序。 首先,让我们理解...

    spring-boot集成kafka

    Spring for Apache Kafka提供了与Spring Boot整合的便利,通过引入`spring-kafka`和`spring-boot-starter`依赖,我们可以轻松地在Spring Boot应用中使用Kafka。添加以下依赖: ```xml &lt;groupId&gt;org.spring...

    springboot 基于spring-kafka动态创建kafka消费者

    5. **运行与测试**:启动Spring Boot应用,当`kafka.consumer.enabled`设置为`true`时,消费者将开始监听指定的Kafka主题。你可以通过发送消息到该主题来测试消费者的运行情况。 以上就是基于Spring Boot和Spring ...

    Spring for Apache Kafka.pdf

    ### Spring for Apache Kafka #### 一、概览与快速入门 **Spring for Apache Kafka** 是一个结合了 **Spring 框架** 和 **Apache Kafka** 的项目,它为开发基于Kafka的消息解决方案提供了核心的Spring概念支持。此...

    spring-kafka-demo

    《Spring Kafka 框架实战演示》 Spring Kafka 是 Spring 框架为集成 Apache Kafka 提供的一个模块,它使得在 Java 应用程序中使用 Kafka 变得简单且直观。本项目 "spring-kafka-demo" 提供了一个完整的示例,涵盖了...

    spring cloud kafka数据中间件源码

    5. **dm-kafka-client** "dm-kafka-client"可能是一个自定义的Kafka客户端库,用于封装Spring Cloud Kafka的相关操作,提供更方便的API供应用调用。这个客户端可能包含了生产者和消费者的实现,以及一些定制的配置...

    springboot - 2.7.3版本 - (七)整合Kafka

    在本文中,我们将深入探讨如何在Spring Boot 2.7.3版本中集成Apache Kafka,以便在微服务架构中实现高效的数据流处理。首先,让我们理解Spring Boot和Kafka的基本概念,然后逐步介绍如何配置和使用它们。 **Spring ...

    【基于xml方式】spring-kafka.zip

    当我们谈论"基于xml方式的Spring整合Kafka"时,意味着我们将使用XML配置文件来设置Spring与Kafka的集成。 Spring框架允许开发者通过XML配置来声明式地配置bean,包括数据源、事务管理器、以及与Kafka相关的消费者和...

    Spring Boot集群管理工具KafkaAdminClient使用方法解析

    Spring Boot 集群管理工具 KafkaAdminClient 使用方法解析 KafkaAdminClient 是 Spring Boot 集群管理工具中的一部分,主要用于管理和检视 Kafka 集群中的Topic、Broker、ACL 等对象。下面将详细介绍 Kafka...

    spring-kafka-producer-consumer-example_java_springboot_kafka_

    5. **Kafka配置**: - 为了确保生产者和消费者的正常运行,需要正确配置Kafka集群的地址、端口以及安全设置(如果使用SSL/TLS)。 - `bootstrap.servers`配置项是连接Kafka集群的初始节点列表,`acks`设置决定何时...

    spring-kafka文档.zip

    《Spring Kafka 深度解析》 Spring Kafka是Spring框架的一部分,它为Java开发者提供了一种集成Apache Kafka的简便方式,使我们能够充分利用Kafka的分布式消息传递能力。本文将深入探讨Spring Kafka的核心概念、主要...

    spring-kafka

    **Spring Kafka 深度解析** Spring Kafka 是一个由 Spring 社区开发的库,它为 Apache Kafka 提供了轻量级的、基于 Spring 的整合框架。Kafka 是一个分布式流处理平台,常用于构建实时数据管道和流应用。Spring ...

Global site tag (gtag.js) - Google Analytics