`
sillycat
  • 浏览: 2551296 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Kafka Cluster 2019(4)Spring Boot Kafka Reactive

 
阅读更多
Kafka Cluster 2019(4)Spring Boot Kafka Reactive

Found some articles and one samples here https://github.com/vanseverk/paymentprocessor-kafka-intro/tree/master/paymentprocessor-gateway

to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS

Add dependency supports to POM, pom.xml
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>

Kafka Configuration class to load the configurations from YAML
package com.sillycat.webfluxlatency.config;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

@Slf4j
@Setter
@Getter
@Configuration
@EnableConfigurationProperties
public class KafkaConfig {
   
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
   
    @Bean
    public KafkaSender<Integer, String> kafkaProducer() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
        return KafkaSender.create(producerOptions);
    }
   
    @Bean
    public KafkaReceiver<Object, Object> kafkaReceiver() {
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "webflux-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "webflux");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("hello-webflux"))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

        return KafkaReceiver.create(consumerOptions);
    }

}

The implementation class which send out messages and consume messages
package com.sillycat.webfluxlatency.service;

import javax.annotation.PostConstruct;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

@Slf4j
@Service
public class KafkaServiceImpl implements KafkaService {

    @Autowired
    KafkaSender<Integer, String> kafkaSender;

    @Autowired
    KafkaReceiver<Object, Object> kafkaReceiver;

    @PostConstruct
    public void init(){
        ((Flux<ReceiverRecord<Object, Object>>) kafkaReceiver.receive()).doOnNext(r -> {
            processEvent(r.value().toString());
            r.receiverOffset().acknowledge();
        }).subscribe();
    }

    private void processEvent(String message) {
        log.info("received message:" + message);
    }

    public Mono<SenderResult<Integer>> send(final String payload) {
        log.info("send out message:" + payload);
        SenderRecord<Integer, String, Integer> message = SenderRecord
                .create(new ProducerRecord<>("hello-webflux", payload), 1);
        return kafkaSender.send(Mono.just(message)).next();
    }

}

Webflux Controller which send back MONO
package com.sillycat.webfluxlatency.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import com.sillycat.webfluxlatency.service.KafkaService;

import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderResult;

@Slf4j
@Api(value = "/kafka/")
@RestController
public class KafkaController {

    @Autowired
    KafkaService kafkaService;

    @PostMapping("/kafka/{topicName}")
    public Mono<SenderResult<Integer>> sendToTopic(@PathVariable String topicName, @RequestBody String message) {
        log.info("topicName " + topicName + " message " + message);
        return kafkaService.send(message);
    }

}

Application YAML configuration File
spring:
jackson:
  serialization:
   FAIL_ON_EMPTY_BEANS: false
kafka:
  consumer:
   group-id: mvccluster
  bootstrap-servers: ubuntu-master:9092, ubuntu-dev2:9092, ubuntu-dev4:9092

Then I can easily send out messages and consume messages.


References:
https://github.com/reactor/reactor-kafka
https://www.reactiveprogramming.be/an-introduction-to-reactor-kafka/
https://codar.club/blogs/reactor-kafka-via-spring-boot-webflux.html

分享到:
评论

相关推荐

    spring-boot-starter-kafka示例程序

    spring-boot-starter-kafka示例程序\n 支持springcloud1.5.4,kafka0.8.2.x\n 项目地址:https://github.com/zhyea/spring-boot-starter-kafka

    spring boot集成kafka

    在本文中,我们将深入探讨如何在Spring Boot应用中集成Apache Kafka。Kafka是一个高吞吐量、分布式的发布/订阅消息系统,常用于构建实时数据管道和流处理应用程序。Spring Boot是一个简化Spring应用开发的框架,它...

    spring-boot集成kafka

    Spring for Apache Kafka提供了与Spring Boot整合的便利,通过引入`spring-kafka`和`spring-boot-starter`依赖,我们可以轻松地在Spring Boot应用中使用Kafka。添加以下依赖: ```xml &lt;groupId&gt;org.spring...

    spring-reactive-kafka-sse:通过Spring Boot Reactive + Kafka +服务器发送事件+ Cassandra进行流式传输

    带有Spring Boot Reactive + Kafka +服务器已发送事件+ Cassandra的流。 使用服务器发送事件和Kafka的HTTP流,通过Spring Boot Reactive和Reactor Kafka实现。 此外,还使用Spring Data将数据存储在Cassandra中。 ...

    Spring Boot集群管理工具KafkaAdminClient使用方法解析

    Spring Boot 集群管理工具 KafkaAdminClient 使用方法解析 KafkaAdminClient 是 Spring Boot 集群管理工具中的一部分,主要用于管理和检视 Kafka 集群中的Topic、Broker、ACL 等对象。下面将详细介绍 Kafka...

    kafka资源:Spring Boot演示应用程序,用于使用数据库等其他资源测试Kafka事务

    在本资源中,我们主要探讨的是如何在Spring Boot应用程序中集成Kafka,并利用它来实现事务处理,同时结合数据库等其他资源进行测试。这是一份非常实用的教程,旨在帮助开发者理解Kafka事务的工作原理以及如何在实际...

    springboot集成kafka进行消息发布和订阅jar

    Spring Boot简化了Java开发,提供了快速构建应用的框架,而Spring for Apache Kafka则让Kafka的使用更加便捷。本项目即展示了如何在Spring Boot应用中集成Kafka,实现消息的发布与订阅。 ### 1. Spring Boot配置 ...

    springboot - 2.7.3版本 - (七)整合Kafka

    在本文中,我们将深入探讨如何在Spring Boot 2.7.3版本中集成Apache Kafka,以便在微服务架构中实现高效的数据流处理。首先,让我们理解Spring Boot和Kafka的基本概念,然后逐步介绍如何配置和使用它们。 **Spring ...

    spring-kafka 整合官方文档

    spring-kafka是Spring框架对Apache Kafka消息系统进行整合的一个项目,它简化了Kafka的使用,使其更容易与Spring应用程序集成。Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序。它具备...

    spring boot操作kafka例子

    在本文中,我们将深入探讨如何使用Spring Boot与Apache Kafka进行集成,从而实现高效的消息传递功能。Spring Boot简化了Kafka的配置和使用,使得开发者能够快速地构建基于事件驱动的应用程序。 首先,让我们理解...

    springboot 基于spring-kafka动态创建kafka消费者

    4. **动态创建消费者**:在Spring Boot中,我们通常通过监听器接口(如`KafkaListener`)来创建消费者。然而,如果你需要动态创建消费者,可以使用`@KafkaListener`配合`@ConditionalOnProperty`注解,根据特定的...

    springboot-kafkastream:Spring Boot 1.5和Kafka流示例

    《Spring Boot 1.5与Kafka流应用详解》 在现代微服务架构中,实时数据处理和流处理成为了一项重要技术。Spring Boot作为Java生态中的轻量级框架,结合Apache Kafka强大的流处理能力,可以构建高效、可扩展的数据...

    springboot - 2.7.3版本 - (八)ELK整合Kafka

    在本文中,我们将深入探讨如何在Spring Boot 2.7.3版本的项目中整合ELK(Elasticsearch、Logstash、Kafka)堆栈,以便实现高效且可扩展的日志管理和分析。ELK组合提供了实时日志收集、处理和搜索的能力,而Kafka作为...

    spring-boot+kafka+hibernate+redis示例

    在本示例中,我们将探讨如何整合Spring Boot、Kafka、Hibernate和Redis这四个关键的技术组件,构建一个高效、可扩展的应用系统。Spring Boot简化了Java应用的开发过程,Kafka是一个分布式消息中间件,Hibernate是...

    springboot-kafka-simple-demo

    在`pom.xml`文件中,引入Spring for Apache Kafka的依赖: ```xml &lt;groupId&gt;org.springframework.kafka &lt;artifactId&gt;spring-kafka &lt;version&gt;2.5.13 ``` 接下来,配置Kafka连接信息。在`application.yml`或`...

    springboot-kafka:springboot-kafka

    4. **KafkaListener**: Spring Boot支持@KafkaListener注解,允许你创建监听特定主题的方法。这些方法会在接收到新消息时被调用,提供了一种声明式的消费消息方式。 5. **容器管理**: Spring Boot提供...

    kafka-spring-boot:Kafka和Kafka流与Spring Boot

    【标题】:“Kafka-Spring-Boot:整合Kafka、Kafka流与Spring Boot的实践” 在现代微服务架构中,消息队列扮演着至关重要的角色,它允许服务之间进行异步通信,提高系统的可扩展性和可靠性。Apache Kafka作为一个...

    springboot集成kafka0.10.0.1并发送邮件项目

    在本项目中,我们主要探讨的是如何将Spring Boot与Kafka 0.10.0.1集成,以及如何利用该集成实现消息监听、发送邮件的功能。Kafka是一个分布式流处理平台,它允许应用程序发布和订阅实时数据流,而Spring Boot则是一...

    spring-boot-starter-kafka.zip

    Spring Boot Starter Kafka是Spring Boot框架的一个扩展模块,它简化了在Spring Boot应用中集成Apache Kafka的过程。Apache Kafka是一个分布式流处理平台,常用于构建实时数据管道和流应用程序。通过使用Spring Boot...

    spring-boot-kafka-websocket:Spring Boot Kafka Websocket演示

    项目介绍一个例子,使用 spring boot 构建一个应用,从 kafka 队列中获取要推送的内容,通过`websocket`将内容推送到`web`端。项目依赖安装项目依赖kafkazookeeperzookeeper 安装解压, 进入 conf将 zoo_sample.cfg ...

Global site tag (gtag.js) - Google Analytics