我本人在使用spring-kafka集成组件时,遇到了不少问题,感觉这个组件使用的方式比较多,有时候会有些纠结。spring-kafka组件设计时考虑的比较多的场景,比如兼容JMS设计范式、兼容springboot自动装配等等,所以使用时需要适度考虑自己的实际场景。如下示例的环境:
1、springboot:1.5.11
2、spring-kakfa:1.3.5
3、kafka broker:1.1.0
一、兼容性
参考:
https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
https://projects.spring.io/spring-kafka/
kafka-spring:
1)spring 4.3.8+ (spring 4,springboot 1.5.x):
> spring-kafka:1.3.5+(只兼容1.3.x版本)
> kafka-client:0.11.0.2+、1.0.x
2)spring 5.x (spring 5,springboot 2.x)
> spring-kafka:2.0.x,兼容kafka-client 0.11.0.x、1.0.x
> spring-kafka:2.1.x+,兼容kafkaclient 1.x+
较低版本的spring与高版本的spring-kafka集成时会出现API不兼容问题,此外kakfa-client与kafka broker不兼容时也会出现运行时异常。所以在版本上尽量保持对应;此外,我们已知,在kafka 0.10.2之后,客户端和broker的兼容能力有一定的准则,即较高版本的kakfa client可以访问较低版本的broker。
二、代码适用场景
1、假定,项目中有多个kafka集群,所以我们需要考虑配置层面的便捷性。
2、假定,即使一个kafka集群,可能存在配置方式不同的多个producers或者consumers,所以我们还需要在此方面考虑易用程度。
3、部分配置参数,需要考虑多环境部署。
三、配置
1、pom.xml
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency>
2、application.yml
sample: kafka: bootstrap-servers: 127.0.0.1:9092 ackMode: -1 filter-regex: test\..* max-poll-records: 12 group-id: test topic-pattern: metric_*
你的项目中可能会引入多个broker集群,我们约定每个集群一个单独的配置片段,比如上述例子为“sample”集群的配置,配置中包含consumer和producer的主要配置项。如果你还有“sample2”的集群,那么他们的配置片段应该以“sample2.kafka”作为前缀。
3、SampleConfiguration.java (spring配置管理类)
/** * Description * <p> * </p> * DATE 2018/5/20. * * @author liuguanqing. */ @Configuration @ConfigurationProperties(prefix = "sample.kafka") public class SampleConfiguration { //必填 private String bootstrapServers; //"0","1",或者"all"/"-1" private String ackMode = "1"; private int maxBlockMS = 30000; private int retries = 3; /** * 等待响应的超时时长,如果超出阈值,则会导致请求被重试,取决"retries"参数. * 此参数值必须大于sessionTimeoutMS */ private int requestTimeoutMS = 300000;//[0,500000] //单次最多允许poll的消息条数. //此值不建议过大,应该考虑你的业务处理效率. private int maxPollRecords = 32; //两次poll之间的时间隔间最大值,如果超过此值将会被认为此consumer失效,触发consumer重新平衡. //此值必须大于,一个batch所有消息处理时间总和. //最大于500000 private int maxPollIntervalMS = 120000;//2分钟 //会话过期时长,consumer通过ConsumerCoordinator间歇性发送心跳 //超期后,会被认为consumer失效,服务迁移到其他consumer节点.(group) //需要注意,Coordinator与kafkaConsumer共享底层通道,也是基于poll获取协调事件,但是会在单独的线程中 private int sessionTimeoutMS = 60000; //我们建议同一个project中,使用同一个broker集群的消费者,使用相同的groupId private String groupId; public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } public void setAckMode(String ackMode) { this.ackMode = ackMode; } public void setMaxBlockMS(int maxBlockMS) { this.maxBlockMS = maxBlockMS; } public void setRetries(int retries) { this.retries = retries; } public void setRequestTimeoutMS(int requestTimeoutMS) { this.requestTimeoutMS = requestTimeoutMS; } public void setMaxPollRecords(int maxPollRecords) { this.maxPollRecords = maxPollRecords; } public void setMaxPollIntervalMS(int maxPollIntervalMS) { this.maxPollIntervalMS = maxPollIntervalMS; } public void setSessionTimeoutMS(int sessionTimeoutMS) { this.sessionTimeoutMS = sessionTimeoutMS; } public void setGroupId(String groupId) { this.groupId = groupId; } ////////////生产者部分////////// public Map<String, Object> producerConfiguration() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, this.ackMode); //批量发送字节数,可以声明为来自yml配置 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//字节数 props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);////可以声明为来自yml配置 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);////可以声明为来自yml配置 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //org.springframework.kafka.support.serializer.JsonSerializer; props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, this.maxBlockMS); props.put(ProducerConfig.RETRIES_CONFIG, this.retries); props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, this.requestTimeoutMS); props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 6 * 1024 * 1024);//6M props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4"); return props; } //EventRecord,是自己封装的一个POJO类,此处主要演示序列化的方式 public ProducerFactory<String, EventRecord> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfiguration()); } /** * export,bean的名称默认与方法名一致,所以其他Service引用时需要注意 * @return */ @Bean public KafkaTemplate<String, EventRecord> sampleKafkaTemplate() { return new KafkaTemplate<String, EventRecord>(producerFactory()); } ////////////////消费者部分////////////// /** * export,bean可以被kafkaListener引用. * @return */ @Bean public ConcurrentKafkaListenerContainerFactory<String, EventRecord> sampleListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, EventRecord> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(sampleConsumerFactory()); factory.setConcurrency(3); factory.setBatchListener(true);//开启批量消费 factory.getContainerProperties().setPollTimeout(3000); //开启手动ACK. factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); return factory; } public ConsumerFactory<String, EventRecord> sampleConsumerFactory() { return new DefaultKafkaConsumerFactory<>(sampleConsumerConfiguration(),new StringDeserializer(),new JsonDeserializer<>(EventRecord.class)); } public Map<String, Object> sampleConsumerConfiguration() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); //强烈建议关闭自动确认,我们使用手动ACK模式,Spring Kafka基于JMS语义为我们设计好了兼容实现. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,this.maxPollIntervalMS); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,this.maxPollRecords);//单次poll允许获取的最多条数. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,this.sessionTimeoutMS); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,this.requestTimeoutMS); props.put(ConsumerConfig.GROUP_ID_CONFIG,this.groupId); return props; } }
同上,每个broker集群,对应一个Configuration类,此类将会负责初始化KafkaTemplate和ConsumerContainer,分别服务于此集群的多个Producers和listeners。我们约定,不同的Configuration类创建的Bean名称(kafkaTemplate、listenerContainerFactory)应该不同。
上述的部分配置参数,仅供参考,请根据自己的业务实际情况适度调优。
1)kafkaTemplate:按照上述配置样例配置即可,主要关注acks和reties相关参数,其他没有太多细节。
2)listenerContainerFactory:建议关闭自动提交,而是使用spring-kakfa提供的“手动提交”模式,开启batchListener即listener可以一次获取多条record。
四、服务
1、Producer样例
@Component public class SampleService { @Autowired private KafkaTemplate<String,EventRecord> sampleKafkaTemplate; private String topic;//常量,或者来自profile等 public void send() { ProducerRecord<String, EventRecord> producerRecord = new ProducerRecord<String, EventRecord>( this.topic, 0, key, record); this.sampleKafkaTemplate.send(producerRecord); } }
2、listener样例
@Component public class SampleMessageListener { /** * topic的参数可以来自常量或者profile或者env等 * 其中topics、topicPattern均可以使用表达式,来自propertySource或者Env。 * @param consumerRecords * @param acknowledgment */ @KafkaListener(topicPattern = "${sample.kafka.topic-pattern}",containerFactory = "sampleListenerContainerFactory") public void onMessage(List<ConsumerRecord<String, EventRecord>> consumerRecords,Acknowledgment acknowledgment) { try { for (ConsumerRecord<String,EventRecord> record : consumerRecords) { //process(record.value()); } acknowledgment.acknowledge(); } catch (Exception e) { //处理异常,不建议ACK } } }
因为我们使用了手动确认,所以listener要在合适的时机调用acknowledge()方法。
五、消费者过滤器
Consumer可以使用Filter过滤不需要的消息,这些消息将不会被传递给listener,我们需要在创建Consumer之前指定Filter,Filter将会有spring-kafka组件在poll消息列表之后逐个验证,对于Filter返回“true”的消息将会被丢弃,“false”的将会被传递给listener;特别注意,当我们使用Filter之后,consumer必须使用“手动确认”模式,即:
1)设定kafka参数:ENABLE_AUTO_COMMIT = false
2)设定容器参数为“手动确认”:
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
3)将“过滤抛弃的消息自动确认”:
factory.setAckDiscarded(true);
4)在listener中感知Acknowledgement,并在消息执行完毕后ACK。
acknowledgment.acknowledge();
@Bean public ConcurrentKafkaListenerContainerFactory<String, EventRecord> sampleListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, EventRecord> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(sampleConsumerFactory()); factory.setConcurrency(3); factory.setBatchListener(true); factory.setRecordFilterStrategy(new RecordFilterStrategy<String, EventRecord>() { private AviatorRegexMatcher regexFilter = new AviatorRegexMatcher(filterRegex); @Override public boolean filter(ConsumerRecord<String, EventRecord> consumerRecord) { EventHeader header = consumerRecord.value().getHeader(); String source = header.getSchemaName() + "." + header.getTableName(); return !regexFilter.match(source); } }); factory.setAckDiscarded(true);//对于不符合filter的消息,是否自动丢弃 + ack factory.getContainerProperties().setPollTimeout(3000); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); return factory; }
六、问题小结
1、兼容性错误:
java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)
可能是你基于springMVC 4或者springboot 1框架,但是引用了高版本的spring-kafka组件导致。spring 4、springboot 1,只能兼容spring-kakfa 1.3.x版本。
2、反序列化问题
Caused by: java.lang.IllegalAccessException: Class org.apache.kafka.common.utils.Utils can not access a member of class org.springframework.kafka.support.serializer.JsonDeserializer with modifiers "protected" at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) ~[na:1.8.0_121] at java.lang.Class.newInstance(Class.java:436) ~[na:1.8.0_121] at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:314) ~[kafka-clients-0.10.1.1.jar:na] ... 26 common frames omitted
上述例子中,producer可以直接指定JsonSerializer而忽略类型;但是消费者的配置中,则需要使用构造方法来传递JsonDeserializer,因为反序列化无法预判类型。
public ConsumerFactory<String, EventRecord> sampleConsumerFactory() { return new DefaultKafkaConsumerFactory<>(sampleConsumerConfiguration(),new StringDeserializer(),new JsonDeserializer<>(EventRecord.class)); }
相关推荐
spring-kafka是Spring框架对Apache Kafka消息系统进行整合的一个项目,它简化...通过阅读这份官方文档,开发者能够更好地理解和掌握如何在Spring项目中集成和使用Kafka,以及如何解决在使用过程中可能遇到的各种问题。
spring-boot-starter-kafka示例程序\n 支持springcloud1.5.4,kafka0.8.2.x\n 项目地址:https://github.com/zhyea/spring-boot-starter-kafka
本文将详细探讨如何在Spring Boot项目中基于Spring Kafka动态创建Kafka消费者。 首先,了解Kafka基本概念:Kafka是一种分布式流处理平台,用于构建实时数据管道和流应用程序。它提供了高吞吐量、低延迟的消息传递,...
标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...
在本项目"springboot-kafka-master.zip"中,我们将探讨如何使用Spring Boot框架与Apache Kafka进行集成,以构建高效、可扩展的消息传递系统。Spring Boot简化了Spring的应用配置,而Kafka作为一个分布式流处理平台,...
在`springboot-kafka`目录下,可能包含了一个示例项目,你可以通过导入IDE进行编译和运行,以了解Spring Boot与Kafka集成的实践操作。 通过以上步骤,你已经掌握了如何在Spring Boot 2.7.3中集成Apache Kafka的基本...
本教程将详细介绍如何在SpringBoot项目中集成Kafka,构建高效的消息队列系统。 **一、SpringBoot与Kafka简介** 1. **SpringBoot**: SpringBoot是Spring框架的扩展,它简化了Spring应用的初始搭建以及开发过程,...
1. **Spring for Apache Kafka**: Spring提供了Spring for Apache Kafka模块,使得在Spring应用中使用Kafka变得更加简单。这包括自动配置、模板方法、监听器容器以及生产者和消费者的抽象。 2. **配置**: 在Spring ...
Spring Boot 集群管理工具 KafkaAdminClient 使用方法解析 KafkaAdminClient 是 Spring Boot 集群管理工具中的一部分,主要用于管理和检视 Kafka 集群中的Topic、Broker、ACL 等对象。下面将详细介绍 Kafka...
spring-kafka-2.2.0.RELEASE.jar是用于springboot+kafka,用springboot框架向kafka推送或消费数据所需maven包
Spring Kafka是Spring框架的一部分,专为集成Apache Kafka而设计,提供了一套轻量级且强大的API,使得在Java应用中使用Kafka变得更加简单。本文将围绕Spring Kafka的源代码进行深度解析,帮助开发者更好地理解和运用...
springboot-kafka-连接-debezium-ksqldb 该项目的目标是与 , 和 。 为此,我们有: research-service ,用于在中插入/更新/删除记录; Source Connectors ,用于监视MySQL中记录的更改并将与这些更改相关的消息推送...
1. 简洁易用:Spring for Apache Kafka提供了一个简洁、易用的API,方便开发者使用Apache Kafka的功能。 2. 高性能:该项目的实现提高了消息处理的效率和可靠性。 3. 灵活性:该项目提供了灵活的消息处理机制,适应...
Spring for Apache Kafka提供了与Spring Boot整合的便利,通过引入`spring-kafka`和`spring-boot-starter`依赖,我们可以轻松地在Spring Boot应用中使用Kafka。添加以下依赖: ```xml <groupId>org.spring...
本文将深入探讨Spring Kafka的核心概念、主要功能以及实际应用,旨在帮助读者理解和掌握如何在Java项目中有效地使用Spring Kafka。 一、Spring Kafka概述 Spring Kafka作为一个轻量级的库,它允许开发者以声明式的...
Spring-kafka-protobuf运行java -jar target/spring-kafka-protobuf-0.0.1-SNAPSHOT.jar curl -v 'http://localhost:8080/rest/n/spring/kafka/protobuf/demo' 查看并检查弹簧日志
Spring Kafka 是 Spring 框架为集成 Apache Kafka 提供的一个模块,它使得在 Java 应用程序中使用 Kafka 变得简单且直观。本项目 "spring-kafka-demo" 提供了一个完整的示例,涵盖了生产者和消费者的配置与实现,...
springboot集成kafka消息队列代码,可在上面进行二次开发。亲测有效,可以通过注释进行消息的接口
赠送jar包:flink-connector-kafka-0.10_2.11-1.10.0.jar; 赠送原API文档:flink-connector-kafka-0.10_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-connector-kafka-0.10_2.11-1.10.0-sources.jar; 赠送Maven...