A Simple spring-integration-kafka Application



pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.3.3.RELEASE</version>
	</parent>
	<groupId>com.sunney</groupId>
	<artifactId>kafka-demo</artifactId>
	<packaging>jar</packaging>
	<version>1.0-SNAPSHOT</version>
	<name>kafka-demo</name>
	<url>http://maven.apache.org</url>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-kafka</artifactId>
			<version>1.3.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.avro</groupId>
			<artifactId>avro</artifactId>
			<version>1.7.7</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.7</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.8.2.0</version>
		</dependency>
	</dependencies>
	<build>
		<finalName>kafka-demo</finalName>
		<resources>
			<resource>
				<directory>src/main/resources</directory>
				<filtering>true</filtering>
			</resource>
		</resources>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.3</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<version>1.3.3.RELEASE</version>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
					</execution>
				</executions>
				<configuration>
					<!-- do not enable this: it creates a non-standard jar and causes
						auto-configuration to fail -->
					<executable>false</executable>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-resources-plugin</artifactId>
				<version>2.6</version>
				<configuration>
					<delimiters>
						<delimiter>@</delimiter>
					</delimiters>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>



applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:mvc="http://www.springframework.org/schema/mvc"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
						http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
						http://www.springframework.org/schema/context
						http://www.springframework.org/schema/context/spring-context-4.2.xsd
						http://www.springframework.org/schema/aop
						http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
						http://www.springframework.org/schema/tx
						http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	 					http://www.springframework.org/schema/mvc
                        http://www.springframework.org/schema/mvc/spring-mvc-4.2.xsd"
	default-autowire="byName">

	<!-- <context:annotation-config />
	<context:component-scan base-package="com.*" /> -->

	<!-- Import the Spring configuration files -->
	<!-- <import resource="spring-kafka-consumer.xml" /> -->
 	<import resource="spring-kafka-producer.xml" />

</beans>



spring-kafka-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
                        http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                        http://www.springframework.org/schema/integration
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/task
                        http://www.springframework.org/schema/task/spring-task.xsd">

	<!-- Channel configuration -->
	<int:channel id="inputFromKafka">
		<int:dispatcher task-executor="kafkaMessageExecutor" />
	</int:channel>

	<!-- ZooKeeper configuration; multiple servers can be listed -->
	<int-kafka:zookeeper-connect id="zookeeperConnect"
		zk-connect="114.55.72.173:2181" zk-connection-timeout="6000"
		zk-session-timeout="6000" zk-sync-time="10" />

	<!-- Inbound adapter configuration; auto-startup must be "true", otherwise no messages are received -->
	<int-kafka:inbound-channel-adapter
		id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
		auto-startup="true" channel="inputFromKafka">
		<int:poller fixed-delay="1" time-unit="MILLISECONDS" />
	</int-kafka:inbound-channel-adapter>

	<task:executor id="kafkaMessageExecutor" pool-size="4"
		keep-alive="120" queue-capacity="10000" />

	<!-- Decoder (deserializer) configuration -->
	<bean id="kafkaDecoder"
		class="org.springframework.integration.kafka.serializer.common.StringDecoder" />

	<!-- Consumer properties -->
	<bean id="consumerProperties"
		class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="properties">
			<props>
				<prop key="auto.offset.reset">smallest</prop><!-- where to start when ZooKeeper holds no valid offset: smallest (earliest) or largest (latest) -->
				<prop key="socket.receive.buffer.bytes">10485760</prop> <!-- socket receive buffer size -->
				<prop key="fetch.message.max.bytes">5242880</prop><!-- max bytes fetched per partition request; must be at least the broker's maximum message size -->
				<prop key="auto.commit.interval.ms">1000</prop><!-- how often consumed offsets are committed automatically -->
			</props>
		</property>
	</bean>

	<!-- Bean that receives the messages -->
	<bean id="kafkaConsumerService" class="com.SpringKafka.service.impl.KafkaConsumerService" />
	<!-- Method that handles each message -->
	<int:outbound-channel-adapter channel="inputFromKafka"
		ref="kafkaConsumerService" method="processMessage" />

	<!-- Consumer context configuration -->
	<int-kafka:consumer-context id="consumerContext"
		consumer-timeout="4000" zookeeper-connect="zookeeperConnect"
		consumer-properties="consumerProperties">
		<int-kafka:consumer-configurations>
			<int-kafka:consumer-configuration
				group-id="group-id-xing" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
				max-messages="5000">
				<!-- Topic configuration; more than one topic can be listed -->
				<int-kafka:topic id="test-yongxing" streams="4" />
			</int-kafka:consumer-configuration>
		</int-kafka:consumer-configurations>
	</int-kafka:consumer-context>
</beans>
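For reference, the consumer properties supplied by the `PropertiesFactoryBean` above are equivalent to building a plain `java.util.Properties` object by hand. A minimal sketch (the class name `ConsumerProps` is mine; the keys are the old 0.8.x high-level consumer names used throughout this post):

```java
import java.util.Properties;

public class ConsumerProps {
    // Build the same properties the consumerProperties bean above supplies.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "smallest");           // start from the earliest offset when none is stored
        props.setProperty("socket.receive.buffer.bytes", "10485760"); // 10 MB socket receive buffer
        props.setProperty("fetch.message.max.bytes", "5242880");      // max bytes fetched per partition request
        props.setProperty("auto.commit.interval.ms", "1000");         // commit consumed offsets every second
        return props;
    }

    public static void main(String[] args) {
        System.out.println(ConsumerProps.consumerProperties().getProperty("auto.offset.reset"));
    }
}
```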


spring-kafka-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

	<!-- commons config -->
	<bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />
	<bean id="kafkaEncoder"
		class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
		<constructor-arg value="java.lang.String" />
	</bean>

	<!-- Producer properties -->
	<bean id="producerProperties"
		class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="properties">
			<props>
				<prop key="topic.metadata.refresh.interval.ms">3600000</prop><!-- metadata refresh interval: negative = refresh only on failure, 0 = refresh after every send, positive = periodic refresh -->
				<prop key="message.send.max.retries">5</prop>  <!-- number of retries after a failed send -->
				<prop key="serializer.class">kafka.serializer.StringEncoder</prop><!-- message serializer class -->
				<prop key="request.required.acks">1</prop><!-- broker acknowledgements required: 0 = none, 1 = leader
					replica only, -1 = all replicas in the ISR -->
			</props>
		</property>
	</bean>

	<!-- Channel configuration -->
	<int:channel id="messageChannel">
		<int:queue />
	</int:channel>

	<!-- outbound-channel-adapter configuration -->
	<int-kafka:outbound-channel-adapter
		id="kafkaOutboundChannelAdapterTopicTest" kafka-producer-context-ref="producerContextTopicTest"
		auto-startup="true" channel="messageChannel" order="3">
		<int:poller fixed-delay="1000" time-unit="MILLISECONDS"
			receive-timeout="1" task-executor="taskExecutor" />
	</int-kafka:outbound-channel-adapter>

	<!-- Thread pool configuration -->
	<task:executor id="taskExecutor" pool-size="5"
		keep-alive="120" queue-capacity="500" />
	<!-- pool-size: number of active threads; keep-alive: seconds an idle thread is kept before termination; queue-capacity: maximum size of the task queue -->

	<!-- Producer context configuration -->
	<int-kafka:producer-context id="producerContextTopicTest"
		producer-properties="producerProperties">
		<int-kafka:producer-configurations>
			<!-- more than one topic can be configured -->
			<int-kafka:producer-configuration
				broker-list="114.55.72.173:9092" key-serializer="stringSerializer"
				value-class-type="java.lang.String" value-serializer="stringSerializer"
				topic="test-yongxing" />
		</int-kafka:producer-configurations>
		<!-- Annotated example of a producer-configuration:
			broker-list: Kafka broker list; key-serializer: key serializer;
			value-class-type: value payload type; value-serializer: value serializer;
			topic: topic name.
		<int-kafka:producer-configuration broker-list="114.55.72.173:9092"
			key-serializer="stringSerializer" value-class-type="java.lang.String"
			value-serializer="stringSerializer" topic="test-xing" /> -->
	</int-kafka:producer-context>
</beans>



package com.SpringKafka;

import java.util.HashSet;
import java.util.Set;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.SpringKafka.service.KafkaConsumer;
import com.SpringKafka.service.KafkaSendMessageService;

@SpringBootApplication
@EnableScheduling
@ComponentScan
@EnableAutoConfiguration
@ServletComponentScan
public class ApplicationMain {
	private static String topic = "test-yongxing";
	public static ApplicationContext applicationContext;

	public static void main(String[] args) throws Exception {
		SpringApplication app = new SpringApplication(ApplicationMain.class);
		app.setWebEnvironment(false);
		Set<Object> set = new HashSet<Object>();
		set.add("classpath:applicationContext.xml");
		app.setSources(set);
		applicationContext = app.run(args);

		new KafkaConsumer(topic).start(); // start the raw-client consumer
		send();

	}

	public static void send() {
		// getBean throws if the bean is missing, so no null check is needed
		KafkaSendMessageService kafkaSendMessageService = ApplicationMain.applicationContext
				.getBean("kafkaSendMessageService", KafkaSendMessageService.class);
		String message = "yongxing i == ";

		for (int i = 0; i < 10; i++) {
			kafkaSendMessageService.sendMessageToKafka(topic, message + i);
			System.out.println("sendMessageToKafka i == " + i);
		}
	}
}



package com.SpringKafka.service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {

	private String topic;
	private ConsumerConnector consumer;

	public KafkaConsumer(String topic) {
		super();
		this.topic = topic;
	}

	@Override
	public void run() {
		consumer = createConsumer();
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, 1); // one stream (consumer thread) for this topic
		Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
		KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); // take the single stream for this topic
		ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
		while (iterator.hasNext()) {
			String message = new String(iterator.next().message());
			System.out.println("Received: " + message);
		}
	}

	private ConsumerConnector createConsumer() {
		Properties properties = new Properties();
		properties.put("zookeeper.connect", "114.55.72.173:2181"); // ZooKeeper connection string
		// Consumer group id; consumers that share a group id divide the topic's partitions between them
		properties.put("group.id", "group_xing");

		// ZooKeeper session timeout: if no heartbeat arrives within this window the consumer is considered dead; keep it small
		// properties.put("zookeeper.session.timeout.ms", "4000");

		// Maximum time a ZooKeeper follower may lag behind the leader
		// properties.put("zookeeper.sync.time.ms", "200");

		// How often offsets are written to ZooKeeper
		properties.put("auto.commit.interval.ms", "1000");

		// Where to start consuming: smallest = oldest message, largest = newest
		properties.put("auto.offset.reset", "smallest");

		// Serializer class
		// properties.put("serializer.class", "kafka.serializer.StringEncoder");

		return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
	}

	public void dropDown() {
		if (consumer != null) {
			consumer.shutdown();
		}
	}

}


package com.SpringKafka.service.impl;

import javax.annotation.Resource;

import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

import com.SpringKafka.service.KafkaSendMessageService;

@Service("kafkaSendMessageService")
public class KafkaSendMessageServiceImpl  implements KafkaSendMessageService{

    @Resource(name = "messageChannel")
    MessageChannel messageChannel;

    public void sendMessageToKafka(String topic, String message) {
    	messageChannel.send(MessageBuilder.withPayload(message)
                                    .setHeader(KafkaHeaders.TOPIC,topic)
                                    .build());
    }

}



package com.SpringKafka.service.impl;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

import com.SpringKafka.service.KafkaService;

@Service("kafkaService")
public class KafkaServiceImpl  implements KafkaService{
    @Resource(name = "messageChannel")
    MessageChannel messageChannel;

    public void sendUserInfo(String topic, Object obj) {
    	messageChannel.send(MessageBuilder.withPayload(obj)
                                    .setHeader(KafkaHeaders.TOPIC,topic)
                                    .build());
    }

}



package com.SpringKafka.service;

public interface KafkaSendMessageService {
    public void sendMessageToKafka(String topic, String message);
}


package com.SpringKafka.service.impl;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.SpringKafka.service.UserDto;
import com.alibaba.fastjson.JSON;

/**
 * KafkaConsumerService: the Spring Integration message-receiving service.
 */
public class KafkaConsumerService {

    static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

    public void processMessage(Map<String, Map<Integer, String>> msgs) {
        logger.info("===============================================processMessage===============");
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            logger.info("============Topic:" + entry.getKey());
            LinkedHashMap<Integer, String> messages = (LinkedHashMap<Integer, String>) entry.getValue();
            Set<Integer> keys = messages.keySet();
            for (Integer i : keys)
                logger.info("======Partition:" + i);
            Collection<String> values = messages.values();
            for (Iterator<String> iterator = values.iterator(); iterator.hasNext();) {
                String message = "["+iterator.next()+"]";
                logger.info("=====message:" + message);
//                List<UserDto> userList = JSON.parseArray(message, UserDto.class);
//                logger.info("=====userList.size:" + userList.size());

            }

        }
    }
}
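The payload the adapter hands to processMessage is a nested map: topic → (partition → message). A small stand-alone sketch of the same iteration, with a hypothetical two-message payload (the class name `MessageMapDemo` and the sample data are mine):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MessageMapDemo {
    // Flatten topic -> (partition -> message) into a list of bracketed payloads,
    // mirroring the loops in KafkaConsumerService.processMessage.
    static List<String> flatten(Map<String, Map<Integer, String>> msgs) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, String>> topicEntry : msgs.entrySet()) {
            for (Map.Entry<Integer, String> partitionEntry : topicEntry.getValue().entrySet()) {
                out.add("[" + partitionEntry.getValue() + "]");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> partitions = new LinkedHashMap<>();
        partitions.put(0, "hello"); // partition 0
        partitions.put(1, "world"); // partition 1
        Map<String, Map<Integer, String>> msgs = new LinkedHashMap<>();
        msgs.put("test-yongxing", partitions);
        System.out.println(flatten(msgs)); // → [[hello], [world]]
    }
}
```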



Note: there are two consumers here: a Spring Integration consumer (spring-kafka-consumer.xml, KafkaConsumerService.java) and a raw-client consumer (KafkaConsumer.java).



Raw-client example:
package com;

import java.util.ArrayList;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerRecords extends Thread {

	private final String topic; // instance field, not static: set per consumer

	public KafkaConsumerRecords(String topic) {
		super();
		this.topic = topic;
	}

	@Override
	public void run() {
		KafkaConsumer<String, String> kafkaConsumer = kafkaConsumer();
		ArrayList<String> topics = new ArrayList<>();
		topics.add(this.topic);
		kafkaConsumer.subscribe(topics);

		while (true) {
			ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);

			for (ConsumerRecord<String, String> record : records) {
				String json = record.value();
				System.out.println("json == " + json);
			}

			// commitAsync is non-blocking; it triggers the OffsetCommitCallback (if supplied) on success or failure.
			// Committing after the records are processed gives at-least-once delivery.
			kafkaConsumer.commitAsync();
		}
	}

	public KafkaConsumer<String, String> kafkaConsumer() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "114.55.72.173:9092");
		props.put("group.id", "cebigdata_engin_dev");
		props.put("enable.auto.commit", false);
		props.put("auto.commit.interval.ms", 1000);
		props.put("auto.offset.reset", "latest");
		props.put("session.timeout.ms", 30000);
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("heartbeat.interval.ms", "12000");

		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		return consumer;
	}

}
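The poll/commit ordering matters: committing only after the records are processed means a crash between poll and commit re-delivers records (at-least-once), whereas committing first risks losing them. A plain-Java simulation of the at-least-once ordering (no Kafka classes; the class name `CommitOrderDemo` and the list standing in for a partition are mine):

```java
import java.util.List;

public class CommitOrderDemo {
    // Simulate processing a batch and only then advancing the committed offset.
    // If the process dies before the return value is "committed", the same
    // records are consumed again on restart, not lost.
    static long processAndCommit(List<String> records, long committedOffset) {
        long offset = committedOffset;
        for (String record : records) {
            // "process" the record here (e.g. print or store it)
            offset++; // next offset to consume after this record
        }
        return offset; // commit point: everything before it has been processed
    }

    public static void main(String[] args) {
        long committed = processAndCommit(List.of("a", "b", "c"), 0L);
        System.out.println(committed); // → 3
    }
}
```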


pom.xml (dependency for the related spring-kafka project):
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>1.1.1.RELEASE</version>
</dependency>


Reference for the raw-client example: https://my.oschina.net/cloudcoder/blog/299215