`
QING____
  • 浏览: 2250534 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

spring-kafka在springboot项目中的使用样例

 
阅读更多

    我本人在使用spring-kafka集成组件时,遇到了不少问题,感觉这个组件使用的方式比较多,有时候会有些纠结。spring-kafka组件设计时考虑的比较多的场景,比如兼容JMS设计范式、兼容springboot自动装配等等,所以使用时需要适度考虑自己的实际场景。如下示例的环境:

    1、springboot:1.5.11 

    2、spring-kakfa:1.3.5

    3、kafka broker:1.1.0

 

一、兼容性

    参考:

    https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix

    https://projects.spring.io/spring-kafka/

    kafka-spring:

    1)spring 4.3.8+ (spring 4,springboot 1.5.x):

        > spring-kafka:1.3.5+(只兼容1.3.x版本)

        > kafka-client:0.11.0.2+、1.0.x

    2)spring 5.x (spring 5,springboot 2.x)

        > spring-kafka:2.0.x,兼容kafka-client 0.11.0.x、1.0.x

        > spring-kafka:2.1.x+,兼容kafkaclient 1.x+

 

    较低版本的spring与高版本的spring-kafka集成时会出现API不兼容问题,此外kakfa-client与kafka broker不兼容时也会出现运行时异常。所以在版本上尽量保持对应;此外,我们已知,在kafka 0.10.2之后,客户端和broker的兼容能力有一定的准则,即较高版本的kakfa client可以访问较低版本的broker。

 

二、代码适用场景

    1、假定,项目中有多个kafka集群,所以我们需要考虑配置层面的便捷性。

    2、假定,即使一个kafka集群,可能存在配置方式不同的多个producers或者consumers,所以我们还需要在此方面考虑易用程度。

    3、部分配置参数,需要考虑多环境部署。

 

三、配置

    1、pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>

 

    2、application.yml

 

sample:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    ackMode: -1
    filter-regex: test\..*
    max-poll-records: 12
    group-id: test
    topic-pattern: metric_*

 

    你的项目中可能会引入多个broker集群,我们约定每个集群一个单独的配置片段,比如上述例子为“sample”集群的配置,配置中包含consumer和producer的主要配置项。如果你还有“sample2”的集群,那么他们的配置片段应该以“sample2.kafka”作为前缀。

 

    3、SampleConfiguration.java (spring配置管理类)

 

/**
 * Description
 * <p>
 * </p>
 * DATE 2018/5/20.
 *
 * @author liuguanqing.
 */
@Configuration
@ConfigurationProperties(prefix = "sample.kafka")
public class SampleConfiguration {

    //必填
    private String bootstrapServers;

    //"0","1",或者"all"/"-1"
    private String ackMode = "1";

    private int maxBlockMS = 30000;

    private int retries = 3;

    /**
     * 等待响应的超时时长,如果超出阈值,则会导致请求被重试,取决"retries"参数.
     * 此参数值必须大于sessionTimeoutMS
     */
    private int requestTimeoutMS = 300000;//[0,500000]

    //单次最多允许poll的消息条数.
    //此值不建议过大,应该考虑你的业务处理效率.
    private int maxPollRecords = 32;

    //两次poll之间的时间隔间最大值,如果超过此值将会被认为此consumer失效,触发consumer重新平衡.
    //此值必须大于,一个batch所有消息处理时间总和.
    //最大于500000
    private int maxPollIntervalMS = 120000;//2分钟

    //会话过期时长,consumer通过ConsumerCoordinator间歇性发送心跳
    //超期后,会被认为consumer失效,服务迁移到其他consumer节点.(group)
    //需要注意,Coordinator与kafkaConsumer共享底层通道,也是基于poll获取协调事件,但是会在单独的线程中
    private int sessionTimeoutMS = 60000;

    //我们建议同一个project中,使用同一个broker集群的消费者,使用相同的groupId
    private String groupId;

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public void setAckMode(String ackMode) {
        this.ackMode = ackMode;
    }

    public void setMaxBlockMS(int maxBlockMS) {
        this.maxBlockMS = maxBlockMS;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }

    public void setRequestTimeoutMS(int requestTimeoutMS) {
        this.requestTimeoutMS = requestTimeoutMS;
    }

    public void setMaxPollRecords(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    public void setMaxPollIntervalMS(int maxPollIntervalMS) {
        this.maxPollIntervalMS = maxPollIntervalMS;
    }

    public void setSessionTimeoutMS(int sessionTimeoutMS) {
        this.sessionTimeoutMS = sessionTimeoutMS;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    ////////////生产者部分//////////

    public Map<String, Object> producerConfiguration() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, this.ackMode);
        //批量发送字节数,可以声明为来自yml配置
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//字节数
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);////可以声明为来自yml配置
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);////可以声明为来自yml配置
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //org.springframework.kafka.support.serializer.JsonSerializer;
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, this.maxBlockMS);
        props.put(ProducerConfig.RETRIES_CONFIG, this.retries);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, this.requestTimeoutMS);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 6 * 1024 * 1024);//6M
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
        return props;
    }

    //EventRecord,是自己封装的一个POJO类,此处主要演示序列化的方式
    public ProducerFactory<String, EventRecord> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfiguration());
    }

    /**
     * export,bean的名称默认与方法名一致,所以其他Service引用时需要注意
     * @return
     */
    @Bean
    public KafkaTemplate<String, EventRecord> sampleKafkaTemplate() {
        return new KafkaTemplate<String, EventRecord>(producerFactory());
    }



    ////////////////消费者部分//////////////

    /**
     * export,bean可以被kafkaListener引用.
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EventRecord> sampleListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EventRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(sampleConsumerFactory());
        factory.setConcurrency(3);
        factory.setBatchListener(true);//开启批量消费
        factory.getContainerProperties().setPollTimeout(3000);
        //开启手动ACK.
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        return factory;
    }

    public ConsumerFactory<String, EventRecord> sampleConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(sampleConsumerConfiguration(),new StringDeserializer(),new JsonDeserializer<>(EventRecord.class));
    }

    public Map<String, Object> sampleConsumerConfiguration() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        //强烈建议关闭自动确认,我们使用手动ACK模式,Spring Kafka基于JMS语义为我们设计好了兼容实现.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,this.maxPollIntervalMS);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,this.maxPollRecords);//单次poll允许获取的最多条数.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,this.sessionTimeoutMS);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,this.requestTimeoutMS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,this.groupId);
        return props;
    }
}

 

    同上,每个broker集群,对应一个Configuration类,此类将会负责初始化KafkaTemplate和ConsumerContainer,分别服务于此集群的多个Producers和listeners。我们约定,不同的Configuration类创建的Bean名称(kafkaTemplate、listenerContainerFactory)应该不同。

    上述的部分配置参数,仅供参考,请根据自己的业务实际情况适度调优。

    1)kafkaTemplate:按照上述配置样例配置即可,主要关注acks和reties相关参数,其他没有太多细节。

    2)listenerContainerFactory:建议关闭自动提交,而是使用spring-kakfa提供的“手动提交”模式,开启batchListener即listener可以一次获取多条record。

 

 

四、服务

    1、Producer样例

@Component
public class SampleService {

    @Autowired
    private KafkaTemplate<String,EventRecord> sampleKafkaTemplate;

    private String topic;//常量,或者来自profile等

    public void send() {
        ProducerRecord<String, EventRecord> producerRecord = new ProducerRecord<String, EventRecord>(
                this.topic,
                0,
                key,
                record);
        this.sampleKafkaTemplate.send(producerRecord);
    }
    
}

 

    2、listener样例

@Component
public class SampleMessageListener {

    /**
     * topic的参数可以来自常量或者profile或者env等
     * 其中topics、topicPattern均可以使用表达式,来自propertySource或者Env。
     * @param consumerRecords
     * @param acknowledgment
     */
    @KafkaListener(topicPattern = "${sample.kafka.topic-pattern}",containerFactory = "sampleListenerContainerFactory")
    public void onMessage(List<ConsumerRecord<String, EventRecord>> consumerRecords,Acknowledgment acknowledgment) {
        try {
            for (ConsumerRecord<String,EventRecord> record : consumerRecords) {
                //process(record.value());
            }
            acknowledgment.acknowledge();
        } catch (Exception e) {
            //处理异常,不建议ACK
        }
    }
}

 

    因为我们使用了手动确认,所以listener要在合适的时机调用acknowledge()方法。

 

五、消费者过滤器

     Consumer可以使用Filter过滤不需要的消息,这些消息将不会被传递给listener,我们需要在创建Consumer之前指定Filter,Filter将会有spring-kafka组件在poll消息列表之后逐个验证,对于Filter返回“true”的消息将会被丢弃,“false”的将会被传递给listener;特别注意,当我们使用Filter之后,consumer必须使用“手动确认”模式,即:

    1)设定kafka参数:ENABLE_AUTO_COMMIT = false

    2)设定容器参数为“手动确认”:

        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

    3)将“过滤抛弃的消息自动确认”:

        factory.setAckDiscarded(true);

    4)在listener中感知Acknowledgement,并在消息执行完毕后ACK。

        acknowledgment.acknowledge();

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EventRecord> sampleListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EventRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(sampleConsumerFactory());
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, EventRecord>() {

            private AviatorRegexMatcher regexFilter = new AviatorRegexMatcher(filterRegex);

            @Override
            public boolean filter(ConsumerRecord<String, EventRecord> consumerRecord) {
                EventHeader header = consumerRecord.value().getHeader();
                String source = header.getSchemaName() + "." + header.getTableName();
                return !regexFilter.match(source);
            }
        });
        factory.setAckDiscarded(true);//对于不符合filter的消息,是否自动丢弃 + ack
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        return factory;
    }

 

 

六、问题小结

    1、兼容性错误:

java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)

    可能是你基于springMVC 4或者springboot 1框架,但是引用了高版本的spring-kafka组件导致。spring 4、springboot 1,只能兼容spring-kakfa 1.3.x版本。

 

    2、反序列化问题

Caused by: java.lang.IllegalAccessException: Class org.apache.kafka.common.utils.Utils can not access a member of class org.springframework.kafka.support.serializer.JsonDeserializer with modifiers "protected"
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) ~[na:1.8.0_121]
    at java.lang.Class.newInstance(Class.java:436) ~[na:1.8.0_121]
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:314) ~[kafka-clients-0.10.1.1.jar:na]
    ... 26 common frames omitted

 

    上述例子中,producer可以直接指定JsonSerializer而忽略类型;但是消费者的配置中,则需要使用构造方法来传递JsonDeserializer,因为反序列化无法预判类型。

public ConsumerFactory<String, EventRecord> sampleConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(sampleConsumerConfiguration(),new StringDeserializer(),new JsonDeserializer<>(EventRecord.class));
}

 

分享到:
评论

相关推荐

    spring-kafka 整合官方文档

    spring-kafka是Spring框架对Apache Kafka消息系统进行整合的一个项目,它简化...通过阅读这份官方文档,开发者能够更好地理解和掌握如何在Spring项目中集成和使用Kafka,以及如何解决在使用过程中可能遇到的各种问题。

    spring-boot-starter-kafka示例程序

    spring-boot-starter-kafka示例程序\n 支持springcloud1.5.4,kafka0.8.2.x\n 项目地址:https://github.com/zhyea/spring-boot-starter-kafka

    spring-kafka-producer-consumer-example_java_springboot_kafka_

    标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...

    springboot - 2.7.3版本 - (七)整合Kafka

    在`springboot-kafka`目录下,可能包含了一个示例项目,你可以通过导入IDE进行编译和运行,以了解Spring Boot与Kafka集成的实践操作。 通过以上步骤,你已经掌握了如何在Spring Boot 2.7.3中集成Apache Kafka的基本...

    通过springboot 实现spring和kafka集成springboot-kafka-master.zip

    在本项目"springboot-kafka-master.zip"中,我们将探讨如何使用Spring Boot框架与Apache Kafka进行集成,以构建高效、可扩展的消息传递系统。Spring Boot简化了Spring的应用配置,而Kafka作为一个分布式流处理平台,...

    springboot-kafka_springboot_kafka_

    本教程将详细介绍如何在SpringBoot项目中集成Kafka,构建高效的消息队列系统。 **一、SpringBoot与Kafka简介** 1. **SpringBoot**: SpringBoot是Spring框架的扩展,它简化了Spring应用的初始搭建以及开发过程,...

    springboot 基于spring-kafka动态创建kafka消费者

    本文将详细探讨如何在Spring Boot项目中基于Spring Kafka动态创建Kafka消费者。 首先,了解Kafka基本概念:Kafka是一种分布式流处理平台,用于构建实时数据管道和流应用程序。它提供了高吞吐量、低延迟的消息传递,...

    springboot-kafka:springboot-kafka

    1. **Spring for Apache Kafka**: Spring提供了Spring for Apache Kafka模块,使得在Spring应用中使用Kafka变得更加简单。这包括自动配置、模板方法、监听器容器以及生产者和消费者的抽象。 2. **配置**: 在Spring ...

    spring-kafka-2.2.0.RELEASE.jar

    spring-kafka-2.2.0.RELEASE.jar是用于springboot+kafka,用springboot框架向kafka推送或消费数据所需maven包

    spring-kafka源代码

    Spring Kafka是Spring框架的一部分,专为集成Apache Kafka而设计,提供了一套轻量级且强大的API,使得在Java应用中使用Kafka变得更加简单。本文将围绕Spring Kafka的源代码进行深度解析,帮助开发者更好地理解和运用...

    springboot-kafka-connect-debezium-ksqldb:该项目的目标是与Kafka,Debezium和ksqlDB一起使用。 为此,我们有

    springboot-kafka-连接-debezium-ksqldb 该项目的目标是与 , 和 。 为此,我们有: research-service ,用于在中插入/更新/删除记录; Source Connectors ,用于监视MySQL中记录的更改并将与这些更改相关的消息推送...

    spring-kafka整合.docx

    1. 简洁易用:Spring for Apache Kafka提供了一个简洁、易用的API,方便开发者使用Apache Kafka的功能。 2. 高性能:该项目的实现提高了消息处理的效率和可靠性。 3. 灵活性:该项目提供了灵活的消息处理机制,适应...

    spring-boot集成kafka

    Spring for Apache Kafka提供了与Spring Boot整合的便利,通过引入`spring-kafka`和`spring-boot-starter`依赖,我们可以轻松地在Spring Boot应用中使用Kafka。添加以下依赖: ```xml &lt;groupId&gt;org.spring...

    spring-kafka文档.zip

    本文将深入探讨Spring Kafka的核心概念、主要功能以及实际应用,旨在帮助读者理解和掌握如何在Java项目中有效地使用Spring Kafka。 一、Spring Kafka概述 Spring Kafka作为一个轻量级的库,它允许开发者以声明式的...

    spring-kafka-protobuf:在您的Spring-boot项目中使用kafka和protobuf

    Spring-kafka-protobuf运行java -jar target/spring-kafka-protobuf-0.0.1-SNAPSHOT.jar curl -v 'http://localhost:8080/rest/n/spring/kafka/protobuf/demo' 查看并检查弹簧日志

    spring-kafka-demo

    Spring Kafka 是 Spring 框架为集成 Apache Kafka 提供的一个模块,它使得在 Java 应用程序中使用 Kafka 变得简单且直观。本项目 "spring-kafka-demo" 提供了一个完整的示例,涵盖了生产者和消费者的配置与实现,...

    springboot-kafka--消息队列

    springboot集成kafka消息队列代码,可在上面进行二次开发。亲测有效,可以通过注释进行消息的接口

    Spring Boot集群管理工具KafkaAdminClient使用方法解析

    Spring Boot 集群管理工具 KafkaAdminClient 使用方法解析 KafkaAdminClient 是 Spring Boot 集群管理工具中的一部分,主要用于管理和检视 Kafka 集群中的Topic、Broker、ACL 等对象。下面将详细介绍 Kafka...

    flink-connector-kafka-0.10-2.11-1.10.0-API文档-中文版.zip

    赠送jar包:flink-connector-kafka-0.10_2.11-1.10.0.jar; 赠送原API文档:flink-connector-kafka-0.10_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-connector-kafka-0.10_2.11-1.10.0-sources.jar; 赠送Maven...

Global site tag (gtag.js) - Google Analytics