Kafka Cluster 2019(4)Spring Boot Kafka Reactive
I found some articles and one sample project here: https://github.com/vanseverk/paymentprocessor-kafka-intro/tree/master/paymentprocessor-gateway
To avoid a Jackson exception when the response is serialized, disable SerializationFeature.FAIL_ON_EMPTY_BEANS.
Add the dependency to pom.xml
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
</dependency>
Kafka configuration class, which reads the bootstrap servers from the YAML file
package com.sillycat.webfluxlatency.config;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

@Slf4j
@Setter
@Getter
@Configuration
@EnableConfigurationProperties
public class KafkaConfig {

    // Comma-separated broker list, read from the application YAML
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Reactive producer with Integer keys and String values
    @Bean
    public KafkaSender<Integer, String> kafkaProducer() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
        return KafkaSender.create(producerOptions);
    }

    // Reactive consumer subscribed to the "hello-webflux" topic
    @Bean
    public KafkaReceiver<Object, Object> kafkaReceiver() {
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "webflux-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "webflux");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("hello-webflux"))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
        return KafkaReceiver.create(consumerOptions);
    }
}
The service implementation class, which sends and consumes messages
package com.sillycat.webfluxlatency.service;

import javax.annotation.PostConstruct;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

@Slf4j
@Service
public class KafkaServiceImpl implements KafkaService {

    @Autowired
    KafkaSender<Integer, String> kafkaSender;

    @Autowired
    KafkaReceiver<Object, Object> kafkaReceiver;

    // Start consuming once the bean is constructed
    @PostConstruct
    public void init() {
        kafkaReceiver.receive().doOnNext(r -> {
            processEvent(r.value().toString());
            // Acknowledge the offset only after the record has been processed
            r.receiverOffset().acknowledge();
        }).subscribe();
    }

    private void processEvent(String message) {
        log.info("received message:" + message);
    }

    public Mono<SenderResult<Integer>> send(final String payload) {
        log.info("send out message:" + payload);
        // The second argument (1) is correlation metadata, returned in the SenderResult
        SenderRecord<Integer, String, Integer> message = SenderRecord
                .create(new ProducerRecord<>("hello-webflux", payload), 1);
        return kafkaSender.send(Mono.just(message)).next();
    }
}
WebFlux controller, which returns a Mono
package com.sillycat.webfluxlatency.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import com.sillycat.webfluxlatency.service.KafkaService;

import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderResult;

@Slf4j
@Api(value = "/kafka/")
@RestController
public class KafkaController {

    @Autowired
    KafkaService kafkaService;

    // Note: topicName is only logged here; KafkaService.send publishes to the fixed topic "hello-webflux"
    @PostMapping("/kafka/{topicName}")
    public Mono<SenderResult<Integer>> sendToTopic(@PathVariable String topicName, @RequestBody String message) {
        log.info("topicName " + topicName + " message " + message);
        return kafkaService.send(message);
    }
}
Application YAML configuration file
spring:
  jackson:
    serialization:
      FAIL_ON_EMPTY_BEANS: false
  kafka:
    consumer:
      group-id: mvccluster
    bootstrap-servers: ubuntu-master:9092, ubuntu-dev2:9092, ubuntu-dev4:9092
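The @Value("${spring.kafka.bootstrap-servers}") placeholder in KafkaConfig refers to the nested YAML keys above by their dotted path. The sketch below only illustrates that mapping with plain nested maps; it is not how Spring Boot implements property resolution, and the class and method names (YamlPathSketch, flatten, sampleConfig) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class YamlPathSketch {

    // Recursively flatten nested maps into dotted keys,
    // e.g. spring -> kafka -> bootstrap-servers becomes "spring.kafka.bootstrap-servers".
    static Map<String, Object> flatten(String prefix, Map<String, Object> node, Map<String, Object> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                flatten(key, child, out);
            } else {
                out.put(key, e.getValue());
            }
        }
        return out;
    }

    // Hand-built copy of the Kafka part of the YAML file (illustration only).
    static Map<String, Object> sampleConfig() {
        Map<String, Object> consumer = new LinkedHashMap<>();
        consumer.put("group-id", "mvccluster");
        Map<String, Object> kafka = new LinkedHashMap<>();
        kafka.put("consumer", consumer);
        kafka.put("bootstrap-servers", "ubuntu-master:9092, ubuntu-dev2:9092, ubuntu-dev4:9092");
        Map<String, Object> spring = new LinkedHashMap<>();
        spring.put("kafka", kafka);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("spring", spring);
        return root;
    }

    public static void main(String[] args) {
        flatten("", sampleConfig(), new LinkedHashMap<>())
                .forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Note that KafkaConfig hard-codes the consumer group as "webflux", so the spring.kafka.consumer.group-id value in the YAML is not used by the receiver bean shown earlier.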
Then I can easily send out messages and consume messages.
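One way to try the endpoint is a small JDK HTTP client (Java 11+). This is a minimal sketch: the base URL http://localhost:8080 is an assumption (Spring Boot's default port), and the class name KafkaEndpointClient and the text/plain content type are my own choices, not part of the original project.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class KafkaEndpointClient {

    // Build a POST to /kafka/{topicName}; the body is bound to the controller's @RequestBody String.
    static HttpRequest buildRequest(String baseUrl, String topicName, String message) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/kafka/" + topicName))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString(message))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: the application is running locally on port 8080.
        HttpRequest request = buildRequest("http://localhost:8080", "hello-webflux", "hello from client");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Typically means the application is not running at the assumed address.
            System.out.println("Could not reach the application: " + e);
        }
    }
}
```

If the request succeeds, the application log should show the "send out message:" line from the service, followed by "received message:" once the receiver picks the record up. The topic name in the URL is only logged by the controller; the service always publishes to hello-webflux.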
References:
https://github.com/reactor/reactor-kafka
https://www.reactiveprogramming.be/an-introduction-to-reactor-kafka/
https://codar.club/blogs/reactor-kafka-via-spring-boot-webflux.html