<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="${kafka.zookeeper.connect}" zk-connection-timeout="6000" zk-session-timeout="400" zk-sync-time="200" /> <bean id="zookeeperConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration"> <constructor-arg name="zookeeperConnect" ref="zookeeperConnect" /> </bean> <bean id="defaultConnectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory"> <constructor-arg name="configuration" ref="zookeeperConfiguration" /> </bean> <bean id="kafkaTemplate" class="org.springframework.integration.kafka.core.KafkaTemplate"> <constructor-arg name="connectionFactory" ref="defaultConnectionFactory" /> </bean>
package com.vti.test; import java.util.Collection; import java.util.Collections; import java.util.List; import javax.annotation.Resource; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.integration.kafka.core.FetchRequest; import org.springframework.integration.kafka.core.KafkaMessage; import org.springframework.integration.kafka.core.KafkaMessageBatch; import org.springframework.integration.kafka.core.KafkaTemplate; import org.springframework.integration.kafka.core.Partition; import org.springframework.integration.kafka.core.Result; import org.springframework.integration.kafka.serializer.common.StringDecoder; import org.springframework.integration.kafka.util.MessageUtils; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "classpath:beans.xml" }) public class KafkaTemplateTest { @Resource private KafkaTemplate kafkaTemplate; @Test public void receive() { String topic = "test"; long offset = 0; int max = 1000; Collection<Partition> partitions = kafkaTemplate.getConnectionFactory().getPartitions(topic); while (partitions.iterator().hasNext()) { Partition partition = partitions.iterator().next(); Result<KafkaMessageBatch> receive = kafkaTemplate .receive(Collections.singleton(new FetchRequest(partitions.iterator().next(), offset, max))); List<KafkaMessage> messages = receive.getResult(partition).getMessages(); if (messages.size() > 0) { for (KafkaMessage kafkaMessage : messages) { String msg = MessageUtils.decodePayload(kafkaMessage, new StringDecoder()); offset = kafkaMessage.getMetadata().getNextOffset(); System.out.println(msg); } } else { try { Thread.sleep(3*1000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } } } }
相关推荐
在本篇中,我们将主要探讨如何使用`@Aspect`注解在Spring Boot中创建切面。 首先,让我们理解什么是切面。在AOP中,切面是关注点的模块化,这些关注点是跨越多个对象的,例如,事务管理就是一种关注点,它涉及到多...
### Spring Boot 使用 Kafka 详解 #### 一、前言 随着大数据时代的发展,实时数据处理的需求日益增加。Kafka作为一款高性能的分布式流处理平台,因其高吞吐量、低延迟等特点,在数据处理领域占据了一席之地。而...
在 SpringBoot 应用中集成 Kafka,我们需要添加相关依赖,配置 Kafka 的连接信息,然后使用 Spring 提供的 `KafkaTemplate` 或者 `@KafkaListener` 注解进行生产和消费操作。 ### 4.1 生产者开发 ```java @...
在Spring Boot中,我们可以使用`@Component`注解创建一个bean,该bean继承自`AbstractMessageProducer`或者实现`KafkaTemplate`接口。这里我们展示一个简单的`KafkaProducer`示例: ```java import org.spring...
在`kafka-demo`项目中,生产者部分可能包含了创建KafkaTemplate的配置,以及使用`send()`方法发送消息到特定主题的代码。而消费者部分则会展示如何设置一个监听器,用`@KafkaListener`注解来消费这些消息。 **生产...
3. **创建Kafka生产者**:通过`@EnableKafka`注解启用Kafka支持,并创建一个生产者bean,使用`KafkaTemplate`来发送消息。 ```java @Bean public KafkaTemplate, String> kafkaTemplate() { return new ...
private KafkaTemplate, String> kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); System.out.println("Sent message: " + message); } } ``` ...
创建一个`ProducerConfig`类,并使用`@Bean`注解声明KafkaTemplate实例: ```java @Configuration public class ProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String ...
3. 使用 KafkaTemplate 操作 Kafka:使用 KafkaTemplate 发送和消费消息。 4. 使用消息监听器:使用 @KafkaListener 注解监听 Kafka 主题,并处理消息。 SpringBoot 整合 Kafka 的优点: * 简化了消息传输和处理...
例如,可以使用`ListenableFuture`异步发送消息,或者使用`SeekToCurrentErrorHandler`来处理消费错误。 四、性能优化与最佳实践 1. **分区策略**:Kafka的分区机制允许并行处理,通过自定义分区器可以控制消息的...
生产者类通常会包含一个方法,该方法使用`KafkaTemplate`来发送消息。例如: ```java @Service public class KafkaProducerService { private final KafkaTemplate, String> kafkaTemplate; public ...
此外,还可以使用KafkaTemplate的其他方法,如`sendAndReceive`来进行同步发送和接收消息。 总的来说,Spring Boot与Kafka的集成使得构建基于消息的微服务系统变得简单而高效。通过以上步骤,你可以在本地环境中...
然后,我们可以定义一个bean,使用`KafkaTemplate`来发送消息。以下是一个简单的示例: ```java @Configuration @EnableKafka public class KafkaConfig { @Bean public ProducerFactory, String> ...
在SpringBoot应用中,可以使用`KafkaTemplate`创建消息生产者。下面是一个简单的示例: ```java @Autowired private KafkaTemplate, String> kafkaTemplate; public void sendMessage(String topic, String ...
然后,创建消息生产者类`MessageProducer`,使用`KafkaTemplate`发送消息到指定主题: ```java @Service public class MessageProducer { @Autowired private KafkaTemplate, String> kafkaTemplate; public ...
生产者可以使用`KafkaTemplate`,消费者则通过`MessageListenerContainer`监听消息: ```xml <property name="kafkaTemplate" ref="kafkaTemplate"/> ``` ### 总结 无论是注解方式还是XML...
SpringBoot还提供了KafkaTemplate,使得与Kafka的交互变得更加简洁。 7. **数据处理**:在批量消费数据后,通常需要对数据进行业务逻辑处理,这可能涉及到数据清洗、聚合、分析等操作。SpringBoot结合其他数据处理...
public KafkaTemplate, String> kafkaTemplate() { return new KafkaTemplate(producerFactory()); } @Bean public ProducerFactory, String> producerFactory() { Map, Object> config = new HashMap(); ...
通过`@Component`注解声明一个生产者bean,使用`KafkaTemplate`发送消息到指定的Topic: ```java import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @...