基于Apache Kafka的Stream实现
如果你的应用使用了Apache Kafka,你需要把它和Spring Cloud进行整合。需要在应用中添加如下依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
然后就是Spring Cloud Stream的标准配置了。需要在@Configuration
类上使用@EnableBinding
声明需要应用的Binding。
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
上面代码定义需要使用的Binding是Source和Sink接口中声明的input和output两个Binding。然后可以在application.properties文件中声明这两个Binding对应的destination,它们对应于kafka的Topic。如果指定的Topic还未创建,默认会自动进行创建。
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group
如果你的Kafka服务器不是本机或者监听端口不是默认的9092,则还需要通过spring.cloud.stream.kafka.binder.brokers
指定Kafka的服务地址。
spring.cloud.stream.kafka.binder.brokers=localhost:9092
之后就是照常的使用Spring Cloud Stream的相关API进行操作了。如下是发送消息的示例。
@Component
@Slf4j
public class SourceProducer {
@Autowired
private Source source;
public void sendMessages(String msg) {
Message<String> message = MessageBuilder.withPayload(msg).build();
log.info("发送了一条消息-{}", msg);
this.source.output().send(message);
}
}
如下是监听消息的示例。
@Component
@Slf4j
public class SinkConsumer {
@StreamListener(Sink.INPUT)
public void inputConsumer(Message<String> message) {
String payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
log.info("从Binding-{}收到信息-{}, headers:{}", Sink.INPUT, payload, headers);
}
}
由于笔者的上一篇文章——Spring Cloud Stream基于RocketMQ的实现已经介绍了Spring Cloud Stream的一些规范,这里就不再赘述了。
从Kafka服务,也就是从Spring Cloud Stream的Binder的角度来讲可以定义的参数可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties
。比较核心的参数有:
- spring.cloud.stream.kafka.binder.brokers:用来指定Kafka服务的地址,可以是host,也可以是host:port格式,如:
spring.cloud.stream.kafka.binder.brokers=localhost,10.10.10.1:9092
。默认是localhost。 - spring.cloud.stream.kafka.binder.defaultBrokerPort:Kafka服务的完整地址应该是host+port,当brokers只定义了host时,将默认取该属性定义的port作为Kafka服务的port,默认是9092。
- spring.cloud.stream.kafka.binder.autoCreateTopics:指定Topic不存在时是否需要自动创建,默认是true。
Spring Cloud Stream有多种不同的实现,比如RocketMQ/Kafka/RabbitMQ。不同的实现者在Producer和Consumer上也可能是有些差别的,或者是有些特性的。整合Spring Cloud Stream后这些特性的属性也是可以进行配置的。可以通过spring.cloud.stream.kafka.bindings.xxx.consumer.yyy
指定名为xxx的这个Consumer角色的yyy属性。可以通过spring.cloud.stream.kafka.bindings.xxx.producer.yyy
指定名为xxx的Producer的yyy属性。Kafka实现的Producer的特性属性配置可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties
,Consumer的特性属性配置可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties
。Consumer特性的参数主要有:
- autoRebalanceEnabled:默认为true。当设置为true时,会对分区进行负载均衡,有Consumer加入或退出时会对Topic的分区重新分配。设置为false时,每个Consumer分配的Topic分区是固定的,不会变。
- autoCommitOffset:默认为true。当设置为true时表示消息处理完后会自动提交offset;如果设置为false则会在消息的header中添加一个key为kafka_acknowledgment,value为
org.springframework.kafka.support.Acknowledgment
类型的对象的header,消费者可以在处理消息后从header中获取该对象进行手动响应消息的处理情况。 - startOffset:指定新的消费者组加入的时候起始的消费位置,可选值有earliest和latest。默认是null,相当于earliest。
(注:本文是基于Spring cloud Finchley.SR1所写)
相关推荐
标题 "spring-cloud-stream结合kafka dome" 描述的是一个关于如何使用Spring Cloud Stream与Apache Kafka进行集成并创建示例应用的教程。在这个场景中,我们首先启动两个消费者(标记为kafka和kafka1),然后启动名...
这是一个基于Spring Cloud Stream和Apache Kafka的云端应用,作为一个消息系统,用于实现云端的实时消息传输和处理。项目利用Spring Cloud Stream进行微服务间的消息通信,并结合Apache Kafka实现高吞吐量、高可靠性...
Apache Kafka是一种分布式流处理平台,常...通过上述步骤,我们可以构建出一个利用Spring Cloud Stream与Apache Kafka集成的高效、可靠的微服务应用。这种方式不仅简化了开发流程,也提高了系统的可扩展性和灵活性。
Spring Cloud Stream 和 Apache Kafka 的集成是构建分布式系统中消息驱动架构的一个重要实践。Spring Cloud Stream 是 Spring 生态系统中的一个模块,它为构建输入/输出绑定到消息中间件的微服务应用提供了抽象层,...
本篇文章将深入探讨如何将Spring Cloud与Kafka集成,实现发布订阅模式和一对一消息的通信。 首先,我们需要了解Spring Cloud Stream,它是Spring框架提供的一种用于构建消息驱动的应用程序的抽象层。它允许开发者...
6. **分布式消息传递**:Spring Cloud Stream 支持集成消息中间件(如 RabbitMQ、Kafka),实现服务间的异步通信和解耦。 云原生应用程序是Spring Cloud 支持的一种开发范式,鼓励采用持续交付和以价值为导向的开发...
Kafka Streams允许开发者编写状态ful的转换操作,进行窗口聚合、Join操作等,这些功能在Spring Cloud Stream的语境下同样可以实现。 总的来说,Spring Cloud Stream Kafka Avro的组合提供了强大的数据处理能力,...
- **Apache Kafka Binder**:Apache Kafka 是一种流行的分布式流处理平台,Spring Cloud Stream 提供了 Kafka Binder 的实现。 - **Apache Kafka Binder 概述**:Kafka Binder 的实现基于 Apache Kafka。 - **配置...
Spring Cloud Stream支持多种Binder实现,包括RabbitMQ、Apache Kafka、Amazon Kinesis、Google PubSub、Solace PubSub+、Azure Event Hubs等。 3. 发布/订阅模型 Spring Cloud Stream采用发布/订阅模式进行消息...
它支持多种消息中间件的实现,比如RabbitMQ和Apache Kafka。 Spring Cloud Stream应用模型中涉及到了生产者、消费者、分区支持以及绑定器(Binder)的使用。Binder SPI提供了与不同消息中间件的绑定器的连接能力。...
- **Binder检测**:说明了Spring Cloud Stream如何自动检测可用的Binder实现。 - **Classpath上有多个Binders**:讨论了当Classpath中有多个Binder实现时,如何选择合适的Binder。 - **连接到多个系统**:解释了如何...
3. 数据处理服务:利用Spring Cloud Stream处理实时数据流,可能结合Apache Kafka或RabbitMQ等消息中间件。 4. 分析服务:通过Spark或Flink进行复杂事件处理和数据分析。 5. 用户接口服务:提供Web界面或移动应用...
#### 四、SpringCloud集成Kafka的实现步骤 ##### 4.1 创建Kafka生产者模块 在Spring Cloud中集成Kafka作为消息中间件,首先需要创建一个Kafka生产者模块。这个模块负责发送消息到Kafka服务器。具体实现步骤如下: ...
本文将深入探讨基于SpringCloud的微服务系统设计方案,涵盖核心组件、架构设计原则以及实施策略。 1. **微服务架构基础** 微服务架构是一种将单一应用程序分解为一组小型服务的方法,每个服务运行在其自己的进程中...
Spring Cloud Stream提供了构建消息驱动微服务的框架,支持各种消息中间件如RabbitMQ、Kafka等,方便构建松耦合、高可用的系统。 9. **Spring Cloud Sleuth服务跟踪** Sleuth提供了分布式追踪解决方案,集成...
标题中的“spring_cloud_kafka_jwt”是一个项目,旨在演示如何在基于Spring Cloud的微服务架构中,通过Apache Kafka来传播JWT(JSON Web Token)安全上下文。JWT是一种轻量级的身份验证和授权机制,常用于分布式系统...
在Spring Boot中使用Kafka Streams,可以通过Spring for Apache Kafka Streams模块,提供一套与Spring Data和Spring Cloud Stream相融合的API。 SAGA(Saga)模式是一种分布式事务处理策略,它将一个大事务分解为一...
4. **Spring Cloud Stream for Kafka**:虽然文章中提到可以使用,但并未详细展开。这是一个用于构建消息驱动微服务的框架,提供更高级别的抽象,如DLQ支持(死信队列)、JSON序列化和交互式查询。 5. **架构设计**...
Spring Cloud 是一个基于 Spring Boot 实现的云应用开发工具集,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)...