主要使用integration的聚合器
1.pom.xml关键jar引用
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-http</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.51</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpmime</artifactId> <version>4.5.6</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <scope>provided</scope> </dependency>
2.http-inbound-config.xml配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-http="http://www.springframework.org/schema/integration/http" xmlns:context="http://www.springframework.org/schema/context" xmlns="http://www.springframework.org/schema/beans" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/integration/http http://www.springframework.org/schema/integration/http/spring-integration-http.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <int:channel id="echoChannel"> <int:queue capacity="1000"/> </int:channel> <int-http:inbound-gateway request-channel="echoChannel" path="/cc/test" supported-methods="POST" request-payload-type="java.lang.String"> <int-http:cross-origin/> </int-http:inbound-gateway> <int:chain input-channel="echoChannel" output-channel="search-request-a"> <int:poller fixed-rate="1" max-messages-per-poll="10" time-unit="SECONDS"> </int:poller> <int:header-enricher> <int:header name="ssmail" value="test"/> </int:header-enricher> <int-http:outbound-gateway charset="UTF-8" http-method="POST" expected-response-type="com.test.integration.splitteraggregator.CriteriaA" encode-uri="true" url="{testApi}" uri-variables-expression="@testApi" reply-timeout="30000" /> </int:chain> <bean id="serviceActivatorA" class="com.test.integration.splitteraggregator.ServiceActivatorA"></bean> <int:service-activator input-channel="search-request-a" method="service" output-channel="search-reply" ref="serviceActivatorA"> </int:service-activator> <int:channel id="echoChannel2"> <int:queue capacity="1000"/> </int:channel> <int:logging-channel-adapter id="logger" level="INFO" log-full-message="true"/> <int-http:inbound-gateway request-channel="echoChannel2" path="/cc/test2" supported-methods="POST" request-payload-type="java.lang.String"> <int-http:cross-origin/> </int-http:inbound-gateway> <int:chain input-channel="echoChannel2" output-channel="search-request-b"> <int:poller fixed-rate="1" max-messages-per-poll="10" time-unit="SECONDS"> </int:poller> <int:header-enricher> <int:header name="ssmail" value="test"/> </int:header-enricher> <int-http:outbound-gateway charset="UTF-8" http-method="POST" expected-response-type="byte[]" encode-uri="true" url="{test2Api}" uri-variables-expression="@test2Api" reply-timeout="30000" /> </int:chain> <!-- <int:channel id="search-request-b"/> --> <bean id="serviceActivatorB" class="com.test.integration.splitteraggregator.ServiceActivatorB"></bean> <int:service-activator input-channel="search-request-b" method="service" output-channel="search-reply" ref="serviceActivatorB"> </int:service-activator> <int:channel id="aggregated-reply"/> <int:channel id="search-reply" > <int:interceptors> <int:wire-tap channel="logger" /> </int:interceptors> </int:channel> <int:aggregator input-channel="search-reply" method="aggregate" ref="resultAggregator" output-channel="aggregated-reply" message-store="searchResultMessageStore" expire-groups-upon-completion="true" correlation-strategy-expression="headers['ssmail']" expire-groups-upon-timeout="true" release-strategy-expression="size() == 2" send-partial-result-on-expiry="true"> </int:aggregator> <int:logging-channel-adapter channel="aggregated-reply" level="INFO" log-full-message="true"></int:logging-channel-adapter> <!-- Define a store for our search results and set up a reaper that will periodically expire those results. --> <bean id="searchResultMessageStore" class="org.springframework.integration.store.SimpleMessageStore" /> <bean id="searchResultMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper"> <property name="messageGroupStore" ref="searchResultMessageStore" /> <property name="timeout" value="20000" /> </bean> <bean id="resultAggregator" class="com.test.integration.splitteraggregator.ResultAggregator" /> </beans>
说明:path="/cc/test"是本地监听路径,标蓝url="{testApi}"即为实际访问路径,并返回结果到search-request-a这个通道中,再由serviceActivatorA处理结果集,最后交给search-reply聚合数据。
3.相关java代码
//响应信息 public class Result { private String title; private String content; } public class CriteriaA { private Boolean success;//根据系统响应属性不同而定义 private String code; private String error; private Result result; } public class CriteriaB { private Integer code;//根据系统响应属性不同而定义 private String errmsg; private Result result; }
//响应信息处理,处理方式根据系统响应结果而定 public class ServiceActivatorA { public Result service(CriteriaA criteria) { Result result = new Result(); if(criteria.getSuccess() && criteria.getResult!=null){ result = criteria.getResult(); } return result; } } public class ServiceActivatorB { public Result service(byte[] data) { Result result = new Result(); try { String dataStr = new String(data, "UTF-8"); CriteriaB criterib = JSON.parseObject(dataStr, CriteriaB.class); if(criterib.getCode()==10000 && criterib.getResult()!=null) { result = criterib.getResult(); } } catch (Exception e) { e.printStackTrace(); } return result; } }
//聚合结果集 public class CompositeResult extends Result { private Collection<Result> results = new ArrayList<Result>(); public Collection<Result> getResults() { return results; } } public class ResultAggregator { public Result aggregate(Collection<Result> results) { CompositeResult result = new CompositeResult(); result.getResults().addAll(results); System.out.println("result size:"+result.getResults().size()); //具体业务处理。。。 return result; } }
另外说明uri-variables-expression动态配置:获取bean对象返回值(该值必须为map),然后通过{xxx}这个方式获取,关键代码:
@Configuration public class UrlConfig { @Value("${url.testApi}") private String testApi; @Value("${url.test2Api}") private String test2Api; @Bean(name="testApi") public Map<String, String> getTestApi() { Map<String, String> map = new HashMap<String, String>(); map.put("testApi", testApi); return map; } @Bean(name="test2Api") public Map<String, String> getTest2Api() { Map<String, String> map = new HashMap<String, String>(); map.put("test2Api", test2Api); return map; } }
4.项目启动
@SpringBootApplication @ImportResource(locations = "classpath:http-inbound-config.xml") @EnableIntegration public class HttpApplication { public static void main(String[] args) { SpringApplication.run(HttpApplication.class, args); } }
5.访问,注意要保证跳转的路径(testApi,test2Api)能访问
可通过Restlet Client访问,post请求输入localhost:8080/cc/test,localhost:8080/cc/test2,若有参数就输入参数
相关推荐
综上所述,`kafkaTest1`项目展示了Spring Boot基础的Kafka集成,而`kafkaTest`项目则利用了`spring-integration-kafka`插件来实现更灵活的消息处理。这两个项目都为开发者提供了一种便捷的方式,将Kafka集成到Spring...
Java_Apache Camel Spring Boot示例是一个综合性的项目,展示了如何在Spring Boot应用程序中集成Apache Camel框架。Apache Camel是一个流行的开源框架,它简化了企业级集成(EIP,Enterprise Integration Patterns)...
This book will help you understand what Spring Boot is, how Spring Boot helps you build Spring-based applications quickly and easily, and the inner workings of Spring Boot using easy-to-follow ...
14. **事件驱动和消息总线**:Spring Boot支持Spring Integration和RabbitMQ、Kafka等消息中间件,实现异步处理和解耦。 通过阅读《Pro Spring Boot》并结合源码,你可以全面掌握Spring Boot的各个方面,从基础到...
Build a microservices architecture with Spring Boot, by evolving an application from a small monolith to an event-driven architecture composed of several services. This book follows an incremental ...
Spring is the most popular Java-based framework for building ... Spring Boot addresses this “Spring applications need complex configuration” problem by using its powerful autoconfiguration mechanism.
Spring Boot还支持集成测试,使用`@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)`可以启动一个带有随机端口的完整应用,适用于测试不同组件之间的交互。 八、测试覆盖率 测试覆盖率是衡量代码被...
Spring Boot集成Java DSL的实现代码 Spring Boot集成Java DSL是指将Java DSL集成到Spring Boot项目中,以便使用Java DSL的强大功能来构建流程。Java DSL是Spring Integration的一部分,提供了一种基于Java Config的...
This book will help you understand what Spring Boot is, how Spring Boot helps you build Spring-based applications quickly and easily, and the inner workings of Spring Boot using easy-to-follow ...
spring-boot2-sleuth集成演示 Spring Cloud Sleuth与Spring Boot 2功能端点的集成。 一个演示应用程序,用于显示Spring Boot 2 WebFulx应用程序的Spring Cloud Sleuth集成。 要运行该应用程序,请使用以下命令...
这份指南涵盖了Spring Boot的核心概念、配置、启动与运行,以及与其他技术的集成。 1. **核心概念** - **起步依赖(Starter Dependencies)**:Spring Boot通过起步依赖简化了项目构建,这些依赖包含了运行应用所...
Spring Boot 整合 FTPClient 线程池的实现示例 在本文中,我们将探讨如何在 Spring Boot 项目中整合 FTPClient 线程池的实现示例。FTPClient 是一个常用的 FTP 客户端库,而线程池则可以帮助我们减少频繁创建和销毁...
Spring Boot 单元测试和集成测试实现详解 Springs Boot 是一个基于 Java 的框架,提供了一个便捷的方式来开发基于 Spring 的应用程序。在开发过程中,测试是一个非常重要的步骤,单元测试和集成测试是其中的两种...
在本篇文章中,我们将深入探讨如何在Spring Boot项目中集成MQTT,实现消息的推送和订阅。 首先,让我们了解MQTT的工作原理。MQTT协议基于客户端-服务器架构,其中客户端可以是设备、应用或其他系统,它们通过发布...
- **Spring Integration**:实现企业级应用集成的服务框架。 通过以上对 Spring Boot 2.0 完整版视频教程的知识点概述,我们不仅能够了解 Spring Boot 的基本概念和发展历程,还能深入学习其核心特性和高级用法,这...
Spring Boot Messaging涉及的是如何在Spring Boot环境中实现企业级消息传递和集成解决方案。Spring Boot作为Spring框架的一个模块,它为快速开发和运行独立的、生产级别的基于Spring的应用程序提供便利。Spring Boot...
- 安装后,为了支持Maven和Spring Boot,需要安装两个插件:M2E (Maven Integration for Eclipse) 和Spring Tools Suite (STS)。这两个插件可以在Eclipse Marketplace中找到并安装。 3. **Maven设置**: - Maven...