Spring Cloud Bus除了支持RabbitMQ的自动化配置之外,还支持现在被广泛应用的Kafka。在本文中,我们将搭建一个Kafka的本地环境,并通过它来尝试使用Spring Cloud Bus对Kafka的支持,实现消息总线的功能。由于本文会以之前Rabbit的实现作为基础来修改,所以先阅读《Spring
Cloud构建微服务架构(七)消息总线》有助于理解本文。
Kafka简介
Kafka是一个由LinkedIn开发的分布式消息系统,它于2011年初开源,现在由著名的Apache基金会维护与开发。Kafka使用Scala实现,被用作LinkedIn的活动流和运营数据处理的管道,现在也被诸多互联网企业广泛地用作为数据流管道和消息系统。
Kafka是基于消息发布/订阅模式实现的消息系统,其主要设计目标如下:
-
消息持久化:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
-
高吞吐:在廉价的商用机器上也能支持单机每秒100K条以上的吞吐量
-
分布式:支持消息分区以及分布式消费,并保证分区内的消息顺序
-
跨平台:支持不同技术平台的客户端(如:Java、PHP、Python等)
-
实时性:支持实时数据处理和离线数据处理
-
伸缩性:支持水平扩展
Kafka中涉及的一些基本概念:
-
Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
-
Topic:逻辑上同Rabbit的Queue队列相似,每条发布到Kafka集群的消息都必须有一个Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
-
Partition:Partition是物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分成一个或多个Partition,每个Partition对应一个文件夹(存储对应分区的消息内容和索引文件)。
-
Producer:消息生产者,负责生产消息并发送到Kafka Broker。
-
Consumer:消息消费者,向Kafka Broker读取消息并处理的客户端。
-
Consumer Group:每个Consumer属于一个特定的组(可为每个Consumer指定属于一个组,若不指定则属于默认组),组可以用来实现一条消息被组内多个成员消费等功能。
快速入门
在对Kafka有了一些基本了解之后,下面我们来尝试构建一个Kafka服务端,并体验一下基于Kafka的消息生产与消费。
环境安装
首先,我们需要从官网上下载安装介质。下载地址为:http://kafka.apache.org/downloads.html
。本例中采用的版本为:Kafka-0.10.0.1
在解压Kafka的安装包之后,可以看到其目录结构如下:
1 2 3 4 5 6 7
|
kafka +-bin +-windows +-config +-libs +-logs +-site-docs
|
由于Kafka的设计中依赖了ZooKeeper,所以我们可以在bin
和config
目录中除了看到Kafka相关的内容之外,还有ZooKeeper相关的内容。其中bin
目录存放了Kafka和ZooKeeper的命令行工具,bin
根目录下是适用于Linux/Unix的shell,而bin/windows
下的则是适用于windows下的bat。我们可以根据实际的系统来设置环境变量,以方便后续的使用和操作。而在config
目录中,则是用来存放了关于Kafka与ZooKeeper的配置信息。
启动测试
下面我们来尝试启动ZooKeeper和Kafka来进行消息的生产和消费。示例中所有的命令均已配置了Kafka的环境变量为例。
-
启动ZooKeeper,执行命令:
zookeeper-server-start
config/zookeeper.properties
,该命令需要指定zookeeper的配置文件位置才能正确启动,kafka的压缩包中包含了其默认配置,开发与测试环境不需要修改。
1 2 3 4 5 6 7 8 9
|
[2016-09-28 08:05:34,849] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2016-09-28 08:05:34,850] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager) [2016-09-28 08:05:34,851] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager) [2016-09-28 08:05:34,851] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager) [2016-09-28 08:05:34,852] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain) [2016-09-28 08:05:34,868] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2016-09-28 08:05:34,869] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain) ... [2016-09-28 08:05:34,940] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
|
从控制台信息中,我们可以看到ZooKeeper从指定的config/zookeeper.properties
配置文件中读取信息并绑定2181端口启动服务。有时候启动失败,可查看一下端口是否被占用,可以杀掉占用进程或通过修改config/zookeeper.properties
配置文件中的clientPort
内容以绑定其他端口号来启动ZooKeeper。
-
启动Kafka,执行命令:kafka-server-start
config/server.properties
,该命令也需要指定Kafka配置文件的正确位置,如上命令中指向了解压目录包含的默认配置。若在测试时,使用外部集中环境的ZooKeeper的话,我们可以在该配置文件中通过zookeeper.connect
参数来设置ZooKeeper的地址和端口,它默认会连接本地2181端口的ZooKeeper;如果需要设置多个ZooKeeper节点,可以为这个参数配置多个ZooKeeper地址,并用逗号分割。比如:zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
。
-
创建Topic,执行命令:kafka-topics
--create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
,通过该命令,创建一个名为“test”的Topic,该Topic包含一个分区一个Replica。在创建完成后,可以使用kafka-topics
--list --zookeeper localhost:2181
命令来查看当前的Topic。
另外,如果我们不使用kafka-topics
命令来手工创建,直接进行下面的内容进行消息创建时也会自动创建Topics来使用。
-
创建消息生产者,执行命令:kafka-console-producer
--broker-list localhost:9092 --topic test
。kafka-console-producer
命令可以启动Kafka基于命令行的消息生产客户端,启动后可以直接在控制台中输入消息来发送,控制台中的每一行数据都会被视为一条消息来发送。我们可以尝试输入几行消息,由于此时并没有消费者,所以这些输入的消息都会被阻塞在名为test的Topics中,直到有消费者将其消费掉位置。
-
创建消息消费者,执行命令:kafka-console-consumer
--zookeeper localhost:2181 --topic test --from-beginning
。kafka-console-consumer
命令启动的是Kafka基于命令行的消息消费客户端,在启动之后,我们马上可以在控制台中看到输出了之前我们在消息生产客户端中发送的消息。我们可以再次打开之前的消息生产客户端来发送消息,并观察消费者这边对消息的输出来体验Kafka对消息的基础处理。
整合Spring
Cloud Bus
在上一篇使用Rabbit实现消息总线的案例中,我们已经通过引入spring-cloud-starter-bus-amqp
模块,完成了使用RabbitMQ来实现的消息总线。若我们要使用Kafka来实现消息总线时,只需要把spring-cloud-starter-bus-amqp
替换成spring-cloud-starter-bus-kafka
模块,在pom.xml
的dependenies节点中进行修改,具体如下:
1 2 3 4
|
<dependency> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
|
如果我们在启动Kafka时均采用了默认配置,那么我们不需要再做任何其他配置就能在本地实现从RabbitMQ到Kafka的切换。我们可以尝试把刚刚搭建的ZooKeeper、Kafka启动起来,并将修改为spring-cloud-starter-bus-kafka
模块的config-server和config-client启动起来。
在config-server启动时,我们可以在控制台中看到如下输出:
1 2 3 4 5 6 7 8 9 10 11 12 13
|
2016-09-28 22:11:29.627 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: springCloudBus 2016-09-28 22:11:29.642 INFO 15144 --- [-localhost:2181] org.I0Itec.zkclient.ZkEventThread : Starting ZkClient event thread. ... 016-09-28 22:11:30.290 INFO 15144 --- [ main] o.s.i.kafka.support.ProducerFactoryBean : Using producer properties => {bootstrap.servers=localhost:9092, linger.ms=0, acks=1, compression.type=none, batch.size=16384} 2016-09-28 22:11:30.298 INFO 15144 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: ... 2016-09-28 22:11:30.322 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel 2016-09-28 22:11:30.322 INFO 15144 --- [ main] o.s.integration.channel.DirectChannel : Channel 'config-server:7001.springCloudBusOutput' has 1 subscriber(s). 2016-09-28 22:11:30.322 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus ... 2016-09-28 22:11:31.465 INFO 15144 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@4178cb34 2016-09-28 22:11:31.467 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : Adding {message-handler:inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b} as a subscriber to the 'bridge.springCloudBus' channel 2016-09-28 22:11:31.467 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : started inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b
|
从控制台的输出内容,我们可以看到config-server连接到了Kafka中,并使用了名为springCloudBus
的Topic。
此时,我们可以使用kafka-topics
--list --zookeeper localhost:2181
命令来查看当前Kafka中的Topic,若已成功启动了config-server并配置正确,我们就可以在Kafka中看到已经多了一个名为springCloudBus
的Topic。
我们再启动配置了spring-cloud-starter-bus-kafka
模块的config-client,可以看到控制台中输出如下内容:
1 2 3 4 5 6 7 8 9 10 11 12 13
|
2016-09-28 22:43:55.067 INFO 6136 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: springCloudBus 2016-09-28 22:43:55.078 INFO 6136 --- [-localhost:2181] org.I0Itec.zkclient.ZkEventThread : Starting ZkClient event thread. ... 2016-09-28 22:50:38.584 INFO 828 --- [ main] o.s.i.kafka.support.ProducerFactoryBean : Using producer properties => {bootstrap.servers=localhost:9092, linger.ms=0, acks=1, compression.type=none, batch.size=16384} 2016-09-28 22:50:38.592 INFO 828 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: ... 2016-09-28 22:50:38.615 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel 2016-09-28 22:50:38.616 INFO 828 --- [ main] o.s.integration.channel.DirectChannel : Channel 'didispace:7002.springCloudBusOutput' has 1 subscriber(s). 2016-09-28 22:50:38.616 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus ... 2016-09-28 22:50:39.162 INFO 828 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@60cf855e 2016-09-28 22:50:39.162 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : Adding {message-handler:inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216} as a subscriber to the 'bridge.springCloudBus' channel 2016-09-28 22:50:39.163 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : started inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216
|
可以看到,config-client启动时输出了类似的内容,他们都订阅了名为springCloudBus
的Topic。
在启动了config-server和config-client之后,为了更明显地观察消息总线刷新配置的效果,我们可以在本地启动多个不同端口的config-client。此时,我们的config-server以及多个config-client都已经连接到了由Kafka实现的消息总线上。我们可以先访问各个config-client上的/from
请求,查看他获取到的配置内容。然后,修改Git中对应的参数内容,再访问各个config-client上的/from
请求,可以看到配置内容并没有改变。最后,我们向config-server发送POST请求:/bus/refresh
,此时我们再去访问各个config-client上的/from
请求,就能获得到最新的配置信息,各客户端上的配置都已经加载为最新的Git配置内容。
从config-client的控制台中,我们可以看到如下内容:
1
|
2016-09-29 08:20:34.361 INFO 21256 --- [ kafka-binder-1] o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed [from]
|
RefreshListener
监听类记录了收到远程刷新请求,并刷新了from
属性的日志。
Kafka配置
在上面的例子中,由于Kafka、ZooKeeper均运行于本地,所以我们没有在测试程序中通过配置信息来指定Kafka和ZooKeeper的配置信息,就完成了本地消息总线的试验。但是我们实际应用中,Kafka和ZooKeeper一般都会独立部署,所以在应用中都需要来为Kafka和ZooKeeper配置一些连接信息等。Kafka的整合与RabbitMQ不同,在Spring Boot 1.3.7中并没有直接提供的Starter模块,而是采用了Spring Cloud Stream的Kafka模块,所以对于Kafka的配置均采用了spring.cloud.stream.kafka
的前缀,比如:
属性名
说明
默认值
spring.cloud.stream.kafka.binder.brokers |
Kafka的服务端列表 |
localhost |
spring.cloud.stream.kafka.binder.defaultBrokerPort |
Kafka服务端的默认端口,当brokers 属性中没有配置端口信息时,就会使用这个默认端口 |
9092 |
spring.cloud.stream.kafka.binder.zkNodes |
Kafka服务端连接的ZooKeeper节点列表 |
localhost |
spring.cloud.stream.kafka.binder.defaultZkPort |
ZooKeeper节点的默认端口,当zkNodes 属性中没有配置端口信息时,就会使用这个默认端口 |
2181 |
更多配置参数请参考官方文档
本文完整示例:
分享到:
相关推荐
本资料旨在提供最全面的Spring Cloud构建微服务架构的知识,帮助开发者深入理解和实践这一强大的框架。 1. **Spring Boot基础** 在开始Spring Cloud之前,了解Spring Boot的基本概念和使用是必要的。Spring Boot...
在微服务架构中,消息总线是一种用于跨多个分布式服务实例分发消息的机制。Spring Cloud Bus正是这样一种工具,它利用轻量级的消息代理来连接分布式系统中的节点,实现了配置更改的广播功能以及服务之间的通信。本文...
通过以上介绍,我们可以看出Spring Cloud为构建微服务架构提供了丰富的工具和组件,它简化了开发过程,同时也带来了复杂性的挑战。开发者需要理解每个组件的工作原理,并根据实际需求进行选型和配置,以实现高效、...
Spring Cloud 是一个基于Spring Boot的框架,旨在提供一系列开箱即用的工具和服务,帮助开发者轻松构建和部署微服务架构的应用程序。它集合了许多常用的服务组件和技术栈,使开发者能够在构建分布式系统时减少对底层...
1. **Spring Cloud简介**:Spring Cloud是基于Spring Boot的云应用开发工具集,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式...
在微服务架构中,Spring Cloud Bus 是一个非常重要的组件,它起到了消息总线的作用,能够有效地连接各个微服务节点,并且结合轻量级的消息代理,如 RabbitMQ 或 Kafka,实现跨服务的通信和同步更新。下面我们将深入...
11. **Spring Boot**:Spring Boot是构建微服务的基础,它简化了Spring应用的初始搭建以及开发过程,提供了一种快速、生产就绪的微服务开发方式。 12. Maven项目结构:这个压缩包中的项目都是基于Maven构建的,...
Spring Cloud 是一个基于 Spring Boot 实现的云应用开发工具集,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)...
SpringCloud核心组件Bus消息总线 SpringCloud核心组件Sleuth链路追踪 SpringCloud核心组件Zipkin分布式追踪系统 SpringCloud核心组件Stream消息流处理 SpringCloud核心组件Kafka消息中间件集成 SpringCloud核心...
本章将深入探讨Spring Cloud的各个关键组件,以及如何利用它们来构建微服务架构。 1. **Eureka**:Eureka是Spring Cloud中的服务注册与发现组件,它允许服务提供者向注册中心注册自己的服务,同时服务消费者可以从...
- **定义与作用**:Spring Cloud是一套用于构建微服务架构的工具集,基于Spring Boot实现,旨在提供一种简单的方式来进行配置管理、服务发现、断路器模式、智能路由、微代理、控制总线、一次性令牌协调器服务等操作...
SpringCloud核心组件Bus消息总线 SpringCloud核心组件Sleuth链路追踪 SpringCloud核心组件Zipkin分布式追踪系统 SpringCloud核心组件Stream消息流处理 SpringCloud核心组件Kafka消息中间件集成 SpringCloud核心组件...
SpringCloud作为微服务解决方案的集大成者,为基于SpringBoot的应用提供了构建分布式系统所需的工具,如服务发现、配置管理、断路器、智能路由、微代理、控制总线等。它使得开发者能够轻松地在SpringBoot应用中实现...
9. **SpringCloud Stream**:提供了一种声明式的消息处理模型,支持多种消息中间件,如RabbitMQ、Kafka,便于构建消息驱动的微服务。 10. **SpringCloud CLI**:命令行工具,用于快速初始化和管理Spring Cloud项目...
Spring Cloud作为构建微服务架构的工具集,提供了一套完整的解决方案,让开发者能够快速构建分布式系统。这些微服务公共组件的集成,使得微服务架构更为健壮、灵活和易管理。在选择微服务组件时,需要根据具体的业务...
【SpringCloud项目Demo】是一个基于SpringCloud框架的示例项目,旨在帮助开发者理解和学习如何在实际应用中使用SpringCloud构建微服务架构。SpringCloud是一系列工具的集合,它为开发者提供了在分布式系统(如配置...
Spring Cloud 是一个基于Java的微服务开发框架,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)操作的一系列...
在构建分布式系统架构时,Spring Cloud是一个非常关键的框架,它提供了一系列的工具和服务来帮助开发者轻松地创建和管理微服务。"整合spring cloud微服务,搭建一个分布式系统架构"这个主题涵盖了多个方面,让我们...
Spring Cloud Bus 是一个重要的组件,它在微服务架构中起到了消息总线的作用,允许服务间的通信和状态更新的广播。消息总线的概念是基于轻量级的消息代理,如 RabbitMQ 或 Kafka,它允许将消息从一个服务传递到所有...
这个“Spring Cloud SpringCloud全套高清视频教程”旨在帮助开发者深入理解和掌握Spring Cloud的相关技术栈,通过高清视频的方式进行学习,确保理论与实践相结合。 在微服务架构中,Spring Cloud 提供了多种组件来...