- 浏览: 99611 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (50)
- druid (1)
- java (7)
- database (1)
- 源码 (6)
- dubbo (4)
- jetty (1)
- protocol (1)
- 工作流 (3)
- 电商 (1)
- 支付平台 (1)
- 大数据 (0)
- 微信小程序 (1)
- 短信平台 (1)
- jms (1)
- jndi (1)
- spi (1)
- FileUpload (0)
- concurrent (1)
- 统计业务 (0)
- 业务 (3)
- sql (2)
- andriod (1)
- maven (1)
- OAuth (1)
- ws (1)
- spring (6)
- mybatis (1)
- java rocketmq (0)
- rocketmq (1)
- RxJava (1)
- java模式 (1)
- AI (0)
- 机器学习 (1)
- springboot (1)
- tomcat (1)
- 协议 (1)
- springcloud (1)
- stream (1)
- 技术管理 (1)
- k8s (1)
- ser (0)
- istio (1)
- ddd (1)
- 微服务 (1)
- 操作笔记 (1)
最新评论
-
herman_liu76:
luozhen89 写道讲得很好。知识后面的KAFKA跟OAu ...
尽量把OAuth2.0的原理讲透透的 -
luozhen89:
讲得很好。知识后面的KAFKA跟OAuth有什么关系,没看懂。 ...
尽量把OAuth2.0的原理讲透透的 -
herman_liu76:
ZHENFENGSHISAN 写道太累了啊,哥唉~ 我也觉得很 ...
代码快看哭了-吐槽与感悟汇总 -
ZHENFENGSHISAN:
太累了啊,哥
代码快看哭了-吐槽与感悟汇总 -
herman_liu76:
1126481146 写道厉害啊,有联系方式吗,学习学习,我现 ...
druid 源码分析与学习(含详细监控设计思路的彩蛋)
# 前言
刚看到Stream的功能是对接mq产品,以为就是包装一些mq产品接口,实现自动装配后统一使用。但看了一个简单的demo,是使用rabbitMq产品的binder,还有输入输出接口方法通过配置,来对应不同的mq产品。所以作者实现的功能是在自己的channel与mq产品之间做了一个binder,这样方便的改变配置就使用多个mq,也可以方便的换不同的mq。
但是这些stream的channel如何被实现的,实现类是什么,binder又是如何加载进来的,又是如何通过binding操作把两者绑在一起的?什么时候绑的?都值得了解一下。于是简单浏览了源码,并基于一个rabitmq的demo,补充写一个简单的binder进行了测试。
本文草稿用markdown写的,格式也没过多清除。主要内容包含三部分:分别是基本的stream的使用,源码分析,自定义binder的使用。
# 1. rabbit为例如何使用stream
## 1.1 基本的使用步骤
这个方便找到一些相关的帖子
## 1.2 测试项目与结果展示
四个红色的文件:有自己加的spring.binders文件,有channel接口,有controller类产生消息,也有消息到了mockmq后,消费消息的类。最下面的红框是日志结果。mockmq只是一个MsgHolder,可以写入消息,可以配置监听,把消息发给监听器。
## 1.3 业务过程
用户使用channel发消息,由于channel在binding时给加了一个监听器,监听器收到消息后才发给mq的生产者。(客户用channel发送,channel的监听器收到,再转发给mqProducer)
使用channel接收消息时,由于binding会产生监听,作为mq的消费者,它得到mq消息后用这个channel发送。而用户设置了channel的监听,就收到了消息。(mqConsumer监听消息,用channel发送,客户的channel监听收到)
下面都看看channel是如何产生的,binder是如何进入容器的。binder什么时候给channel加上需要的监听器,或者给mqConsumer加上监听器的。
# 2. 源码分析
## 2.1 从@enableBinding开始
我们知道很多功能都是从@enable***开始的。这个注解可以加上几个channel的接口。
具体是导入这些类:@Import({ BindingServiceConfiguration.class, BindingBeansRegistrar.class, BinderFactoryConfiguration.class,SpelExpressionConverterConfiguration.class })
## 2.2 导入的BindingServiceConfiguration.class
**看名字是【绑定服务】的配置,这是个@Configuration的类。主要看下面三个类:**
后面两个都实现了smartlifecycle接口,在容器启动时,也会执行start(),这时会通过bindingService来进行所有的绑定。这就是绑定时机,另外在stop()时,还会用bindingService进行unbinding操作。
**初步看看bindingService的主要操作:**
绑定服务有了,要绑定的两个对象还没有看到。一个channel将与一个mq产品的生产者或者消费者进行绑定。
## 2.3 导入的BindingBeansRegistrar.class
这个是用于注册bean定义的类,用于处理@EnableBinding中的值,也就是channel接口的类,应该就是被绑定的对象了。
重点看BindableProxyFactory这个类,是个工厂bean。
上面有了绑定服务,也有了绑定对象了,还缺少绑定者binder。
## 2.4 导入的BinderFactoryConfiguration.class
看名字,这个是binder的工厂,有了工厂,binder就肯定有了。重点看两个bean.
看一下这个binder工厂:DefaultBinderFactory,以及其中最主要的getBinder方法。
## 2.5 回头看绑定操作-OutputBindingLifecycle
以发送绑定为例子。
# 3. 测试自定义的binder
直接在一个stream的rabbitmq的demo项目上添加,前面已经显示过一些文件。这里的mq就用一个可接受msg,也可以收到监听时,把msg给它的类代替。
## 3.1 写一个binder与它的配置类及文件
binder类:
binder的配置类
建一个meta-inf文件夹,里面写一个spring.binders文本文件,只有一句,指出binders的配置类。
## 3.2 channel接口类并放在@enableBinding中
特意排除了binder的配置类。
application.propeties文件中增加这些,表示两个通道都被绑定到我自己加的binder上。spring.binders中有`liujunmq`
## 3.3 发送、接收及模拟的mq
在一个controller中,装配一个特定的发送通道,把http请求中的msg发出去。
两种方式消费,指定所用的Input通道名字。
## 3.4 运行结果
浏览器输入:[http://192.168.1.6:8080/message/sendLiujun?message=herriman目]
按照设计,正确的输出了每一步的日志。通过本次学习,兼容多种产品学到了一招,在springcloud bus中也是,一个应用产生了event,又自己的容器监听到再发给对方,对方收到后,产生一个event,再由自己的监听处理。即兼容了本地的机制,又兼顾了远程传输。
另外就是如何把你要的类,比如本例中的channel实例都加载到容器中,binder实例如何加载到容器中。而关联功能又是一个类负责,职责明确。另外还有两个类在正确的容器时机,利用关联类进行绑定与解除的处理。所有设计功能,职责,时机都很有讲究。
刚看到Stream的功能是对接mq产品,以为就是包装一些mq产品接口,实现自动装配后统一使用。但看了一个简单的demo,是使用rabbitMq产品的binder,还有输入输出接口方法通过配置,来对应不同的mq产品。所以作者实现的功能是在自己的channel与mq产品之间做了一个binder,这样方便的改变配置就使用多个mq,也可以方便的换不同的mq。
但是这些stream的channel如何被实现的,实现类是什么,binder又是如何加载进来的,又是如何通过binding操作把两者绑在一起的?什么时候绑的?都值得了解一下。于是简单浏览了源码,并基于一个rabitmq的demo,补充写一个简单的binder进行了测试。
本文草稿用markdown写的,格式也没过多清除。主要内容包含三部分:分别是基本的stream的使用,源码分析,自定义binder的使用。
# 1. rabbit为例如何使用stream
## 1.1 基本的使用步骤
这个方便找到一些相关的帖子
```java //1. pom中引入 <artifactId>spring-cloud-starter-stream-rabbit</artifactId> //2. 定义各个stream自己的通道。Output的发送,Input的接收。这个接口类会被一个配置类,上面通过加@EnableBinding({MessageSource.class})来触发实现类。和@Enable***都差不多。 //这里有三个chnanel,一个接收,两个发送。 public interface MessageSource { String NAME = "pao";//管道名称:"pao" @Output(NAME) MessageChannel pao(); @Output("liujunTopic") MessageChannel liujunTopic(); @Input("liujunRevc") MessageChannel liujunRevc(); } //3. 配置文件application.properties中相关的内容。 //spring.cloud.stream.bindings.${channel-name}.destination //bindings后面的就是上面的通道名字,表示这个通道将和哪个mq的binder绑定。destination表示toipc吧。 //相关的binder:rabbit,上面的pox中引入包的meta-inf中的spring.binders文件中有,比如: //rabbit:org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration。 //前缀【rabbit】可以看到下面的default中有。指明的【RabbitServiceAutoConfiguration】这个自动配置文件中,会产生RabbitMessageChannelBinder.class这个类对象到容器中。 //liujunmq是我自己弄的一个binder,就是测试用的。 spring.cloud.stream.bindings.output.destination = ${kafka.topic} spring.cloud.stream.bindings.pao.destination = test spring.cloud.stream.bindings.input.destination = ${kafka.topic} spring.cloud.stream.defaultBinder=rabbit spring.cloud.stream.bindings.liujunTopic.binder=liujunmq #spring.cloud.stream.bindings.liujunTopic.destination=liujunTest spring.cloud.stream.bindings.liujunRevc.binder=liujunmq //4. 使用2中的接口,在controller中,可以装配channel,按名字liujunTopic。后面就可以直接用它来发送了。 @Autowired @Qualifier("liujunTopic") // Bean 名称 private MessageChannel liujunMessageChannel; @GetMapping("/message/sendLiujun") public Boolean sendLiujun(@RequestParam String message) { System.out.println("1. msg received through the httpClient..."); liujunMessageChannel.send(MessageBuilder.withPayload(message).build()); return true; } ```
## 1.2 测试项目与结果展示
四个红色的文件:有自己加的spring.binders文件,有channel接口,有controller类产生消息,也有消息到了mockmq后,消费消息的类。最下面的红框是日志结果。mockmq只是一个MsgHolder,可以写入消息,可以配置监听,把消息发给监听器。
## 1.3 业务过程
用户使用channel发消息,由于channel在binding时给加了一个监听器,监听器收到消息后才发给mq的生产者。(客户用channel发送,channel的监听器收到,再转发给mqProducer)
使用channel接收消息时,由于binding会产生监听,作为mq的消费者,它得到mq消息后用这个channel发送。而用户设置了channel的监听,就收到了消息。(mqConsumer监听消息,用channel发送,客户的channel监听收到)
下面都看看channel是如何产生的,binder是如何进入容器的。binder什么时候给channel加上需要的监听器,或者给mqConsumer加上监听器的。
# 2. 源码分析
## 2.1 从@enableBinding开始
我们知道很多功能都是从@enable***开始的。这个注解可以加上几个channel的接口。
具体是导入这些类:@Import({ BindingServiceConfiguration.class, BindingBeansRegistrar.class, BinderFactoryConfiguration.class,SpelExpressionConverterConfiguration.class })
## 2.2 导入的BindingServiceConfiguration.class
**看名字是【绑定服务】的配置,这是个@Configuration的类。主要看下面三个类:**
- - new BindingService(bindingServiceProperties, binderFactory);
- - new OutputBindingLifecycle();-->bindable.bindOutputs(bindingService);
- - new InputBindingLifecycle();-->bindable.bindInputs(bindingService);
后面两个都实现了smartlifecycle接口,在容器启动时,也会执行start(),这时会通过bindingService来进行所有的绑定。这就是绑定时机,另外在stop()时,还会用bindingService进行unbinding操作。
**初步看看bindingService的主要操作:**
- - bindProducer:getBinder得到binder,再用它binder.bindProducer(bindingTarget, output,
- producerProperties);---参数主要是stream的channel与属性值。
- - bindConsumer:getBinder得到binder,再用它binder.bindConsumer(target,
- bindingServiceProperties.getGroup(inputName), input,
- consumerProperties);---参数主要是stream的channel与属性值。
绑定服务有了,要绑定的两个对象还没有看到。一个channel将与一个mq产品的生产者或者消费者进行绑定。
## 2.3 导入的BindingBeansRegistrar.class
这个是用于注册bean定义的类,用于处理@EnableBinding中的值,也就是channel接口的类,应该就是被绑定的对象了。
```java //一般对于接口,肯定是动态代理产生一个类。这个类一般通过一个factoryBean的getObject()方法得到。比如duboo中,对接口的实现就是把请求代理成一个远程的消息发送。 // if (type.isInterface()) { RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(BindableProxyFactory.class); rootBeanDefinition.addQualifier(new AutowireCandidateQualifier(Bindings.class, parent)); rootBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(type); registry.registerBeanDefinition(type.getName(), rootBeanDefinition); } ```
重点看BindableProxyFactory这个类,是个工厂bean。
```java * {@link FactoryBean} for instantiating the interfaces specified via * {@link EnableBinding} //所有EnableBinding指明的接口的实例化类,实现了工厂bean //自己又是一个Interceptor,产生代理类 public class BindableProxyFactory implements MethodInterceptor, FactoryBean<Object>, Bindable, InitializingBean ------------------------------------------------------------------ //getObject果然返回代理对象,MethodInterceptor还是this。 @Override public synchronized Object getObject() throws Exception { if (this.proxy == null) { ProxyFactory factory = new ProxyFactory(this.type, this); this.proxy = factory.getProxy(); } return this.proxy; } ------------------------------------------------------------------ //看看channel接口的代理对象的方法,执行是如何的?是直接从inputHolders拿到,按名字缓存起来。 @Override public synchronized Object invoke(MethodInvocation invocation) throws Throwable { Method method = invocation.getMethod(); ...//方法上的Input注解的名字,作为channel的名字。 Input input = AnnotationUtils.findAnnotation(method, Input.class); if (input != null) { String name = BindingBeanDefinitionRegistryUtils.getBindingTargetName(input, method); boundTarget = this.inputHolders.get(name).getBoundTarget(); targetCache.put(method, boundTarget); return boundTarget; } else { ...//output略 } return null; } ------------------------------------------------------------------ //从invoker方法中,看到代理类是根据Input.class注解的名字,从inputHolders这样一个map中拿到的对象。说明这个对象应该已经存在了。 //在产生代理类产生之前,即调用getObject()之前,早就先加载了相应的boundTarget放map中了。 //果然有afterPropertiesSet方法,它对input与output分别进行了处理,把产生的channel对象放到了inputHolders中。上面的invoke才能拿到。这句是input注解的方法的处理。 BindableProxyFactory.this.inputHolders.put(name, new BoundTargetHolder(getBindingTargetFactory(returnType).createInput(name), true)); //按类型得到工厂,再根据名字产生绑定对象。 //关于工厂,第一个导入类中有这个bean,就是BindingTargetFactory,可以生成BindingTarget。 @Bean public SubscribableChannelBindingTargetFactory channelFactory( CompositeMessageChannelConfigurer compositeMessageChannelConfigurer) { return new SubscribableChannelBindingTargetFactory(compositeMessageChannelConfigurer); } //工厂是这么产生绑定对象的。 SubscribableChannelBindingTargetFactory-->createInput/createOutput-->return SubscribableChannel subscribableChannel = new DirectChannel(); //这个steam的channel对象就是下面的样子,是可以被订阅的。最开始的例子说明发送/接收都可以被订阅(监听)。input与output都是这个,因为都需要一端发,另一端订阅接收。 public class DirectChannel extends AbstractSubscribableChannel ```
上面有了绑定服务,也有了绑定对象了,还缺少绑定者binder。
## 2.4 导入的BinderFactoryConfiguration.class
看名字,这个是binder的工厂,有了工厂,binder就肯定有了。重点看两个bean.
```java //这个是binder的工厂 @Bean @ConditionalOnMissingBean(BinderFactory.class) public DefaultBinderFactory binderFactory() { DefaultBinderFactory binderFactory = new DefaultBinderFactory(getBinderConfigurations()); binderFactory.setDefaultBinder(bindingServiceProperties.getDefaultBinder()); binderFactory.setListeners(binderFactoryListeners); return binderFactory; } //------------------------------------------------------------------ //这个是binder的类型注册,从META-INF/spring.binders文件中来,本例中有两个,一个自己写的,一个是rabbit的。 @Bean @ConditionalOnMissingBean(BinderTypeRegistry.class) public BinderTypeRegistry binderTypeRegistry(ConfigurableApplicationContext configurableApplicationContext) { Map<String, BinderType> binderTypes = new HashMap<>(); ... try { Enumeration<URL> resources = classLoader.getResources("META-INF/spring.binders"); ... while (resources.hasMoreElements()) { URL url = resources.nextElement(); UrlResource resource = new UrlResource(url); for (BinderType binderType : parseBinderConfigurations(classLoader, resource)) { binderTypes.put(binderType.getDefaultName(), binderType); } } } ... return new DefaultBinderTypeRegistry(binderTypes); } ```
看一下这个binder工厂:DefaultBinderFactory,以及其中最主要的getBinder方法。
```java public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware //得到binder的方法。其中根据文件中的binder配置类,还产生了一个子容器。再从中取出Binder.class类型的bean。说明每个mq都是一个子容器当中? //子容器当然可以使用父容器中的对象,父容器也可以通过这个工厂类,得到子容器中的binder。 private <T> Binder<T, ?, ?> getBinderInstance(String configurationName) { ... Properties binderProperties = binderConfiguration.getProperties(); ArrayList<String> args = new ArrayList<>(); for (Map.Entry<Object, Object> property : binderProperties.entrySet()) { args.add(String.format("--%s=%s", property.getKey(), property.getValue())); } ... args.add("--spring.main.applicationContextClass=" + AnnotationConfigApplicationContext.class.getName()); List<Class<?>> configurationClasses = new ArrayList<Class<?>>( Arrays.asList(binderConfiguration.getBinderType().getConfigurationClasses())); SpringApplicationBuilder springApplicationBuilder = new SpringApplicationBuilder() .sources(configurationClasses.toArray(new Class<?>[] {})).bannerMode(Mode.OFF).web(false); if (useApplicationContextAsParent) { springApplicationBuilder.parent(this.context); } ... ConfigurableApplicationContext binderProducingContext = springApplicationBuilder .run(args.toArray(new String[args.size()])); @SuppressWarnings("unchecked") Binder<T, ?, ?> binder = binderProducingContext.getBean(Binder.class); ... this.binderInstanceCache.put(configurationName, new BinderInstanceHolder(binder, binderProducingContext)); } return (Binder<T, ?, ?>) this.binderInstanceCache.get(configurationName).getBinderInstance(); } ```
## 2.5 回头看绑定操作-OutputBindingLifecycle
以发送绑定为例子。
```java //实现了SmartLifecycle,可以随容器启停。实现了ApplicationContextAware,可以方便拿容器中的bean使用。 public class OutputBindingLifecycle implements SmartLifecycle, ApplicationContextAware ////容器启动带着这个也start()。找出bindable进行bindInputs and OutPuts @Override public void start() { if (!running) { // retrieve the BindingService lazily, avoiding early initialization try { BindingService bindingService = this.applicationContext .getBean(BindingService.class); Map<String, Bindable> bindables = this.applicationContext .getBeansOfType(Bindable.class); for (Bindable bindable : bindables.values()) { bindable.bindOutputs(bindingService); } } ... } } //-------------------------------------------------------- //BindableProxyFactory中执行上面的bindable.bindOutputs(bindingService); //BindableProxyFactory实现了bindable,正好也是因为它的outputHolders持有所有的outputchannel这些target。通过bindingService来进行。 @Override public void bindOutputs(BindingService bindingService) { ... for (Map.Entry<String, BoundTargetHolder> boundTargetHolderEntry : this.inputHolders.entrySet()) { String inputTargetName = boundTargetHolderEntry.getKey(); BoundTargetHolder boundTargetHolder = boundTargetHolderEntry.getValue(); if (boundTargetHolder.isBindable()) { bindingService.bindConsumer(boundTargetHolder.getBoundTarget(), inputTargetName); } } } //-------------------------------------------------------- //bindingService.bindConsumer。找到binder,绑定目标。 public <T> Binding<T> bindProducer(T output, String outputName) { String bindingTarget = this.bindingServiceProperties .getBindingDestination(outputName); Binder<T, ?, ProducerProperties> binder = (Binder<T, ?, ProducerProperties>) getBinder(outputName, output.getClass()); ... Binding<T> binding = binder.bindProducer(bindingTarget, output, producerProperties); this.producerBindings.put(outputName, binding); return binding; } //-------------------------------------------------------- //getBinder(outputName, output.getClass());用binder工厂,以配置文件与类型为参数。前面说过可能有子容器的问题。 private <T> Binder<T, ?, ?> getBinder(String channelName, Class<T> bindableType) { String binderConfigurationName = this.bindingServiceProperties.getBinder(channelName); return binderFactory.getBinder(binderConfigurationName, bindableType); } ```
# 3. 测试自定义的binder
直接在一个stream的rabbitmq的demo项目上添加,前面已经显示过一些文件。这里的mq就用一个可接受msg,也可以收到监听时,把msg给它的类代替。
## 3.1 写一个binder与它的配置类及文件
binder类:
```java public class LiujunMessageChannelBinder implements Binder<MessageChannel, ConsumerProperties, ProducerProperties> { @Autowired MsgHolder msgHolder;//一个mock的mq.应该在父容器中。 @Override public Binding<MessageChannel> bindConsumer(String name, String group, MessageChannel inboundBindTarget, ConsumerProperties consumerProperties) { try { //在mq中加一个监听消费者,收到消息就用stream的Channel发出去。真正的客户会监听stream的Channel msgHolder.setMsgListener(msg -> { System.out.println("4. mqClient listener got the msg from mock-mq,then sent to targetOutput!"); inboundBindTarget.send(msg); }); } catch (Exception e) { e.printStackTrace(); } //返回实现unbinding类 return () -> { System.out.println("Unbinding"); }; } @Override public Binding<MessageChannel> bindProducer(String name, MessageChannel outputChannel, ProducerProperties producerProperties) { Assert.isInstanceOf(SubscribableChannel.class, outputChannel, "Binding is supported only for SubscribableChannel instances"); SubscribableChannel subscribableChannel = (SubscribableChannel) outputChannel; //监听真正用户用Channel发来send来的消息,再发(设置)给mq。 subscribableChannel.subscribe(message -> { // Object messageBody = message.getPayload(); System.out.println("2. Input subscriber get the msg and send to mock-mq:" + message); msgHolder.setMsg(message); }); //返回实现unbinding类 return () -> { System.out.println("Unbinding"); }; } } ```
binder的配置类
```java //binder的配置类。 @Configuration //@ConditionalOnMissingBean(Binder.class) public class LiujunMessageChannelBinderConfiguration { @Bean LiujunMessageChannelBinder liujunMessageChannelBinder() { LiujunMessageChannelBinder binder = new LiujunMessageChannelBinder(); return binder; } } ```
建一个meta-inf文件夹,里面写一个spring.binders文本文件,只有一句,指出binders的配置类。
```properties liujunmq:\ com.gupao.springcloudstream.rabbitmq.stream.LiujunMessageChannelBinderConfiguration ```
## 3.2 channel接口类并放在@enableBinding中
```java public interface MessageSource { //发消息 @Output("liujunTopic") MessageChannel liujunTopic(); //收 @Input("liujunRevc") MessageChannel liujunRevc(); } ```
特意排除了binder的配置类。
```java @SpringBootApplication @ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {LiujunMessageChannelBinderConfiguration.class})) @EnableBinding({MessageSource.class}) public class SpringCloudStreamApplication { public static void main(String[] args) { SpringApplication.run(SpringCloudStreamApplication.class, args); } } ```
application.propeties文件中增加这些,表示两个通道都被绑定到我自己加的binder上。spring.binders中有`liujunmq`
```properties spring.cloud.stream.bindings.liujunTopic.binder=liujunmq spring.cloud.stream.bindings.liujunRevc.binder=liujunmq ```
## 3.3 发送、接收及模拟的mq
在一个controller中,装配一个特定的发送通道,把http请求中的msg发出去。
```java @Autowired @Qualifier("liujunTopic") // Bean 名称 private MessageChannel liujunMessageChannel; @GetMapping("/message/sendLiujun") public Boolean sendLiujun(@RequestParam String message) { System.out.println("1. msg received through the httpClient..."); liujunMessageChannel.send(MessageBuilder.withPayload(message).build()); return true; } ```
两种方式消费,指定所用的Input通道名字。
```java @Component public class LiujunMessageConsumerBean { @Autowired private MessageSource messageSource; // @StreamListener("liujunRevc") // public void onMessage(String message){ // System.out.println("5. targetOutput Listener get msg: " + message); // } @ServiceActivator(inputChannel = "liujunRevc") public void onMessage(Object message) { System.out.println("6. targetOutput another Listener get msg: " + message); } } ```
```java //模拟的mq @Component public class MsgHolder { public Message msg; public MsgListener msgListener; public Message getMsg() { return msg; } //设置消息时(收到)后,又把消息给接收者(消费) public void setMsg(Message msg) { this.msg = msg; System.out.println("3. mock-mq get msg and tell the mqClientlistener..."); msgListener.OnMsg(msg); msg=null; } ... } ```
## 3.4 运行结果
浏览器输入:[http://192.168.1.6:8080/message/sendLiujun?message=herriman目]
```verilog 2020-01-08 23:01:29.683 INFO 12984 --- [6SvEgzY8IDiA-61] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@5d4b0dbd: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0 1. msg received through the httpClient... 2. Input subscriber get the msg and send to mock-mq:GenericMessage [payload=herriman目, headers={id=67fb81dc-ed14-5850-f2d1-499b25d055c7, timestamp=1578495697875}] 3. mock-mq get msg and tell the mqClientlistener... 4. mqClient listener got the msg from mock-mq,then sent to targetOutput! 6. targetOutput another Listener get msg: herriman目 ```
按照设计,正确的输出了每一步的日志。通过本次学习,兼容多种产品学到了一招,在springcloud bus中也是,一个应用产生了event,又自己的容器监听到再发给对方,对方收到后,产生一个event,再由自己的监听处理。即兼容了本地的机制,又兼顾了远程传输。
另外就是如何把你要的类,比如本例中的channel实例都加载到容器中,binder实例如何加载到容器中。而关联功能又是一个类负责,职责明确。另外还有两个类在正确的容器时机,利用关联类进行绑定与解除的处理。所有设计功能,职责,时机都很有讲究。
相关推荐
Spring Cloud Stream 可以与Spring Cloud Sleuth 结合,提供分布式追踪功能,这对于理解大规模分布式系统的数据流非常有用。 9. **监控和度量** 通过集成Spring Actuator,可以对消息流进行监控和度量,包括消息...
1. 绑定(Binding):这是SpringCloud Stream的核心概念,它定义了应用程序如何与消息代理交互。每个绑定可以是输入(消费)或输出(生产),并且可以连接到通道(Channel)。 2. 通道(Channel):通道是消息传递...
【Spring Cloud微服务架构笔记(四)】 在微服务架构中,Spring Cloud Stream是一个关键组件,它为企业级开发提供了一种高效、灵活的消息处理机制。本文将深入探讨Spring Cloud Stream的功能、核心概念以及如何在...
在使用Spring Cloud Stream与RabbitMQ集成时,你需要以下配置: 1. **添加依赖**: 在你的`pom.xml`或`build.gradle`文件中添加Spring Cloud Stream和RabbitMQ的依赖。 2. **配置Binder**: 在`application.yml`或`...
SpringCloudStreamApplication应用程序通过inputs或者outputs来与SpringCloudStream中binder交互,通过我们配置来binding,而SpringCloudStream的binder负责与中间件交互。所以,我们只需要搞清楚如何与Spring
这些设置可能包括连接信息、交换机(exchange)、队列(queue)以及绑定(binding)定义,这些都是Spring Cloud Stream与RabbitMQ交互的关键元素。 接着,Spring Cloud Stream 使用绑定接口(Binder Interfaces)来...
通过这个名为"spring-cloud-stream-step-by-step-master"的项目,你将全面了解如何使用Kotlin构建Spring Cloud Stream应用,从创建源、处理器到配置Binder,以及如何进行测试和部署。这个逐步指南将帮助你掌握消息...
本教程将深入探讨如何在SpringCloud项目中使用RabbitMQ确保消息的可靠性投递。 首先,了解RabbitMQ的基本概念是必要的。RabbitMQ包含生产者(Producer)、消费者(Consumer)、交换机(Exchange)、队列(Queue)和...
接下来,我们将详细讨论如何在SpringCloud项目中配置和使用RabbitMQ。 首先,你需要在本地或者服务器上安装RabbitMQ。可以从官方网站下载对应操作系统的安装包,按照指南完成安装。安装完成后,确保RabbitMQ服务...
本学习笔记将深入探讨Spring MVC中的数据绑定特性。 1. **数据绑定的基本概念** 数据绑定是Spring MVC中的一种核心功能,它允许我们将HTTP请求参数、JSON或XML数据自动映射到Java对象的属性上。这减少了手动提取...
新手学习微服务SpringCloud项目架构搭建方法 微服务架构是当前软件架构开发的热门趋势之一,而Spring Cloud正是微服务架构的优秀实现者。微服务架构的主要特点是将大型应用程序拆分成多个小型独立的服务,每个服务...
Spring Cloud Stream提供了一种声明式的方法来定义输入和输出绑定,使得应用可以轻松地与消息代理(如RabbitMQ)进行交互。以下是一些关键知识点: 1. **Spring Cloud Stream概念**: - **Binder**: Binder是...
例如,通过Spring Cloud Stream,开发者可以定义消息生产者和消费者,配置相应的Binding,以实现消息的发送和接收。 总之,SpringCloud分布式消息处理是微服务架构中不可或缺的一部分,它通过MQ中间件帮助服务之间...
【WPF Binding 学习笔记】 WPF (Windows Presentation Foundation) 的数据绑定是其核心特性之一,它使得UI层和业务逻辑层之间的数据交互变得简单而直观。在WPF中,数据绑定允许UI控件(目标)动态地显示或响应数据...
这个源码示例可能包含了创建和使用ICommand的基本示例,包括定义自定义命令、设置数据上下文、绑定命令到UI元素以及实现INotifyPropertyChanged接口以通知数据变化。通过查看和运行这个示例,你可以更好地理解WPF中...
**RabbitMQ学习笔记与软件插件详解** RabbitMQ是一种广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议,提供可靠的消息传递服务。在分布式系统中,RabbitMQ扮演着数据交换中心的角色,...
《林信良的JDK6学习笔记源代码》是一份珍贵的学习资源,它包含了林信良在其著作《JDK6学习笔记》中所使用的全部源代码。这份源代码集可以帮助读者深入理解书中讲解的Java编程概念和技术,尤其对于正在学习JDK6版本的...
**Spring MVC 学习笔记** Spring MVC 是 Spring 框架的一个模块,专门用于构建 Web 应用程序。它提供了一种模型-视图-控制器(Model-View-Controller)架构,帮助开发者处理请求、控制应用程序流程,并实现业务逻辑...
在Windows Presentation Foundation (WPF) 中,数据绑定是一种强大的机制,它允许UI元素与应用程序的数据模型进行联动。在这个“WPF的binding代码实例”中,我们将深入探讨几个基础的绑定用法,帮助开发者更好地理解...