`

kafka 集群实例消费同一个topic 动态设置消费组

 
阅读更多

背景:我服务A部署集群 3个实例。

场景:前端WS连接A服务的实例。服务端往客户端推送数据。

触发点:另一个系统通过kafka通知过来。

遇到的问题:按kafka的消费规则。这三个集群实例在同一个group中,及groupId是相同的。这时候只会有一个实例收到kafka消息。只会有一个客户端收到服务端推送的消息。另外两个实例是收不到kafka消息,就不会往前端推送数据。

 

目标:这三个集群实例都收到kafka消息。

 

解决方案:在实例启动时候动态创建自己实例 消费kafka的groupId。这样三个集群实例不在一个消费组中,自然都能收到kafka消息了。

 

代码实现:

 @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> instanceChangeContainerFactory() {
        Map<String, Object> properties = new ConcurrentHashMap<>(delayContainerFactory.getConsumerFactory().getConfigurationProperties());

        // 每个实例的groupId动态改成ip
        String nodeIp = System.getenv(Constants.ENV_CONTAINER_IP);
        String ip = StringUtils.isNotBlank(nodeIp) ? nodeIp : NetUtils.localIP();

        String groupId = ObjectUtils.isEmpty(properties.get(ConsumerConfig.GROUP_ID_CONFIG)) ? ip : properties.get(ConsumerConfig.GROUP_ID_CONFIG) + "_" + ip;
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(properties);

        ConcurrentKafkaListenerContainerFactory<String, String> instanceChangeContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        instanceChangeContainerFactory.setAutoStartup(kafkaOpenListener);
        instanceChangeContainerFactory.setConsumerFactory(consumerFactory);

        return instanceChangeContainerFactory;
    }

 

 

 

分享到:
评论

相关推荐

    kafka集群部署文档(部署,运维,FAQ)

    ### Kafka集群部署与运维知识点详解 #### 一、Kafka概览 Kafka是一种高性能的分布式消息系统,具有以下特点: - **数据持久性**:Kafka通过高效的磁盘数据结构来实现消息的持久化存储,即使面对大量数据也能保持...

    kafka模拟生产者消费者(集群模式)实例

    6. **运行模拟**:启动多个消费者实例,形成一个消费组。Kafka会自动平衡分区的消费,使得同一分区在消费组内只由一个消费者消费,保证消息的顺序。 在集群模式下,如果一个broker宕机,其上的分区将被自动重新分配...

    Python测试Kafka集群(pykafka)实例

    在这个例子中,我们创建了一个简单的消费者,使用`get_simple_consumer()`方法,指定消费组`consumer_group`,并启用自动提交偏移量(offset,即消息的位置)。 ```python consumer = topic.get_simple_consumer...

    C#kafka开发实例

    设置必要的配置,如BootstrapServers(Kafka集群的地址),然后使用ProduceAsync方法将消息发送到特定的主题。例如: ```csharp var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; ...

    介绍kafka及kafka集群安装

    - **Broker**:Kafka 集群中的每一个服务器实例被称为 Broker。 - **Topic**:消息分类存储的逻辑单元。 - **Partition**:为了提高吞吐量和实现水平扩展,每个 Topic 可以分为多个 Partition。 - **Replication**:...

    Kafka集群配置样例_3节点_源码

    本篇将详细解析如何在Linux环境下配置一个3节点的Kafka集群,特别关注`server.properties`配置文件中的`zookeeper.connect`设置。 首先,我们需要理解Kafka集群的基础架构。Kafka集群由多个Brokers(即服务器)组成...

    KAFKA集群文档

    如果消费者都属于同一个消费组,则消息会在消费者之间进行负载均衡;若每个消费者都属于不同的消费组,则每个消费者都会收到消息的副本,类似于发布-订阅模式。 总结而言,Kafka集群文档详细介绍了Kafka集群的基本...

    kafka生产者消费者实例

    Kafka 消费者采用分组(Consumer Group)的概念,每个消息仅被组内一个消费者消费,确保数据的唯一处理。消费者可以动态调整偏移量(Offset),决定从哪个位置开始读取数据,从而实现数据处理的灵活性。此外,Kafka ...

    docker-kafka-cluster集群安装

    本文将详细介绍如何使用Docker Compose来搭建一个Kafka集群,并解析Kafka的相关配置。 【描述】: 1、Kafka-Docker-Compose集群安装 Docker Compose是Docker官方提供的一种工具,用于定义和运行多容器的Docker...

    kafka集群安装

    在本文中,我们将详细介绍如何在三个服务器(service1、service2、service3)上搭建一个基本的Kafka集群。 1. **创建Kafka目录**: 在每个服务器的根目录下创建一个名为`kafka`的文件夹,这将作为Kafka安装的位置...

    kafka中partition和消费者对应关系1

    Partition是Kafka主题(Topic)的逻辑分片,每个Partition内部的消息是有序的,并且只能被同一个消费者组(Consumer Group)中的一个消费者实例消费,这就确保了消息的唯一消费性。消费者组是由一组消费者组成的,...

    kafka集群搭建和使用Java写kafka生产者消费者

    2. **创建生产者**: 创建一个`KafkaProducer`实例,传入配置,如`bootstrap.servers`(Kafka集群地址),`key.serializer`和`value.serializer`(序列化类)。 3. **发送消息**: 使用`producer.send()`方法将消息...

    kafka学习实例

    1. **Kafka集群配置**:Kafka集群由一个或多个服务器(被称为Broker)组成,每个Broker存储一部分主题的数据。为了确保高可用,需要配置多个Broker,并设置合适的复制因子。在部署Kafka时,还需要配置Zookeeper,它...

    Kafka集群文档

    - **Broker**: Kafka集群由多个Broker实例组成,每个Broker实例都是一个服务器节点,负责存储和处理消息。 - **Zookeeper**: Kafka集群依赖于Zookeeper来协调各个Broker间的活动,包括管理元数据、保持集群的一致性...

    kafka项目实例

    实时数据的获取可以通过监听主题的最新消息完成,这通常涉及设置消费者组并保持持续读取。 ### 3. 获得历史偏移量 要获取历史偏移量,你需要使用Kafka的消费者API。消费者可以查询每个分区的最新和最早偏移量,也...

    Kafka性能测试实例1

    创建一个拥有6个分区、1个副本的Topic,设置不同的线程数并发送相同的数据量,查看性能变化。 ### 3.1.1 测试结果 ### 3.1.2 结论 向一个拥有6个分区、1个副本的Topic中,发送500万条消息记录时,随着线程数的...

    kafka基础知识及集群搭建

    API主要包括创建Producer实例、发送消息、创建Consumer实例、设置消费组、订阅Topic、读取消息等功能。 总结来说,Kafka作为一个高效、可靠的分布式消息中间件,广泛应用于实时数据处理、日志收集、流式数据处理等...

    Kafka实例Kafka实例

    3. 消费者通过消费组(Consumer Group)订阅主题,每个分区只能被消费组内的一个消费者消费,确保消息的顺序性和无重复消费。 4. 消费者从 Broker 拉取数据,而不是像传统的消息队列那样推送数据,这样可以更好地...

    Kafka跨集群容灾备份.docx

    - **地理分布**:为了减少网络延迟并提高用户体验,可能需要在不同的地理位置部署多个Kafka集群。这种情况下,跨集群的数据同步可以确保各个集群之间数据的一致性。 #### 二、MirrorMaker架构 **MirrorMaker** 是 ...

    kafka生产者消费者Demo

    1. 生产者代码示例:展示如何创建一个生产者实例,发布消息到特定主题。 2. 消费者代码示例:展示如何创建消费者实例,订阅主题并处理消息。 3. 配置文件:可能包括生产者和消费者的配置文件,其中包含了如上所述的...

Global site tag (gtag.js) - Google Analytics