`
234390216
  • 浏览: 10232927 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:462622
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1775515
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1398353
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:395022
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:679982
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:530892
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1183946
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:467911
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:151388
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:68151
社区版块
存档分类
最新评论

Spring Cloud(12)——基于Apache Kafka的Stream实现

阅读更多

基于Apache Kafka的Stream实现

如果你的应用使用了Apache Kafka,你需要把它和Spring Cloud进行整合。需要在应用中添加如下依赖。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

然后就是Spring Cloud Stream的标准配置了。需要在@Configuration类上使用@EnableBinding声明需要应用的Binding。

@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
}

上面代码定义需要使用的Binding是Source和Sink接口中声明的input和output两个Binding。然后可以在application.properties文件中声明这两个Binding对应的destination,它们对应于kafka的Topic。如果指定的Topic还未创建,默认会自动进行创建。

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group

如果你的Kafka服务器不是本机或者监听端口不是默认的9092,则还需要通过spring.cloud.stream.kafka.binder.brokers指定Kafka的服务地址。

spring.cloud.stream.kafka.binder.brokers=localhost:9092

之后就是照常的使用Spring Cloud Stream的相关API进行操作了。如下是发送消息的示例。

@Component
@Slf4j
public class SourceProducer {

    @Autowired
    private Source source;

    public void sendMessages(String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        log.info("发送了一条消息-{}", msg);
        this.source.output().send(message);
    }

}

如下是监听消息的示例。

@Component
@Slf4j
public class SinkConsumer {

    @StreamListener(Sink.INPUT)
    public void inputConsumer(Message<String> message) {
        String payload = message.getPayload();
        MessageHeaders headers = message.getHeaders();
        log.info("从Binding-{}收到信息-{}, headers:{}", Sink.INPUT, payload, headers);
    }
    
}

由于笔者的上一篇文章——Spring Cloud Stream基于RocketMQ的实现已经介绍了Spring Cloud Stream的一些规范,这里就不再赘述了。

从Kafka服务,也就是从Spring Cloud Stream的Binder的角度来讲可以定义的参数可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties。比较核心的参数有:

  • spring.cloud.stream.kafka.binder.brokers:用来指定Kafka服务的地址,可以是host,也可以是host:port格式,如:spring.cloud.stream.kafka.binder.brokers=localhost,10.10.10.1:9092。默认是localhost。
  • spring.cloud.stream.kafka.binder.defaultBrokerPort:Kafka服务的完整地址应该是host+port,当brokers只定义了host时,将默认取该属性定义的port作为Kafka服务的port,默认是9092。
  • spring.cloud.stream.kafka.binder.autoCreateTopics:指定Topic不存在时是否需要自动创建,默认是true。

Spring Cloud Stream有多种不同的实现,比如RocketMQ/Kafka/RabbitMQ。不同的实现者在Producer和Consumer上也可能是有些差别的,或者是有些特性的。整合Spring Cloud Stream后这些特性的属性也是可以进行配置的。可以通过spring.cloud.stream.kafka.bindings.xxx.consumer.yyy指定名为xxx的这个Consumer角色的yyy属性。可以通过spring.cloud.stream.kafka.bindings.xxx.producer.yyy指定名为xxx的Producer的yyy属性。Kafka实现的Producer的特性属性配置可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties,Consumer的特性属性配置可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties。Consumer特性的参数主要有:

  • autoRebalanceEnabled:默认为true。当设置为true时,会对分区进行负载均衡,有Consumer加入或退出时会对Topic的分区重新分配。设置为false时,每个Consumer分配的Topic分区是固定的,不会变。
  • autoCommitOffset:默认为true。当设置为true时表示消息处理完后会自动提交offset;如果设置为false则会在消息的header中添加一个key为kafka_acknowledgment,value为org.springframework.kafka.support.Acknowledgment类型的对象的header,消费者可以在处理消息后从header中获取该对象进行手动响应消息的处理情况。
  • startOffset:指定新的消费者组加入的时候起始的消费位置,可选值有earliest和latest。默认是null,相当于earliest。

(注:本文是基于Spring cloud Finchley.SR1所写)

2
0
分享到:
评论

相关推荐

    spring-cloud-stream结合kafka dome

    标题 "spring-cloud-stream结合kafka dome" 描述的是一个关于如何使用Spring Cloud Stream与Apache Kafka进行集成并创建示例应用的教程。在这个场景中,我们首先启动两个消费者(标记为kafka和kafka1),然后启动名...

    基于Spring Cloud Stream和Apache Kafka的云端消息系统.zip

    这是一个基于Spring Cloud Stream和Apache Kafka的云端应用,作为一个消息系统,用于实现云端的实时消息传输和处理。项目利用Spring Cloud Stream进行微服务间的消息通信,并结合Apache Kafka实现高吞吐量、高可靠性...

    kafka-spring-cloud-stream:Apache Kafka的Spring Cloud Stream展示

    Apache Kafka是一种分布式流处理平台,常...通过上述步骤,我们可以构建出一个利用Spring Cloud Stream与Apache Kafka集成的高效、可靠的微服务应用。这种方式不仅简化了开发流程,也提高了系统的可扩展性和灵活性。

    spring cloud stream kafka 消息驱动集成

    Spring Cloud Stream 和 Apache Kafka 的集成是构建分布式系统中消息驱动架构的一个重要实践。Spring Cloud Stream 是 Spring 生态系统中的一个模块,它为构建输入/输出绑定到消息中间件的微服务应用提供了抽象层,...

    springCloud集成kafak

    本篇文章将深入探讨如何将Spring Cloud与Kafka集成,实现发布订阅模式和一对一消息的通信。 首先,我们需要了解Spring Cloud Stream,它是Spring框架提供的一种用于构建消息驱动的应用程序的抽象层。它允许开发者...

    SpringCloud.pdf

    6. **分布式消息传递**:Spring Cloud Stream 支持集成消息中间件(如 RabbitMQ、Kafka),实现服务间的异步通信和解耦。 云原生应用程序是Spring Cloud 支持的一种开发范式,鼓励采用持续交付和以价值为导向的开发...

    spring-cloud-stream-kafka:Spring Cloud Streams Kafka Avro

    Kafka Streams允许开发者编写状态ful的转换操作,进行窗口聚合、Join操作等,这些功能在Spring Cloud Stream的语境下同样可以实现。 总的来说,Spring Cloud Stream Kafka Avro的组合提供了强大的数据处理能力,...

    Spring Cloud 中文文档.pdf

    - **Apache Kafka Binder**:Apache Kafka 是一种流行的分布式流处理平台,Spring Cloud Stream 提供了 Kafka Binder 的实现。 - **Apache Kafka Binder 概述**:Kafka Binder 的实现基于 Apache Kafka。 - **配置...

    SpringCloud微服务架构笔记(四

    Spring Cloud Stream支持多种Binder实现,包括RabbitMQ、Apache Kafka、Amazon Kinesis、Google PubSub、Solace PubSub+、Azure Event Hubs等。 3. 发布/订阅模型 Spring Cloud Stream采用发布/订阅模式进行消息...

    Spring Cloud 中文文档 参考手册 中文版2018

    它支持多种消息中间件的实现,比如RabbitMQ和Apache Kafka。 Spring Cloud Stream应用模型中涉及到了生产者、消费者、分区支持以及绑定器(Binder)的使用。Binder SPI提供了与不同消息中间件的绑定器的连接能力。...

    spring cloud 中文文档

    - **Binder检测**:说明了Spring Cloud Stream如何自动检测可用的Binder实现。 - **Classpath上有多个Binders**:讨论了当Classpath中有多个Binder实现时,如何选择合适的Binder。 - **连接到多个系统**:解释了如何...

    基于Spring Cloud分布式物联网(IOT)平台源码.zip

    3. 数据处理服务:利用Spring Cloud Stream处理实时数据流,可能结合Apache Kafka或RabbitMQ等消息中间件。 4. 分析服务:通过Spark或Flink进行复杂事件处理和数据分析。 5. 用户接口服务:提供Web界面或移动应用...

    SpringCloud与Kafka消息中间件集成教程

    #### 四、SpringCloud集成Kafka的实现步骤 ##### 4.1 创建Kafka生产者模块 在Spring Cloud中集成Kafka作为消息中间件,首先需要创建一个Kafka生产者模块。这个模块负责发送消息到Kafka服务器。具体实现步骤如下: ...

    基于SpringCloud-微服务系统设计方案.rar

    本文将深入探讨基于SpringCloud的微服务系统设计方案,涵盖核心组件、架构设计原则以及实施策略。 1. **微服务架构基础** 微服务架构是一种将单一应用程序分解为一组小型服务的方法,每个服务运行在其自己的进程中...

    spring cloud微服务技术栈

    Spring Cloud Stream提供了构建消息驱动微服务的框架,支持各种消息中间件如RabbitMQ、Kafka等,方便构建松耦合、高可用的系统。 9. **Spring Cloud Sleuth服务跟踪** Sleuth提供了分布式追踪解决方案,集成...

    spring_cloud_kafka_jwt:PoC-通过Apache Kafka在微服务之间传播Spring Cloud JWT安全上下文

    标题中的“spring_cloud_kafka_jwt”是一个项目,旨在演示如何在基于Spring Cloud的微服务架构中,通过Apache Kafka来传播JWT(JSON Web Token)安全上下文。JWT是一种轻量级的身份验证和授权机制,常用于分布式系统...

    Java_示例微服务展示了如何在Spring Boot中使用Kafka和Kafka流,以SAGA模式的分布式事务实现为.zip

    在Spring Boot中使用Kafka Streams,可以通过Spring for Apache Kafka Streams模块,提供一套与Spring Data和Spring Cloud Stream相融合的API。 SAGA(Saga)模式是一种分布式事务处理策略,它将一个大事务分解为一...

    使用KafkaStreams和SpringBoot实现微服务Saga分布式事务-Piotr.pdf

    4. **Spring Cloud Stream for Kafka**:虽然文章中提到可以使用,但并未详细展开。这是一个用于构建消息驱动微服务的框架,提供更高级别的抽象,如DLQ支持(死信队列)、JSON序列化和交互式查询。 5. **架构设计**...

    spring cloud.zip

    Spring Cloud 是一个基于 Spring Boot 实现的云应用开发工具集,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)...

Global site tag (gtag.js) - Google Analytics