`
234390216
  • 浏览: 10238503 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:462879
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1775973
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1398730
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:395156
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:680167
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:531135
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1184996
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:468788
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:151486
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:68337
社区版块
存档分类
最新评论

Spring Cloud(14)——Function

阅读更多

Spring Cloud Function

Spring Cloud Function专注于提供一些与业务无关的函数功能。它允许用户把java.util.function.Functionjava.util.function.Consumerjava.util.function.Supplier类型的bean直接对外发布。

 

通过Http对外发布

Function、Consumer、Supplier可以直接以Http的方式对外发布,这需要我们添加spring-cloud-starter-function-web依赖。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-function-web</artifactId>
</dependency>

默认会发布到根路径,资源名称就是对应的bean名称。比如下面的代码中定义了一个名称为uppercase的Function类型的bean,它默认会发布到/uppercase,对应的请求方式是POST。当我们以POST方式请求/uppercase时如果请求体中传递的数据是abc,则服务端会响应ABC,因为对应处理的uppercase Function会把接入的参数转为大写后再返回。

@SpringBootApplication
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  @Bean
  public Function<String, String> uppercase() {
    return value -> value.toUpperCase();
  }

}

如果我们更习惯基于Reactive编程,则可以定义入参和出参为reactor.core.publisher.Flux

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
  return flux -> flux.map(value -> value.toUpperCase());
}

Consumer和Supplier也是类似的。Function可以接收一个参数进行处理后再返回一个结果,Consumer是只接收参数进行处理,Supplier是不接收参数但是会返回一个结果。在使用的时候可以根据它们的特性选择使用Function、Consumer还是Supplier。下面的定义了consumer、consumerFlux和supplier三个bean,当POST请求/consumer时传递的参数会被简单的输出到控制台,POST请求/consumerFlux时其内部的逻辑是基于Flux编程的,最终也是简单输出;GET请求/supplier时会返回当前时间的字符串表示。

@SpringBootApplication
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  @Bean
  public Consumer<String> consumer() {
    return message -> System.out.println("收到消息:" + message);
  }

  @Bean
  public Consumer<Flux<String>> consumerFlux() {
    return stringFlux -> {stringFlux.subscribe(str -> System.out.println("receive message : " + str));};
  }

  @Bean
  public Supplier<String> supplier() {
    return () -> LocalDateTime.now().toString();
  }

}

Function、Consumer和Supplier的输入和输出参数除了可以是字符串以外,还可以是一个对象。当是一个对象时它会被转换为JSON格式,或者从JSON格式转换为对象(入参从JSON转换为对象,出参从对象转换为JSON)。比如下面代码中,当POST请求/functionUser,传递JSON数据{"id":1,"name":"zhangsan"}时会被转换为ID为1,name为zhangsan的User对象,经functionUser对应的Function处理后会把name设置为output:zhangsan,然后再转换为JSON对象返回给客户端,所以客户端会收到{"id":1,"name":"output:zhangsan"}

@SpringBootApplication
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  @Bean
  public Function<User, User> functionUser() {
    return input -> {
      input.setName("output:" + input.getName());
      return input;
    };
  }

  @Data
  public static class User {
    private Long id;
    private String name;
  }

}

Supplier和Consumer也是类似的。

@Bean
public Consumer<Flux<User>> consumerUser() {
  return flux -> flux.subscribe(user -> System.out.println("收到User消息:" + user));
}

@Bean
public Supplier<User> supplierUser() {
  return () -> {
    User user = new User();
    user.setId(System.currentTimeMillis());
    user.setName("User-" + System.nanoTime());
    return user;
  };
}

输入、输出参数还可以是org.springframework.messaging.Message类型。当入参是Message类型时,Http请求的Header会存放到Message的Header中,比如下面这样可以通过Message.getHeaders()拿到Http请求的所有Header信息。

@Bean
public Function<Message<String>, Message<String>> functionMessage() {
  return message -> {
    String payload = "response:" + message.getPayload();
    System.out.println("请求的Headers是:" + message.getHeaders());
    Map<String, Object> headers = new HashMap<>();
    headers.put("responseTime", LocalDateTime.now());
    MessageHeaders messageHeaders = new MessageHeaders(headers);
    return MessageBuilder.createMessage(payload, messageHeaders);
  };
}

当响应的内容也是Message类型时,响应的Message的Header不会作为响应Http的Header,而是会把Message作为一个整体转换为JSON进行返回。比如下面这样:

{
  "payload": "response:abc",
  "headers": {
    "responseTime": "2019-03-07T19:41:18.9",
    "id": "4f244b72-b4fe-d679-6c7e-3a23dbfc9b53",
    "timestamp": 1551958878900
  }
}

Message也可以和复杂对象一起使用,比如下面的代码就定义了Message的payload是User对象,那请求体中的JSON对象会转换为User对象。

@Bean
public Consumer<Flux<Message<User>>> consumerMessage() {
  return flux -> flux.subscribe(message -> System.out.println("收到User消息:" + message.getPayload() + ",消息头是:" + message.getHeaders()));
}

 

通过Stream交互

Spring Cloud Function可以和Spring Cloud Stream一起使用。当只有一个Consumer类型的bean定义时,Consumer的入参会和名为input的Binding绑定,即通过从名为input的Binding接收到的消息会调用Consumer bean进行处理。Spring Cloud Function和Spring Cloud Stream一起使用时需要添加spring-cloud-function-stream依赖和一个Stream实现,笔者选择的是基于RocketMQ的实现,所以添加了spring-cloud-starter-stream-rocketmq依赖。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-function-stream</artifactId>
</dependency>

现假设我们的Spring Cloud Function应用中定义了如下这样一个Consumer类型的bean,我们期望它接收的对象来自于Spring Cloud Stream,更直观的说是来自于名为input的Binding。

@Bean
public Consumer<String> consumer() {
  return message -> System.out.println("收到消息:" + message);
}

当我们的Stream的Binder实现采用的RocketMQ,我们可以进行如下定义,它指定了RocketMQ的NameServer的地址,指定input Binding对应的是名为test-topic的Topic。

spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group

所以当我们往RocketMQ的test-topic中发送消息时对应的消息就会转而交给定义的Consumer bean进行处理。

DefaultMQProducer producer = new DefaultMQProducer("test1");
producer.setNamesrvAddr("localhost:9876");
try {
  producer.start();
  for (int i=0; i<10; i++) {
    org.apache.rocketmq.common.message.Message message = new Message();
    message.setTopic("test-topic");
    message.setTags("tag1");
    message.setBody(("Hello"+i).getBytes());
    producer.send(message);
  }
} catch (Exception e) {
  e.printStackTrace();
} finally {
  producer.shutdown();
}

当我们用上面的代码往test-topic Topic中发送了10条消息后,我们在控制台可能会看到Consumer输出的如下内容。

收到消息:Hello4
收到消息:Hello3
收到消息:Hello1
收到消息:Hello9
收到消息:Hello2
收到消息:Hello0
收到消息:Hello8
收到消息:Hello6
收到消息:Hello7
收到消息:Hello5

同样,它也可以把参数类型定义为org.springframework.messaging.Message,payload也可以是复杂对象。

@SpringBootApplication
public class Application {

  public static void main(String[] args) {
    DefaultMQProducer producer = new DefaultMQProducer("test1");
    producer.setNamesrvAddr("localhost:9876");
    try {
      producer.start();
      for (int i=0; i<10; i++) {
        org.apache.rocketmq.common.message.Message message = new Message();
        message.setTopic("test-topic");
        message.setTags("tag1");
        User user = new User();
        user.setId(Long.valueOf(i));
        user.setName("User" + i);
        message.setBody(JSON.toJSONString(user).getBytes());
        producer.send(message);
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      producer.shutdown();
    }
    SpringApplication.run(Application.class, args);
  }

  @Bean
  public Consumer<org.springframework.messaging.Message<User>> consumer() {
    return message -> {
      System.out.println("收到消息:" + message);
      User user = message.getPayload();
      System.out.println(user.getName());
    };
  }

  @Data
  public static class User {
    private Long id;
    private String name;
  }
}

也可以定义Supplier类型的bean,Supplier类型的bean默认会绑定名为output的Binding。下面的代码就定义了一个Supplier类型的bean,返回的是Flux对象,其每秒会产生一条String类型的消息。

@Bean
public Supplier<Flux<String>> supplier() {
  return () -> Flux.interval(Duration.ofSeconds(1)).map(i -> "新消息" + LocalDateTime.now());
}

接着我们可以定义名为output的Binding的destination等信息。比如下面我们定义了其destination为test-topic。

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.group=group1

之前我们定义了Consumer对应的名为input的Binding的destination也为test-topic。所以当应用启动后,Supplier会每秒往test-topic发送一条消息,对应的消息会由Consumer接收到。此时会看到Consumer在控制台打印的如下信息。

收到消息:新消息2019-03-08T21:56:20.577
收到消息:新消息2019-03-08T21:56:21.577
收到消息:新消息2019-03-08T21:56:22.577
收到消息:新消息2019-03-08T21:56:23.576
收到消息:新消息2019-03-08T21:56:24.577
收到消息:新消息2019-03-08T21:56:25.576

完整代码如下:

@SpringBootApplication
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  @Bean
  public Consumer<String> consumer() {
    return message -> {
      System.out.println("收到消息:" + message);
    };
  }

  @Bean
  public Supplier<Flux<String>> supplier() {
    return () -> Flux.interval(Duration.ofSeconds(1)).map(i -> "新消息" + LocalDateTime.now());
  }

}

当定义Function类型的bean时,因为它同时有入参和出参,默认情况下它的入参会绑定名为input的Binding,出参绑定名为output的Binding。当我们定义下面这样一个Function类型的bean时,它先是把接收到的消息打印到控制台,然后加上response:前缀返回并发送到名为output的Binding。

@Bean
public Function<String, String> function() {
  return input -> {
    System.out.println("Function接收到消息:" + input);
    return "response:" + input;
  };
}

当input和output的Binding定义如下时即表示从test-topic接收到消息,加上response:前缀后又发送到了test-topic1这个Topic上。

spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group

spring.cloud.stream.bindings.output.destination=test-topic1
spring.cloud.stream.bindings.output.group=group1

基于上述配置进行测试的完整代码如下。先是通过sendMessages()往test-topic中发送了5条消息,它们会被Function bean处理后再发送到test-topic1中。而receiveMessages()中定义了对test-topic1中消息的监听,它会收到test-topic1中所有的消息。

@SpringBootApplication
public class Application {

  public static void main(String[] args) {
    sendMessages();
    receiveMessages();
    SpringApplication.run(Application.class, args);
  }

  private static void sendMessages() {
    DefaultMQProducer producer = new DefaultMQProducer("test1");
    producer.setNamesrvAddr("localhost:9876");
    try {
      producer.start();
      for (int i=0; i<5; i++) {
        org.apache.rocketmq.common.message.Message message = new Message();
        message.setTopic("test-topic");
        message.setTags("tag1");
        message.setBody(("消息-" + i).getBytes());
        producer.send(message);
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      producer.shutdown();
    }
  }

  private static void receiveMessages() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("localhost:9876");
    try {
      consumer.subscribe("test-topic1", "*");
      consumer.setMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
          msgs.forEach(msg -> {
            System.out.println("消费者收到消息:" + new String(msg.getBody()));
          });
          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
      });
      consumer.start();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Bean
  public Function<String, String> function() {
    return input -> {
      System.out.println("Function接收到消息:" + input);
      return "response:" + input;
    };
  }

}

上述代码运行后你会在控制台看到类似如下输出。

Function接收到消息:消息-4
消费者收到消息:response:消息-4
Function接收到消息:消息-2
Function接收到消息:消息-3
Function接收到消息:消息-0
Function接收到消息:消息-1
消费者收到消息:response:消息-2
消费者收到消息:response:消息-0
消费者收到消息:response:消息-3
消费者收到消息:response:消息-1

Supplier默认会绑定名为output的Binding,Consumer默认会绑定名为input的Binding,Function默认会同时绑定input和output这两个Binding。当同时存在多个Consumer或Function类型的bean时,因为它们都与名为input的Binding绑定,当名为input的Binding中有新消息了,Spring Cloud Function默认会抛出异常,因为它不知道该给哪一个Consumer或Function处理。此时可以指定spring.cloud.function.stream.shared=true,这样多个function类型的bean将共享消息。需要注意的是从RocketMQ的角度来讲,它们都属于一个消费者组,它们在共享消息时每条消息只会被一个消费者消费,比如有消息ABCD,消费者1可能消费了AB,消费者2消费了CD。但是对于Spring Cloud Function Stream来讲,它们对应的都是名为input的Binding,一个Binding只对应一个消费者,它能消费所有的消息,Spring Cloud Function内部在共享消息的时候所有与名为input的Binding绑定的Function或Consumer都将收到所有的消息。

@Bean
public Consumer<String> consumer() {
  return message -> {
    System.out.println("Consumer收到消息:" + message);
  };
}

@Bean
public Function<String, String> function() {
  return input -> {
    System.out.println("Function接收到消息:" + input);
    return "response:" + input;
  };
}

当我们定义了上述这样的Function和Consumer类型的bean,又定义了spring.cloud.function.stream.shared=true,当往名为input的Binding指定的destination发送消息时,我们的控制台会输出类似如下这样的内容。

Function接收到消息:消息-3
Consumer收到消息:消息-3
Function接收到消息:消息-2
Consumer收到消息:消息-2
Function接收到消息:消息-0
Consumer收到消息:消息-0
Function接收到消息:消息-1
Consumer收到消息:消息-1
Function接收到消息:消息-4
Consumer收到消息:消息-4

那如果想精确的把一条消息发送到某个Consumer或Function怎么办呢?Spring Cloud Function Stream允许我们通过头信息stream_routekey来指定需要发送的Consumer或Function对应的bean名称。比如想要名为abc的Consumer处理,就指定stream_routekey的值为abc。

Map<String, Object> headers = new HashMap<>();
headers.put("stream_routekey", "consumer");
MessageHeaders messageHeaders = new MessageHeaders(headers);
for (int i=0; i<5; i++) {
  Message<String> message = MessageBuilder.createMessage("消息" + i, messageHeaders);
  this.sender.send(message);
}

因为笔者使用的基于RocketMQ的实现没有转发最原始的Header到新消息的Header中(会放到RocketMQ的消息的properties中),所以笔者模拟不了指定stream_routekey的效果。

 

参考文档

(注:本文是基于Spring Cloud Finchley.SR1所写)

0
0
分享到:
评论

相关推荐

    最简单的 SpringCloud 教程——服务的注册与发现(Eureka)

    在本教程中,我们将深入探讨SpringCloud的核心组件之一——Eureka,它是一个服务注册与发现的工具,使得微服务架构中的各个服务能够互相找到并进行通信。我们将通过两个主要步骤来学习如何使用Eureka:创建服务注册...

    SpringCloud——分布式配置中心(Spring Cloud Config)

    在微服务架构中,Spring Cloud Config 是一个强大的分布式配置中心,它允许开发人员将应用程序的配置存储在远程仓库中,并且可以在运行时动态地管理和更新这些配置,无需重启应用。这个特性对于大型分布式系统来说...

    spring-cloud项目_springcloud_springcloud项目_springcloud_spring-clou

    Spring Cloud 是一个基于 Spring Boot 实现的云应用开发工具集,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)...

    spring cloud 体系版本选型,涉及spring cloud alibaba spring boot spring cloud

    在构建基于Spring Cloud的微服务架构时,版本选型是一个至关重要的步骤,它直接影响到系统的稳定性和可维护性。Spring Cloud作为一个广泛使用的微服务框架,其版本迭代迅速,每个版本都有其特定的功能特性和生命周期...

    Spring Cloud实战 _springcloud实战_springcloud_

    《Spring Cloud实战》一书深入探讨了Spring Cloud这一强大的微服务框架,它是当前软件开发领域中最受欢迎的构建微服务架构的工具。Spring Cloud基于Spring Boot的便利性,为开发者提供了全面的微服务开发支持,包括...

    SpringCloud——Zookeeper(注册中心)

    SpringCloud为Zookeeper提供了一个名为`spring-cloud-starter-zookeeper`的启动器,允许开发者轻松地将Zookeeper集成到SpringBoot应用中。首先,我们需要在项目中引入依赖: ```xml &lt;groupId&gt;org.springframework...

    SpringCloud——消息总线(Bus)

    在微服务架构中,Spring Cloud Bus 是一个非常重要的组件,它起到了消息总线的作用,能够有效地连接各个微服务节点,并且结合轻量级的消息代理,如 RabbitMQ 或 Kafka,实现跨服务的通信和同步更新。下面我们将深入...

    SpringCloud——服务注册(consul)

    Spring Cloud作为微服务解决方案的一部分,提供了多种服务发现工具,其中Consul是其中一个流行的选择。本文将深入探讨Spring Cloud如何整合Consul实现服务注册。 Consul是由HashiCorp公司开发的一款分布式系统服务...

    SpringCloud——断路器(Hystrix)

    在分布式系统中,Spring Cloud Hystrix 是一个关键的组件,它作为一个断路器来防止服务雪崩。断路器模式是微服务架构中的一个重要概念,用于提高系统的容错性和稳定性。下面我们将深入探讨 Spring Cloud Hystrix 的...

    SpringCloud中文文档

    Spring Cloud中文文档 Spring Cloud 是一个用于快速构建分布式系统的工具集,提供了配置管理、服务发现、断路器、智能路由、微代理、控制总线等多种功能。这些功能可以帮助开发人员快速地支持实现分布式系统中的...

    SpringCloud项目源码下载.docx

    对于Spring Cloud项目的源码下载,文档中提供了百度网盘的下载链接(https://pan.baidu.com/s/1zWNippNZ1eYL14zVD-4xxQ),提取码为qmu5。下载后可以通过阅读源码来深入了解Spring Cloud的实现原理和技术细节,这...

    SpringCloud系列Demo代码,每个子项目都是SpringCloud的一个知识点

    SpringCloud系列Demo代码,每个子项目都是SpringCloud的一个知识点或者说技能点且都有对应的博客介绍,代码开箱即用适合新手学习或老司机复习。 SpringCloud系列Demo代码,每个子项目都是SpringCloud的一个知识点...

    方志朋版——深入理解Spring Cloud与微服务构建.pdf

    《深入理解Spring Cloud与微服务构建》是针对Java开发者,特别是对微服务架构感兴趣的开发者的一本重要参考资料。Spring Cloud作为目前最流行的微服务框架之一,它提供了构建分布式系统所需的多种工具和服务,包括...

    基于SpringCloud的快递驿站系统源码.zip

    基于SpringCloud的快递驿站系统源码 基于SpringCloud的快递驿站系统源码 基于SpringCloud的快递驿站系统源码 基于SpringCloud的快递驿站系统源码 基于SpringCloud的快递驿站系统源码 基于SpringCloud的...

    基于spring cloud 和vue全家桶的开源电商源码

    基于spring cloud 和vue全家桶的开源电商源码基于spring cloud 和vue全家桶的开源电商源码基于spring cloud 和vue全家桶的开源电商源码基于spring cloud 和vue全家桶的开源电商源码基于spring cloud 和vue全家桶的...

    springcloud视频学习

    《SpringCloud视频学习》 SpringCloud作为微服务架构的重要实现框架,深受广大开发者的喜爱。本资源包含了两部关于SpringCloud的视频教程,由尚硅谷出品,内容详实且易于理解,是学习SpringCloud的理想资料。 一、...

    SpringCloud项目实战各组件源代码案例

    Spring Cloud系列教程 Spring Boot Spring Cloud Stream 和 Kafka案例教程 springcloud生产者与消费者项目实战案例 Spring Cloud 中断路器 Circuit Breaker的应用 配置 Spring Cloud Config Server Spring Cloud ...

    SpringCloud.pdf

    Spring Cloud 是一个强大的框架,专为开发人员设计,旨在简化构建分布式系统中的各种常见模式。分布式系统的复杂性催生了一系列模板模式,Spring Cloud 提供了一站式的解决方案,使得开发者能够快速支持并实现这些...

    spring cloud 实战教程

    接着,会介绍Spring Cloud的核心组件——Eureka,它是服务注册与发现的实现,让服务提供者和服务消费者能够自动发现彼此。通过实例,读者可以学习到如何创建和注册服务,以及如何实现服务调用。 Spring Cloud ...

    Spring Cloud Alibaba操作手册.pdf

    Spring Cloud Alibaba 操作手册 Spring Cloud Alibaba 是一种基于 Spring Cloud 的微服务架构解决方案,旨在帮助开发者快速构建分布式系统。该手册旨在指导开发者如何使用 Spring Cloud Alibaba 实现微服务架构的...

Global site tag (gtag.js) - Google Analytics