Spring Cloud Function
Spring Cloud Function专注于提供一些与业务无关的函数功能。它允许用户把java.util.function.Function
、java.util.function.Consumer
和java.util.function.Supplier
类型的bean直接对外发布。
通过Http对外发布
Function、Consumer、Supplier可以直接以Http的方式对外发布,这需要我们添加spring-cloud-starter-function-web
依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-function-web</artifactId>
</dependency>
默认会发布到根路径,资源名称就是对应的bean名称。比如下面的代码中定义了一个名称为uppercase的Function类型的bean,它默认会发布到/uppercase
,对应的请求方式是POST。当我们以POST方式请求/uppercase
时如果请求体中传递的数据是abc,则服务端会响应ABC,因为对应处理的uppercase Function会把接入的参数转为大写后再返回。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
如果我们更习惯基于Reactive编程,则可以定义入参和出参为reactor.core.publisher.Flux
。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux.map(value -> value.toUpperCase());
}
Consumer和Supplier也是类似的。Function可以接收一个参数进行处理后再返回一个结果,Consumer是只接收参数进行处理,Supplier是不接收参数但是会返回一个结果。在使用的时候可以根据它们的特性选择使用Function、Consumer还是Supplier。下面的定义了consumer、consumerFlux和supplier三个bean,当POST请求/consumer
时传递的参数会被简单的输出到控制台,POST请求/consumerFlux
时其内部的逻辑是基于Flux编程的,最终也是简单输出;GET请求/supplier
时会返回当前时间的字符串表示。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Consumer<String> consumer() {
return message -> System.out.println("收到消息:" + message);
}
@Bean
public Consumer<Flux<String>> consumerFlux() {
return stringFlux -> {stringFlux.subscribe(str -> System.out.println("receive message : " + str));};
}
@Bean
public Supplier<String> supplier() {
return () -> LocalDateTime.now().toString();
}
}
Function、Consumer和Supplier的输入和输出参数除了可以是字符串以外,还可以是一个对象。当是一个对象时它会被转换为JSON格式,或者从JSON格式转换为对象(入参从JSON转换为对象,出参从对象转换为JSON)。比如下面代码中,当POST请求/functionUser
,传递JSON数据{"id":1,"name":"zhangsan"}
时会被转换为ID为1,name为zhangsan的User对象,经functionUser
对应的Function处理后会把name设置为output:zhangsan
,然后再转换为JSON对象返回给客户端,所以客户端会收到{"id":1,"name":"output:zhangsan"}
。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Function<User, User> functionUser() {
return input -> {
input.setName("output:" + input.getName());
return input;
};
}
@Data
public static class User {
private Long id;
private String name;
}
}
Supplier和Consumer也是类似的。
@Bean
public Consumer<Flux<User>> consumerUser() {
return flux -> flux.subscribe(user -> System.out.println("收到User消息:" + user));
}
@Bean
public Supplier<User> supplierUser() {
return () -> {
User user = new User();
user.setId(System.currentTimeMillis());
user.setName("User-" + System.nanoTime());
return user;
};
}
输入、输出参数还可以是org.springframework.messaging.Message
类型。当入参是Message类型时,Http请求的Header会存放到Message的Header中,比如下面这样可以通过Message.getHeaders()
拿到Http请求的所有Header信息。
@Bean
public Function<Message<String>, Message<String>> functionMessage() {
return message -> {
String payload = "response:" + message.getPayload();
System.out.println("请求的Headers是:" + message.getHeaders());
Map<String, Object> headers = new HashMap<>();
headers.put("responseTime", LocalDateTime.now());
MessageHeaders messageHeaders = new MessageHeaders(headers);
return MessageBuilder.createMessage(payload, messageHeaders);
};
}
当响应的内容也是Message类型时,响应的Message的Header不会作为响应Http的Header,而是会把Message作为一个整体转换为JSON进行返回。比如下面这样:
{
"payload": "response:abc",
"headers": {
"responseTime": "2019-03-07T19:41:18.9",
"id": "4f244b72-b4fe-d679-6c7e-3a23dbfc9b53",
"timestamp": 1551958878900
}
}
Message也可以和复杂对象一起使用,比如下面的代码就定义了Message的payload是User对象,那请求体中的JSON对象会转换为User对象。
@Bean
public Consumer<Flux<Message<User>>> consumerMessage() {
return flux -> flux.subscribe(message -> System.out.println("收到User消息:" + message.getPayload() + ",消息头是:" + message.getHeaders()));
}
通过Stream交互
Spring Cloud Function可以和Spring Cloud Stream一起使用。当只有一个Consumer类型的bean定义时,Consumer的入参会和名为input的Binding绑定,即通过从名为input的Binding接收到的消息会调用Consumer bean进行处理。Spring Cloud Function和Spring Cloud Stream一起使用时需要添加spring-cloud-function-stream
依赖和一个Stream实现,笔者选择的是基于RocketMQ的实现,所以添加了spring-cloud-starter-stream-rocketmq
依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-stream</artifactId>
</dependency>
现假设我们的Spring Cloud Function应用中定义了如下这样一个Consumer类型的bean,我们期望它接收的对象来自于Spring Cloud Stream,更直观的说是来自于名为input的Binding。
@Bean
public Consumer<String> consumer() {
return message -> System.out.println("收到消息:" + message);
}
当我们的Stream的Binder实现采用的RocketMQ,我们可以进行如下定义,它指定了RocketMQ的NameServer的地址,指定input Binding对应的是名为test-topic
的Topic。
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group
所以当我们往RocketMQ的test-topic
中发送消息时对应的消息就会转而交给定义的Consumer bean进行处理。
DefaultMQProducer producer = new DefaultMQProducer("test1");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
for (int i=0; i<10; i++) {
org.apache.rocketmq.common.message.Message message = new Message();
message.setTopic("test-topic");
message.setTags("tag1");
message.setBody(("Hello"+i).getBytes());
producer.send(message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
当我们用上面的代码往test-topic
Topic中发送了10条消息后,我们在控制台可能会看到Consumer输出的如下内容。
收到消息:Hello4
收到消息:Hello3
收到消息:Hello1
收到消息:Hello9
收到消息:Hello2
收到消息:Hello0
收到消息:Hello8
收到消息:Hello6
收到消息:Hello7
收到消息:Hello5
同样,它也可以把参数类型定义为org.springframework.messaging.Message
,payload也可以是复杂对象。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("test1");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
for (int i=0; i<10; i++) {
org.apache.rocketmq.common.message.Message message = new Message();
message.setTopic("test-topic");
message.setTags("tag1");
User user = new User();
user.setId(Long.valueOf(i));
user.setName("User" + i);
message.setBody(JSON.toJSONString(user).getBytes());
producer.send(message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
SpringApplication.run(Application.class, args);
}
@Bean
public Consumer<org.springframework.messaging.Message<User>> consumer() {
return message -> {
System.out.println("收到消息:" + message);
User user = message.getPayload();
System.out.println(user.getName());
};
}
@Data
public static class User {
private Long id;
private String name;
}
}
也可以定义Supplier类型的bean,Supplier类型的bean默认会绑定名为output的Binding。下面的代码就定义了一个Supplier类型的bean,返回的是Flux对象,其每秒会产生一条String类型的消息。
@Bean
public Supplier<Flux<String>> supplier() {
return () -> Flux.interval(Duration.ofSeconds(1)).map(i -> "新消息" + LocalDateTime.now());
}
接着我们可以定义名为output的Binding的destination等信息。比如下面我们定义了其destination为test-topic。
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.group=group1
之前我们定义了Consumer对应的名为input的Binding的destination也为test-topic。所以当应用启动后,Supplier会每秒往test-topic发送一条消息,对应的消息会由Consumer接收到。此时会看到Consumer在控制台打印的如下信息。
收到消息:新消息2019-03-08T21:56:20.577
收到消息:新消息2019-03-08T21:56:21.577
收到消息:新消息2019-03-08T21:56:22.577
收到消息:新消息2019-03-08T21:56:23.576
收到消息:新消息2019-03-08T21:56:24.577
收到消息:新消息2019-03-08T21:56:25.576
完整代码如下:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Consumer<String> consumer() {
return message -> {
System.out.println("收到消息:" + message);
};
}
@Bean
public Supplier<Flux<String>> supplier() {
return () -> Flux.interval(Duration.ofSeconds(1)).map(i -> "新消息" + LocalDateTime.now());
}
}
当定义Function类型的bean时,因为它同时有入参和出参,默认情况下它的入参会绑定名为input的Binding,出参绑定名为output的Binding。当我们定义下面这样一个Function类型的bean时,它先是把接收到的消息打印到控制台,然后加上response:
前缀返回并发送到名为output的Binding。
@Bean
public Function<String, String> function() {
return input -> {
System.out.println("Function接收到消息:" + input);
return "response:" + input;
};
}
当input和output的Binding定义如下时即表示从test-topic接收到消息,加上response:
前缀后又发送到了test-topic1这个Topic上。
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group
spring.cloud.stream.bindings.output.destination=test-topic1
spring.cloud.stream.bindings.output.group=group1
基于上述配置进行测试的完整代码如下。先是通过sendMessages()
往test-topic中发送了5条消息,它们会被Function bean处理后再发送到test-topic1中。而receiveMessages()
中定义了对test-topic1中消息的监听,它会收到test-topic1中所有的消息。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
sendMessages();
receiveMessages();
SpringApplication.run(Application.class, args);
}
private static void sendMessages() {
DefaultMQProducer producer = new DefaultMQProducer("test1");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
for (int i=0; i<5; i++) {
org.apache.rocketmq.common.message.Message message = new Message();
message.setTopic("test-topic");
message.setTags("tag1");
message.setBody(("消息-" + i).getBytes());
producer.send(message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
private static void receiveMessages() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost:9876");
try {
consumer.subscribe("test-topic1", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach(msg -> {
System.out.println("消费者收到消息:" + new String(msg.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
@Bean
public Function<String, String> function() {
return input -> {
System.out.println("Function接收到消息:" + input);
return "response:" + input;
};
}
}
上述代码运行后你会在控制台看到类似如下输出。
Function接收到消息:消息-4
消费者收到消息:response:消息-4
Function接收到消息:消息-2
Function接收到消息:消息-3
Function接收到消息:消息-0
Function接收到消息:消息-1
消费者收到消息:response:消息-2
消费者收到消息:response:消息-0
消费者收到消息:response:消息-3
消费者收到消息:response:消息-1
Supplier默认会绑定名为output的Binding,Consumer默认会绑定名为input的Binding,Function默认会同时绑定input和output这两个Binding。当同时存在多个Consumer或Function类型的bean时,因为它们都与名为input的Binding绑定,当名为input的Binding中有新消息了,Spring Cloud Function默认会抛出异常,因为它不知道该给哪一个Consumer或Function处理。此时可以指定spring.cloud.function.stream.shared=true
,这样多个function类型的bean将共享消息。需要注意的是从RocketMQ的角度来讲,它们都属于一个消费者组,它们在共享消息时每条消息只会被一个消费者消费,比如有消息ABCD,消费者1可能消费了AB,消费者2消费了CD。但是对于Spring Cloud Function Stream来讲,它们对应的都是名为input的Binding,一个Binding只对应一个消费者,它能消费所有的消息,Spring Cloud Function内部在共享消息的时候所有与名为input的Binding绑定的Function或Consumer都将收到所有的消息。
@Bean
public Consumer<String> consumer() {
return message -> {
System.out.println("Consumer收到消息:" + message);
};
}
@Bean
public Function<String, String> function() {
return input -> {
System.out.println("Function接收到消息:" + input);
return "response:" + input;
};
}
当我们定义了上述这样的Function和Consumer类型的bean,又定义了spring.cloud.function.stream.shared=true
,当往名为input的Binding指定的destination发送消息时,我们的控制台会输出类似如下这样的内容。
Function接收到消息:消息-3
Consumer收到消息:消息-3
Function接收到消息:消息-2
Consumer收到消息:消息-2
Function接收到消息:消息-0
Consumer收到消息:消息-0
Function接收到消息:消息-1
Consumer收到消息:消息-1
Function接收到消息:消息-4
Consumer收到消息:消息-4
那如果想精确的把一条消息发送到某个Consumer或Function怎么办呢?Spring Cloud Function Stream允许我们通过头信息stream_routekey
来指定需要发送的Consumer或Function对应的bean名称。比如想要名为abc的Consumer处理,就指定stream_routekey
的值为abc。
Map<String, Object> headers = new HashMap<>();
headers.put("stream_routekey", "consumer");
MessageHeaders messageHeaders = new MessageHeaders(headers);
for (int i=0; i<5; i++) {
Message<String> message = MessageBuilder.createMessage("消息" + i, messageHeaders);
this.sender.send(message);
}
因为笔者使用的基于RocketMQ的实现没有转发最原始的Header到新消息的Header中(会放到RocketMQ的消息的properties中),所以笔者模拟不了指定
stream_routekey
的效果。
参考文档
(注:本文是基于Spring Cloud Finchley.SR1所写)
相关推荐
SpringCloud Function SpEL 注入漏洞分析(CVE-2022-22963) SpringCloud Function 是 Spring 提供的一套分布式函数式编程组件,旨在帮助开发者专注于业务逻辑实现,而不需要关心服务器环境运维等问题。然而,在 ...
Spring Cloud Function 是基于Spring Boot 的函数计算框架,它抽象出所有传输细节和基础架构,允许开发人员保留所有熟悉的工具和流程,并专注于业务逻辑。 批量检测脚本:python Spel_RCE_POC.py url.txt 反弹...
Spring Cloud 是一个基于 Spring Boot 实现的云应用开发工具集,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)...
用于Spring Cloud Function SPEL表达式注入漏洞测试环境搭建,是编译好的服务端程序,命令号java -jar *.jar运行即可,服务端运行在127.0.0.1:8080端口
《SpringCloud详细示例》是深入理解并实践Spring Cloud组件的宝贵资源,它涵盖了这个分布式系统框架中的多个关键组件和功能。Spring Cloud作为微服务架构的首选工具集,旨在简化服务发现、配置、熔断、负载均衡等...
该模板存储库提供了一个基于Spring Boot和Spring Cloud Function的简单Hello Web应用程序。 它可以部署为独立的Web应用程序,Kubernetes部署和服务或Knative服务。 代码 注意:为Java 11配置了项目,如果使用的是...
在IT行业中,Spring Cloud Gateway作为Spring Cloud生态体系中的一个关键组件,被广泛用于构建微服务架构中的API网关。这个框架允许我们集中处理各种请求,包括路由、过滤、安全等,极大地简化了服务间的通信。而...
Spring Cloud Function Framework + AWS Lambda + Serverless Framework 此示例使用无服务器框架在 AWS Lambda 平台上部署 Spring Cloud 函数。 Spring Cloud 函数快速介绍 Spring Cloud Function 提供了一个统一的...
### Spring Cloud Gateway全局异常处理详解 #### 一、引言 在微服务架构中,网关作为服务入口,承担着路由转发、限流熔断、鉴权认证等职责。Spring Cloud Gateway作为一款基于Spring Framework 5、Project Reactor...
1、资源项目源码均已通过严格测试验证,保证能够正常运行; 2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通; 3、本项目比较适合计算机领域相关的毕业设计课题、课程作业等使用,...
Cloud Function 是一个具有以下高级目标的项目: 通过功能促进业务逻辑的实现。 将业务逻辑的开发生命周期与任何特定的运行时目标分离,以便相同的代码可以作为 Web 端点、流处理器或任务运行。 支持跨无服务器提供...
SpringCloud Gateway 是一款基于 Spring Framework 5 和 Spring Boot 2 构建的微服务网关,它是 Spring Cloud 生态系统中的重要组件,主要用于处理服务之间的路由、过滤和安全控制等。在 Spring Cloud Gateway 2.1 ...
标题 "lambda-spring-cloud-function-graalvm" 指向的是一个使用 AWS Lambda 平台,结合 Spring Cloud Function 和 GraalVM 技术构建的项目。这个项目的主要目的是创建一个自定义的运行时环境,以便在 AWS Lambda 上...
5. Spring Cloud Function:提供函数式编程模型,简化微服务处理逻辑。 四、实施微服务的步骤 1. 识别服务边界:根据业务领域划分服务。 2. 设计API接口:定义服务间通信的契约。 3. 配置服务发现:使用Eureka或...
Spring Cloud Stream 3.0引入了Spring Cloud Function,这是一个面向函数编程的框架,受到Serverless和FaaS(函数即服务)的影响。Spring Cloud Function提供了一种简洁的方式来定义和执行函数,同时支持命令式和...
《Kotlin与Spring Cloud Function整合应用详解》 在现代微服务架构中,Spring框架一直扮演着举足轻重的角色,而随着技术的发展,Kotlin语言以其简洁、安全和高效的特性,逐渐成为Java开发者的新宠。当Kotlin遇上...
page_type 语言产品描述urlFragment 样本Java天蓝色这是一个示例应用程序,用于在Azure Functions之上展示Spring Cloud Function的用法。 你好春天功能天蓝色在Azure Functions上运行的示例“ Hello,world” Spring...
- **Spring Cloud Function**:函数式编程模型。 #### 总结 微服务架构和Spring Cloud为现代软件开发提供了强大的支持。微服务通过分解大型单体应用为独立的服务,提高了系统的可扩展性和可维护性。Spring Cloud...
使其工作的最佳方法是包含spring-cloud-function-context作为依赖项,但不包含更高级别的适配器(例如spring-cloud-function-web或spring-cloud-function-stream )。 适配器有几个可以使用的通用请求处理程序。 最...
标题“spring-cloud-function-lambda-aws-sample”表明这是一个关于使用Spring Cloud Function与Amazon Web Services (AWS) Lambda的示例项目。这个项目旨在演示如何将Spring框架的功能性编程模型与AWS的无服务器...