`
__SuRa丶Rain
  • 浏览: 44742 次
  • 性别: Icon_minigender_1
  • 来自: 火星
社区版块
存档分类
最新评论

KafkaTemplate的使用

阅读更多
	<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="${kafka.zookeeper.connect}" zk-connection-timeout="6000" zk-session-timeout="400" zk-sync-time="200" />	
	
	<bean id="zookeeperConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
		<constructor-arg name="zookeeperConnect" ref="zookeeperConnect" />
	</bean>
	
	<bean id="defaultConnectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
		<constructor-arg name="configuration" ref="zookeeperConfiguration" />
	</bean>

	<bean id="kafkaTemplate" class="org.springframework.integration.kafka.core.KafkaTemplate">
		<constructor-arg name="connectionFactory" ref="defaultConnectionFactory" />
	</bean>

 

package com.vti.test;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.integration.kafka.core.FetchRequest;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.KafkaMessageBatch;
import org.springframework.integration.kafka.core.KafkaTemplate;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.Result;
import org.springframework.integration.kafka.serializer.common.StringDecoder;
import org.springframework.integration.kafka.util.MessageUtils;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:beans.xml" })
public class KafkaTemplateTest {

	@Resource
	private KafkaTemplate kafkaTemplate;

	@Test
	public void receive() {

		String topic = "test";
		long offset = 0;
		int max = 1000;

		Collection<Partition> partitions = kafkaTemplate.getConnectionFactory().getPartitions(topic);

		while (partitions.iterator().hasNext()) {

			Partition partition = partitions.iterator().next();

			Result<KafkaMessageBatch> receive = kafkaTemplate
					.receive(Collections.singleton(new FetchRequest(partitions.iterator().next(), offset, max)));

			List<KafkaMessage> messages = receive.getResult(partition).getMessages();

			if (messages.size() > 0) {

				for (KafkaMessage kafkaMessage : messages) {

					String msg = MessageUtils.decodePayload(kafkaMessage, new StringDecoder());

					offset = kafkaMessage.getMetadata().getNextOffset();

					System.out.println(msg);
				}

			} else {
				try {
					Thread.sleep(3*1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				continue;
			}
		}
	}
}

 

分享到:
评论

相关推荐

    springboot使用Aspect切面1

    在本篇中,我们将主要探讨如何使用`@Aspect`注解在Spring Boot中创建切面。 首先,让我们理解什么是切面。在AOP中,切面是关注点的模块化,这些关注点是跨越多个对象的,例如,事务管理就是一种关注点,它涉及到多...

    SpringBoot使用Kafka详解含完整代码

    ### Spring Boot 使用 Kafka 详解 #### 一、前言 随着大数据时代的发展,实时数据处理的需求日益增加。Kafka作为一款高性能的分布式流处理平台,因其高吞吐量、低延迟等特点,在数据处理领域占据了一席之地。而...

    kafka使用文档.docx

    在 SpringBoot 应用中集成 Kafka,我们需要添加相关依赖,配置 Kafka 的连接信息,然后使用 Spring 提供的 `KafkaTemplate` 或者 `@KafkaListener` 注解进行生产和消费操作。 ### 4.1 生产者开发 ```java @...

    springboot集成kafka

    在Spring Boot中,我们可以使用`@Component`注解创建一个bean,该bean继承自`AbstractMessageProducer`或者实现`KafkaTemplate`接口。这里我们展示一个简单的`KafkaProducer`示例: ```java import org.spring...

    kafka-demo.zip

    在`kafka-demo`项目中,生产者部分可能包含了创建KafkaTemplate的配置,以及使用`send()`方法发送消息到特定主题的代码。而消费者部分则会展示如何设置一个监听器,用`@KafkaListener`注解来消费这些消息。 **生产...

    kafka_springboot_kafka_

    3. **创建Kafka生产者**:通过`@EnableKafka`注解启用Kafka支持,并创建一个生产者bean,使用`KafkaTemplate`来发送消息。 ```java @Bean public KafkaTemplate, String&gt; kafkaTemplate() { return new ...

    SpingBoot中使用Kafka的Demo

    private KafkaTemplate, String&gt; kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); System.out.println("Sent message: " + message); } } ``` ...

    spring-boot集成kafka

    创建一个`ProducerConfig`类,并使用`@Bean`注解声明KafkaTemplate实例: ```java @Configuration public class ProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String ...

    SpringBoot整合Kafka

    3. 使用 KafkaTemplate 操作 Kafka:使用 KafkaTemplate 发送和消费消息。 4. 使用消息监听器:使用 @KafkaListener 注解监听 Kafka 主题,并处理消息。 SpringBoot 整合 Kafka 的优点: * 简化了消息传输和处理...

    demokafkaboot.rar

    例如,可以使用`ListenableFuture`异步发送消息,或者使用`SeekToCurrentErrorHandler`来处理消费错误。 四、性能优化与最佳实践 1. **分区策略**:Kafka的分区机制允许并行处理,通过自定义分区器可以控制消息的...

    kafka监听样例.rar

    生产者类通常会包含一个方法,该方法使用`KafkaTemplate`来发送消息。例如: ```java @Service public class KafkaProducerService { private final KafkaTemplate, String&gt; kafkaTemplate; public ...

    springboot简单集成kafka(ue-kafka.zip)

    此外,还可以使用KafkaTemplate的其他方法,如`sendAndReceive`来进行同步发送和接收消息。 总的来说,Spring Boot与Kafka的集成使得构建基于消息的微服务系统变得简单而高效。通过以上步骤,你可以在本地环境中...

    springboot和kafka的集成

    然后,我们可以定义一个bean,使用`KafkaTemplate`来发送消息。以下是一个简单的示例: ```java @Configuration @EnableKafka public class KafkaConfig { @Bean public ProducerFactory, String&gt; ...

    SpringBoot整合kafka代码简洁(测试可用)

    在SpringBoot应用中,可以使用`KafkaTemplate`创建消息生产者。下面是一个简单的示例: ```java @Autowired private KafkaTemplate, String&gt; kafkaTemplate; public void sendMessage(String topic, String ...

    spring kafka demo (详解 可以跑通)

    然后,创建消息生产者类`MessageProducer`,使用`KafkaTemplate`发送消息到指定主题: ```java @Service public class MessageProducer { @Autowired private KafkaTemplate, String&gt; kafkaTemplate; public ...

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    SpringBoot还提供了KafkaTemplate,使得与Kafka的交互变得更加简洁。 7. **数据处理**:在批量消费数据后,通常需要对数据进行业务逻辑处理,这可能涉及到数据清洗、聚合、分析等操作。SpringBoot结合其他数据处理...

    spring整合kafka注解和xml两种方式

    生产者可以使用`KafkaTemplate`,消费者则通过`MessageListenerContainer`监听消息: ```xml &lt;property name="kafkaTemplate" ref="kafkaTemplate"/&gt; ``` ### 总结 无论是注解方式还是XML...

    kafka操作详解.docx

    public KafkaTemplate, String&gt; kafkaTemplate() { return new KafkaTemplate(producerFactory()); } @Bean public ProducerFactory, String&gt; producerFactory() { Map, Object&gt; config = new HashMap(); ...

    springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区

    通过`@Component`注解声明一个生产者bean,使用`KafkaTemplate`发送消息到指定的Topic: ```java import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @...

Global site tag (gtag.js) - Google Analytics