Honghu的消息服务平台已经抛弃了之前的ActiveMQ,改用高吞吐量比较大的Kafka分布式消息中间件方案:
kafka消息平台使用spring+kafka的集成方案,详情如下:
1. 使用最高版本2.1.0.RELEASE集成jar包:spring-integration-kafka
2. Zookeeper、Kafka分布式集群使用init.properties配置化方案。
kafka.servers=127.0.0.1:9092 kafka.topic=xxxooo
3. 使用消息生产者spring-context-producer配置化方案。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 定义producer的参数 --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="localhost:9092" /> <entry key="group.id" value="2" /> <entry key="retries" value="10" /> <entry key="batch.size" value="16384" /> <entry key="linger.ms" value="1" /> <entry key="buffer.memory" value="33554432" /> <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer" /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> </map> </constructor-arg> </bean> <!-- 创建kafkatemplate需要使用的producerfactory bean --> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties" /> </constructor-arg> </bean> <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 --> <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory" /> <constructor-arg name="autoFlush" value="true" /> <property name="defaultTopic" value="test" /> </bean> </beans>
4. 使用消息消费者spring-context-producer配置化方案。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 定义consumer的参数 --> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="localhost:9092" /> <entry key="group.id" value="0" /> <entry key="enable.auto.commit" value="true" /> <entry key="auto.commit.interval.ms" value="1000" /> <entry key="session.timeout.ms" value="15000" /> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer" /> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> </map> </constructor-arg> </bean> <!-- 创建consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties" /> </constructor-arg> </bean> <!-- 实际执行消息消费的类 --> <bean id="messageListernerConsumerService" class="com.sml.sz.kafka.KafKaConsumer" /> <!-- 消费者容器配置信息 --> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="test" /> <property name="messageListener" ref="messageListernerConsumerService" /> </bean> <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 --> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory" /> <constructor-arg ref="containerProperties" /> </bean> </beans>
5. 使用注解方式注入消息类型
@Autowired
private KafkaTemplate<xxx, ooo> kafkaTemplate;
6. 重写MessageListener 的getMessage方法获取消息(业务实现)
7. RestFul服务方式测试消息服务
@CrossOrigin(origins = "*", maxAge = 3600, methods = { RequestMethod.GET, RequestMethod.POST, RequestMethod.DELETE, RequestMethod.PUT }) @RestController @RequestMapping(value = "/rest/kafka") public class KafKaProducer { @RequestMapping(value = "/send", method = RequestMethod.GET) public JSONObject save() { System.out.println("+++++++++++++++++++++++++++++++"); kafkaTemplate.sendDefault("HongHu KAFKA分布式消息服务测试"); return null; } @Autowired private KafkaTemplate<Integer, String> kafkaTemplate; }
@RestController public class KafKaConsumer implements MessageListener<Integer, String> { @Autowired private LogService logService; public void onMessage(ConsumerRecord<Integer, String> records) { System.out.println("====================" + records); Object o = records.value(); Log log = new Log(); log.setIsNewRecord(true); log.setId(IdGen.uuid()); log.setTitle(String.valueOf(o)); logService.save(log); } }接受消息了------------------:ConsumerRecord(topic = xxxooo, partition = 0, offset = 2489, CreateTime = 1479647648299, checksum = 3372898135, serialized key size = -1, serialized value size = 40, key = null, value = HongHu KAFKA分布式消息服务测试)
到此结束!
核心架构:Spring Cloud、Spring Boot、Mybatis、Redis、Rabbit MQ、微服务、分布式、电子商务
核心思想:产品微服务、模块化、原子化、持续集成、分布式、集群部署
开发模式:代码生成工具、驱动式开发模式、提高开发效率
(企业架构源码可以加求球:三五三六二四七二五九)
相关推荐
### 分布式消息中间件在支付系统中的应用 #### 一、削峰填谷与应用间解耦 在支付系统的设计与实现过程中,一个重要的环节是处理高峰期的高并发请求,以及不同应用间的解耦。这不仅关系到用户体验的好坏,还直接...
### 基于RabbitMQ消息队列的分布式事务解决方案 #### 一、RabbitMQ简介与核心概念 RabbitMQ是一款高效的分布式消息中间件,它基于Erlang语言开发,具备语言级别的高并发处理能力。RabbitMQ支持消息持久化、高可用...
分布式消息中间件Corgi的应用架构演进是一个关键的话题,特别是在现代互联网系统中,它扮演着数据同步、异步处理和解耦系统的角色。本文将深入探讨Corgi的演变历程,从早期的设计到后来的优化,以及为什么选择了...
《分布式事务JDTX与数据库中间件集成方案》 在现代互联网应用中,处理大量并发请求和海量数据时,分布式事务成为必不可少的技术手段。JDTX(京东数科分布式事务)是针对这一需求提出的解决方案,它与Apache ...
RocketMQ 是阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统中扮演着关键角色,用于解耦应用、异步处理以及提高系统的响应速度和吞吐量。本压缩包中的资源可能是一个逐步学习 RocketMQ 的教程,从基础...
本文主要探讨的是基于分布式RFID中间件在制造车间数据采集中的应用研究,由孙庆义、郭宇和谢欣平三位作者在南京航空航天大学机电学院进行。该研究旨在充分利用RFID(射频识别)技术的优势,高效、安全地收集制造车间...
Sharding-Sphere-分布式数据库中间件Ecosphere文档概述Sharding-Sphere是一个开源的分布式数据库中间件解决方案套件,由Sharding-JDBC,Sharding-Sphere-分布式数据库中间件Ecosphere文档概述Sharding-Sphere是一个...
综上所述,该文献讨论了一种针对分布式视景仿真中遇到的问题的解决方案,即开发了一个中间件VPDR-C,它通过集成Vega Prime和CORBA技术,改善了分布式仿真程序的开发、调试和部署过程。这不仅为利用Vega Prime进行...
华为FusionStage中间件是华为公司推出的一款全面集成的中间件服务产品,它包括了分布式缓存中间件、分布式消息中间件和分布式数据库中间件等多种技术组件。通过这些组件,FusionStage旨在为用户提供高性能、高可靠性...
为了解决上述问题,研究者提出了一种通用的因果消息序时间管理分布式中间件。这种中间件允许分布式仿真应用通过一个中间件层来实现消息的因果序时间管理,而无需改变原有的调用接口。该中间件的设计旨在提供符合HLA...
### ICE分布式中间件开发VS分布式开发之ACE #### ICE分布式中间件概述 ICE(Internet Communications Engine)是一款由ZeroC公司提供的高性能、轻量级的中间件解决方案,它主要用于构建分布式应用程序和服务。ICE...
机载分布式中间件技术是航空电子系统发展的重要方向,它通过在航空器上部署分布式的中间件,来满足现代航空电子系统对于实时监控、智能诊断、远程软件加载、统一任务调度以及改善通信性能的需求。机载分布式中间件...
总结,RabbitMQ作为分布式消息队列的优秀解决方案,不仅提供了强大的消息传递能力,还支持高可用部署和各种工作模式,能够满足不同复杂场景下的需求。通过深入理解和实战应用,开发者可以充分利用RabbitMQ提升系统...
消息中间件(MOM)是一种特殊类型的中间件,它通过高效、可靠的消息传递机制实现跨平台的数据交流,特别适合于分布式系统集成。消息中间件支持同步和异步通信模式,其中异步模式具有更好的容错性和可扩展性。异步...
在此基础上,了解如何结合这两种策略以及Web Services技术来构建一个既能满足区域内部需求,又能实现区域间数据共享的异构数据库集成方案。 Web Services是一种基于XML的分布式计算技术,它使用标准协议如SOAP、...
为了解决分布式工作流系统异构集成的难点,本文提出了一种消息中间件的解决方案。消息中间件(MOM)是一种软件组件,它使用高效可靠的消息传递机制实现跨平台数据交流,并基于消息通信来集成分布式系统。它提供消息...
Seta作为分布式事务解决方案,解决了跨服务的事务一致性问题。Zookeeper和Redis则被用来实现分布式锁,确保在并发场景下的数据安全。Nacos再次出现,这次作为分布式配置中心,集中管理所有服务的配置。另外,通过...
本文将针对STK在分布式仿真系统集成中的应用问题,探讨基于STK/Connect中间件和STKX组件技术的两种仿真模式,并对它们的工作原理、实现方法以及优缺点进行深入分析。 STK最初主要应用于卫星轨道分析,但随着技术的...
综上所述,这篇论文详细研究了分布式异构数据库集成系统的构建方法,包括数据交换架构的设计、中间件的应用以及历史数据处理策略。它提供了有效的解决方案,对于应对当前大数据时代中异构数据库的挑战具有重要的理论...
2. 交互消息协议多样:分布式系统涉及多种不同的消息协议和消息中间件,给测试工作带来了额外的复杂性。 3. 缺乏统一有效的测试手段:由于分布式系统的特殊性,常规的测试方法往往难以直接应用。 4. 单元测试和集成...