- 浏览: 495994 次
- 性别:
- 来自: 广州
文章分类
- 全部博客 (502)
- Java (70)
- Linux (10)
- 数据库 (38)
- 网络 (10)
- WEB (13)
- JSP (4)
- 互联网 (71)
- JavaScript (30)
- Spring MVC (19)
- HTML (13)
- CSS (3)
- AngularJS (18)
- Redis (5)
- Bootstrap CSS (1)
- ZooKeeper (4)
- kafka (6)
- 服务器缓存 (4)
- Storm (1)
- MongoDB (9)
- Spring boot (16)
- log4j (2)
- maven (3)
- nginx (5)
- Tomcat (2)
- Eclipse (4)
- Swagger (2)
- Netty (5)
- Dubbo (1)
- Docker (7)
- Hadoop (12)
- OAuth (1)
- webSocket (4)
- 服务器性能 (7)
- Session共享 (1)
- tieye修改 (1)
- 工作 (1)
- 有用的语录 (0)
- https (2)
- common (5)
- 产品开发管理 (1)
- CDN 工作原理 (1)
- APNS、GCM (1)
- 架构图 (3)
- 功能实现分析 (1)
- JMX (1)
- 服务器相关操作命令 (1)
- img02 (0)
- 服务器环境搭建 (9)
- goodMenuBook (1)
- CEInstantPot (0)
- 有用数据 (1)
- 百度地图WEB API (2)
- 正则表达式 (1)
- 样式例子 (2)
- staticRecipePressureCooker.zip (1)
- jCanvas (1)
- 网站攻击方法原理 (1)
- 架构设计 (3)
- 物联网相关 (3)
- 研发管理 (7)
- 技术需求点 (1)
- 计划 (1)
- spring cloud (11)
- 服务器开发的一些实用工具和方法 (1)
- 每天学到的技术点 (4)
- Guava (1)
- ERP 技术注意要点 (2)
- 微信小程序 (1)
- FineRepor (1)
- 收藏夹 (1)
- temp (5)
- 服务架构 (4)
- 任职资格方案 (0)
- osno_test (1)
- jquery相关 (3)
- mybatis (4)
- ueditor (1)
- VueJS (7)
- python (10)
- Spring EL (1)
- shiro (1)
- 前端开发原理与使用 (7)
- YARN (1)
- Spark (1)
- Hbase (2)
- Pig (2)
- 机器学习 (30)
- matplotlib (1)
- OpenCV (17)
- Hystrix (1)
- 公司 (1)
- miniui (4)
- 前端功能实现 (3)
- 前端插件 (1)
- 钉钉开发 (2)
- Jenkins (1)
- elasticSearch使用 (2)
- 技术规范 (4)
- 技术实现原理 (0)
最新评论
spring-integration-kafka简单应用
pom.xml
applicationContext.xml
spring-kafka-consumer.xml
spring-kafka-producer.xml
注意:这里有一个是Spring的消费者程序(spring-kafka-consumer.xml、KafkaConsumerService.java),有一个是原生应用的消费者程序(KafkaConsumer.java)
原生应用例子:
参考原文(原生应用例子):https://my.oschina.net/cloudcoder/blog/299215
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.3.3.RELEASE</version> </parent> <groupId>com.sunney</groupId> <artifactId>kafka-demo</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>kafka-demo</name> <url>http://maven.apache.org</url> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId> <version>1.3.0.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId> org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.0</version> </dependency> </dependencies> <build> <finalName>kafak-demo</finalName> <resources> <resource> <directory>src/main/resources</directory> <filtering>true</filtering> </resource> </resources> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>1.3.3.RELEASE</version> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> <configuration> <!-- do not enable it, this will creats a non standard jar and cause autoconfig to fail --> <executable>false</executable> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> <version>2.6</version> <configuration> <delimiters> <delimiter>@</delimiter> </delimiters> </configuration> </plugin> </plugins> </build> </project>
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.2.xsd" default-autowire="byName"> <!-- <context:annotation-config /> <context:component-scan base-package="com.*" /> --> <!-- 导入Spring配置文件 --> <!-- <import resource="spring-kafka-consumer.xml" /> --> <import resource="spring-kafka-producer.xml" /> </beans>
spring-kafka-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <!-- channel配置 --> <int:channel id="inputFromKafka"> <int:dispatcher task-executor="kafkaMessageExecutor" /> </int:channel> <!-- zookeeper配置 可以配置多个 --> <int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="114.55.72.173:2181" zk-connection-timeout="6000" zk-session-timeout="6000" zk-sync-time="10" /> <!-- channel配置 auto-startup="true" 否则接收不发数据 --> <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext" auto-startup="true" channel="inputFromKafka"> <int:poller fixed-delay="1" time-unit="MILLISECONDS" /> </int-kafka:inbound-channel-adapter> <task:executor id="kafkaMessageExecutor" pool-size="4" keep-alive="120" queue-capacity="10000" /> <!-- 序列化配置 --> <bean id="kafkaDecoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder" /> <!-- consumer参数配置 --> <bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="properties"> <props> <prop key="auto.offset.reset">smallest</prop><!-- 如果zookeeper上没有offset合理的初始值情况下获取第一条消息开始的策略smallest\largeset --> <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- socket的接受缓存空间大小 --> <prop key="fetch.message.max.bytes">5242880</prop><!-- 针对一个partition的fetch request所能拉取的最大消息字节数,必须大于等于Kafka运行的最大消息 --> <prop key="auto.commit.interval.ms">1000</prop><!-- 是否自动周期性提交已经拉取到消费端的消息offset --> </props> </property> </bean> <!-- 消息接收的BEEN --> <bean id="kafkaConsumerService" class="com.SpringKafka.service.impl.KafkaConsumerService" /> <!-- 指定接收的方法 --> <int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumerService" method="processMessage" /> <!-- producer列表配置 --> <int-kafka:consumer-context id="consumerContext" consumer-timeout="4000" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties"> <int-kafka:consumer-configurations> <int-kafka:consumer-configuration group-id="group-id-xing" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder" max-messages="5000"> <!-- 两个TOPIC配置 --> <int-kafka:topic id="test-yongxing" streams="4" /> </int-kafka:consumer-configuration> </int-kafka:consumer-configurations> </int-kafka:consumer-context> </beans>
spring-kafka-producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <!-- commons config --> <bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer" /> <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder"> <constructor-arg value="java.lang.String" /> </bean> <!-- producer参数配置 --> <bean id="producerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="properties"> <props> <prop key="topic.metadata.refresh.interval.ms">3600000</prop><!-- metadata刷新间隔时间,如果负值则失败的时候才会刷新,如果0则每次发送后都刷新,正值则是一种周期行为 --> <prop key="message.send.max.retries">5</prop> <!-- 发送失败的情况下,重试发送的次数 --> <prop key="serializer.class">kafka.serializer.StringEncoder</prop><!-- 消息序列化类实现方式 --> <prop key="request.required.acks">1</prop><!-- 参与消息确认的broker数量控制,0代表不需要任何确认 1代表需要leader replica确认 -1代表需要ISR中所有进行确认 --> </props> </property> </bean> <!-- channel配置 --> <int:channel id="messageChannel"> <int:queue /> </int:channel> <!-- outbound-channel-adapter配置 --> <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapterTopicTest" kafka-producer-context-ref="producerContextTopicTest" auto-startup="true" channel="messageChannel" order="3"> <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="1" task-executor="taskExecutor" /> </int-kafka:outbound-channel-adapter> <!-- 线程池配置 --> <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500" /> <!-- pool-size:线程池活跃的线程数,keep-alive:线程没有任务执行时最多保持多久时间会终止(s),queue-capacity:任务队列的最大容量 --> <!-- producer列表配置 --> <int-kafka:producer-context id="producerContextTopicTest" producer-properties="producerProperties"> <int-kafka:producer-configurations> <!-- 多个topic配置 --> <int-kafka:producer-configuration broker-list="114.55.72.173:9092" key-serializer="stringSerializer" value-class-type="java.lang.String" value-serializer="stringSerializer" topic="test-yongxing" /> </int-kafka:producer-configurations> <!-- <int-kafka:producer-configuration broker-list="114.55.72.173:9092" kafka集群服务器列表 key-serializer="stringSerializer" key序列化方式 value-class-type="java.lang.String" value数据类型 value-serializer="stringSerializer" key序列化方式 topic="test-xing" /> 主题名字 </int-kafka:producer-configurations> --> </int-kafka:producer-context> </beans>
package com.SpringKafka; import java.util.HashSet; import java.util.Set; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableScheduling; import com.SpringKafka.service.KafkaConsumer; import com.SpringKafka.service.KafkaSendMessageService; @SpringBootApplication @EnableScheduling @ComponentScan @EnableAutoConfiguration @ServletComponentScan public class ApplicationMain { private static String topic = "test-yongxing"; public static ApplicationContext applicationContext; public static void main(String[] args) throws Exception { SpringApplication app = new SpringApplication(ApplicationMain.class); app.setWebEnvironment(false); Set<Object> set = new HashSet<Object>(); set.add("classpath:applicationContext.xml"); app.setSources(set); applicationContext = app.run(args); new KafkaConsumer(topic).start();// 消费者 send(); } public static void send() { KafkaSendMessageService kafkaSendMessageService = ApplicationMain.applicationContext .getBean("kafkaSendMessageService", KafkaSendMessageService.class); if (kafkaSendMessageService == null) { System.out.println("kafkaSendMessageService == null"); } String topic = "test-yongxing"; String message = "yongxing i == "; for (int i = 0; i < 10; i++) { kafkaSendMessageService.sendMessageToKafka("test-yongxing", message + i); System.out.println("sendMessageToKafka i == " + i); } } }
package com.SpringKafka.service; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumer extends Thread { private String topic; private ConsumerConnector consumer; public KafkaConsumer(String topic) { super(); this.topic = topic; } @Override public void run() { consumer = createConsumer(); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); // 一次从主题中获取一个数据 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while (iterator.hasNext()) { String message = new String(iterator.next().message()); System.out.println("接收到: " + message); } } private ConsumerConnector createConsumer() { Properties properties = new Properties(); properties.put("zookeeper.connect", "114.55.72.173:2181");// 声明zk // 必须要使用别的组名称,如果生产者和消费者都在同一组,则不能访问同一组内的topic数据 properties.put("group.id", "group_xing"); // ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大 // properties.put("zookeeper.session.timeout.ms", "4000"); // zk follower落后于zk leader的最长时间 // properties.put("zookeeper.sync.time.ms", "200"); // 往zookeeper上写offset的频率 properties.put("auto.commit.interval.ms", "1000"); // 消费最老消息为smallest,最新为largest properties.put("auto.offset.reset", "smallest"); // 序列化类 // properties.put("serializer.class", "kafka.serializer.StringEncoder"); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); } public void dropDown() { if (consumer != null) { consumer.shutdown(); } } }
package com.SpringKafka.service.impl; import javax.annotation.Resource; import org.springframework.integration.kafka.support.KafkaHeaders; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Service; import com.SpringKafka.service.KafkaSendMessageService; @Service("kafkaSendMessageService") public class KafkaSendMessageServiceImpl implements KafkaSendMessageService{ @Resource(name = "messageChannel") MessageChannel messageChannel; public void sendMessageToKafka(String topic, String message) { messageChannel.send(MessageBuilder.withPayload(message) .setHeader(KafkaHeaders.TOPIC,topic) .build()); } }
package com.SpringKafka.service.impl; import javax.annotation.Resource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.integration.kafka.support.KafkaHeaders; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Service; import com.SpringKafka.service.KafkaService; @Service("kafkaService") public class KafkaServiceImpl implements KafkaService{ @Resource(name = "messageChannel") MessageChannel messageChannel; public void sendUserInfo(String topic, Object obj) { messageChannel.send(MessageBuilder.withPayload(obj) .setHeader(KafkaHeaders.TOPIC,topic) .build()); } }
package com.SpringKafka.service; public interface KafkaSendMessageService { public void sendMessageToKafka(String topic, String message); } package com.SpringKafka.service.impl; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.SpringKafka.service.UserDto; import com.alibaba.fastjson.JSON; /** * 类KafkaConsumerService.java的实现描述:消费接收类 */ public class KafkaConsumerService { static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class); public void processMessage(Map<String, Map<Integer, String>> msgs) { logger.info("===============================================processMessage==============="); for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) { logger.info("============Topic:" + entry.getKey()); LinkedHashMap<Integer, String> messages = (LinkedHashMap<Integer, String>) entry.getValue(); Set<Integer> keys = messages.keySet(); for (Integer i : keys) logger.info("======Partition:" + i); Collection<String> values = messages.values(); for (Iterator<String> iterator = values.iterator(); iterator.hasNext();) { String message = "["+iterator.next()+"]"; logger.info("=====message:" + message); // List<UserDto> userList = JSON.parseArray(message, UserDto.class); logger.info("=====userList.size:" + message); } } } }
注意:这里有一个是Spring的消费者程序(spring-kafka-consumer.xml、KafkaConsumerService.java),有一个是原生应用的消费者程序(KafkaConsumer.java)
原生应用例子:
package com; import java.util.ArrayList; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class KafkaConsumerRecords extends Thread { private static String topic; public KafkaConsumerRecords(String topic) { super(); this.topic = topic; } @Override public void run() { KafkaConsumer<String, String> kafkaConsumer = kafkaConsumer(); ArrayList<String> topics = new ArrayList<>(); topics.add(this.topic); kafkaConsumer.subscribe(topics); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); kafkaConsumer.commitAsync(); // 使用commitAsync则是非阻塞方式,会在成功提交或者失败时,触发OffsetCommitCallback回调函数的执行。 for (ConsumerRecord<String, String> record : records) { String json = record.value(); System.out.println("json == " + json); } } } public KafkaConsumer<String, String> kafkaConsumer() { Properties props = new Properties(); props.put("bootstrap.servers", "114.55.72.173:9092"); props.put("group.id", "cebigdata_engin_dev"); props.put("enable.auto.commit", false); props.put("auto.commit.interval.ms", 1000); props.put("auto.offset.reset", "latest"); props.put("session.timeout.ms", 30000); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("heartbeat.interval.ms", "12000"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); return consumer; } }
XMl <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency>
参考原文(原生应用例子):https://my.oschina.net/cloudcoder/blog/299215
发表评论
-
选举算法
2022-06-17 08:48 427选举算法 常用的选举 ... -
elasticSearch使用
2022-04-27 08:42 416ElasticSearch 基于Apache Lucene构建 ... -
IDEA 快捷键
2022-03-02 16:55 245大小写转换快捷键 ctr+shift+u IDEA ... -
zookeeper dubbo 安装
2021-12-04 19:27 315docker-machine ssh default d ... -
将博客搬至CSDN
2021-11-18 19:57 189将博客搬至CSDN -
docker mysql 主从安装
2021-11-10 16:55 235docker run -d -p 13306:3306 --n ... -
rocketmq安装部署.txt
2021-11-07 19:10 218docker search rocketmq docke ... -
百度人脸识别
2021-05-21 16:11 363package com.gaojinsoft.htwy.y20 ... -
springBoot tomcat配置参数说明
2021-05-12 09:13 3018#最大连接数 server.tomcat.max-connec ... -
技术选型
2021-01-29 17:34 2921.移动端组件vux,vant,vant好点,文档好的,基于v ... -
方便开发调试和问题跟踪
2021-01-01 10:17 2481.外网最好可以连接数据库 2.关键信息可以在接口返回信息, ... -
Jenkins脚本
2020-03-12 17:55 443#!/bin/bash -ilx echo "开始 ... -
base64与file 相互转换
2019-10-23 18:19 775base64与file 相互转换 import org. ... -
钉钉开发
2019-09-17 20:16 433钉钉开发 开发者帐号 1357047443 x***310* ... -
安卓模拟器使用
2019-07-03 23:13 4逍遥pc版的安卓模拟器 http://www.xyaz.cn/ ... -
ZLTest
2019-03-19 23:41 264ZLTest -
要同步回来的文件
2019-01-25 11:14 0Spring Boot中整合Sharding-JDBC m ... -
画相关图表的工具
2019-01-25 10:59 580制作流程图的工具 1、Visio很好用,很强大,微软出的,水平 ... -
JVM 监控工具
2019-01-21 18:04 381JVM 监控工具 //========== ... -
Hystrix
2019-01-10 17:02 533Hystrix Hystrix的设计原则包括: 资源隔离 ...
相关推荐
**Spring Integration**是Spring家族中的一个重要成员,它提供了构建企业集成应用所需的多种工具和技术。该框架基于消息传递模型,利用事件驱动架构(EDA)来处理业务流程中的数据交换。 **Spring Integration**的...
Spring Boot是一个简化Spring应用开发的框架,它使得配置过程更加简单,并提供了自动配置功能。 首先,我们来看第一个集成示例——`kafkaTest1`。在这个项目中,Spring Boot通过其内置的自动配置能力与Kafka进行...
- **消息处理**:Spring Integration提供了一套完整的消息处理框架,支持多种消息中间件,如RabbitMQ、Kafka等。 五、学习资源与进阶 要深入了解Spring 5.2.3.RELEASE,可以参考官方文档、教程和示例代码。此外,...
13. **事件驱动和消息传递**:通过 Spring Integration 和 AMQP 支持消息队列(如 RabbitMQ、Kafka),实现异步通信和解耦。 以上只是部分核心概念,Spring Boot 还包括许多其他功能和特性,如邮件服务、缓存支持、...
Spring Kafka 提供了生产者模板(ProducerTemplate)和配置可注入的 ProducerFactory,使得创建和管理 Kafka 生产者变得简单。 - **Consumers**: 消费者从 Kafka 主题中拉取消息。Spring Kafka 支持基于 Spring ...
Spring Cloud Stream通过`@EnableBinding`注解使得与Kafka的集成变得简单。这个注解定义了StreamBinder,它负责处理与Kafka的连接和配置。例如,我们可以创建一个接口,声明输入和输出通道,然后Spring Cloud ...
- **快速启动**:通过简单的步骤展示如何创建并运行一个Spring Kafka应用程序。 ##### 2.1.2 发送消息 - **KafkaTemplate**:提供了发送消息到Kafka Broker的高级抽象。通过这个模板,可以轻松地将消息发送到特定...
40. Spring Integration 41. Spring Session 42. Monitoring and Management over JMX 43. Testing 43.1. Test Scope Dependencies 43.2. Testing Spring Applications 43.3. Testing Spring Boot Applications 43.3...
10. **与其他技术的集成**:Spring与众多其他技术,如Spring Boot、Spring Cloud、Apache Kafka、Quartz Scheduler等有着良好的集成,可以方便地构建复杂的企业级应用。 在“spring-framework-4.3.9.RELEASE”目录...
SpringBoot 基于 Spring 平台,但通过内嵌的 Tomcat 或 Jetty 服务器、自动配置和起步依赖项,使得创建独立的、生产级别的基于 Spring 的应用程序变得异常简单。这个名为 "springboot-integration-examples" 的开源...
Spring Integration 是...总之,Spring Integration 是一种强大的工具,可以帮助开发者构建松散耦合、可扩展的应用程序,实现系统间的高效集成。通过深入研究提供的示例代码,你将能够更好地掌握其工作原理和应用场景。
在5.0.3.RELEASE中,Spring Integration加强了对HTTP、FTP、JMS等协议的支持,同时增加了对WebSocket和Kafka等新兴技术的集成,使得应用程序能够更方便地与各种外部系统进行交互。 压缩包中的`integration`目录包含...
will learn how to integrate RabbitMQ and Apache Kafka message brokers with your Spring Cloud application to enable asynchronous one-to-one and publish/subscribe communication styles. Chapter 12, ...
Spring Boot对Apache Kafka的支持使得在Java应用中集成消息队列变得简单。通过Spring的KafkaTemplate和Consumer/Producer监听器,我们可以方便地发送和接收消息,实现异步通信和解耦。 总的来说,"spring-boot-...
Java在企业级应用开发领域有着广泛的应用,尤其在微服务场景下,得益于其强大的跨平台能力、丰富的库支持和成熟的Spring框架,使得Java成为构建微服务的理想选择。 项目名为“mnp-integration-microservice-master...
本文将深入探讨如何使用Apache Kafka和Java 8来创建一个名为"integration-gadget"的工具,该工具能有效地整合多个源的数据,并将其呈现于一个统一的仪表板上。 首先,Apache Kafka是一款开源的流处理平台,由...
Java Spring Integration 是一个强大的框架,它扩展了Spring编程模型,以实现企业级应用程序之间的无缝集成。这个框架的主要目标是简化微服务、系统和服务之间的通信,遵循Enterprise Integration Patterns (EIP)这...
14. **事件驱动和消息总线**:Spring Boot支持Spring Integration和RabbitMQ、Kafka等消息中间件,实现异步处理和解耦。 通过阅读《Pro Spring Boot》并结合源码,你可以全面掌握Spring Boot的各个方面,从基础到...