`
357029540
  • 浏览: 738560 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

在kafka单服务器的情况下消费者端如何确定服务器是否启动

阅读更多

            在kafka集群的单服务器情况下,如何通过consumer消费者确定kafka服务器或者zookeeper服务器是否启动(因为消费者目前是无法判断服务器是否启动的,它只是去轮询获取服务器数据而不报错),如果没有启动,消费者端做出相应的操作来提醒消费者端使用人员进行维护,在这里我提供一个简单的解决方案,可能并不是非常通用,提供一个简单的思路而已。

         如果kafka服务器或者zookeeper服务器没有启动,在producer生产者端向服务器发送信息的时候会出现错误(org.apache.kafka.common.errors.TimeoutException),我使用的kafka的maven版本是0.11.0.0的,当有异常错误出现的时候我们可以在生产者端使用一个静态变量来变更记录这个状态,同时在生产者端提供一个接口以供消费者端调用;在consumer消费者端我们提供一个定时任务,如果消费者端在规定的时间里面没有从kafka服务器获取到数据,定义一个静态变量去记录获取数据状态,那么在定时任务里面就会根据这个静态变量的值决定是否去调用producer消费者端的接口判断接口返回的状态情况,如果返回的状态表示服务器没有启动则消费者端做出相应的操作。如果在集群多服务器的情况下,客户端可以根据订阅的主题topic判断哪些服务器是否存活的,下面提供了生产者producer和消费者consumer的main方法,并没有按照上面的叙述去完成接口,如果需要自己去实现。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.0</version>
</dependency>
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author liaoyubo
 * @version 1.0 2017/7/25
 * @description
 */
public class KafkaProducerTest {

    public static void main(String [] args){
        Properties properties = new Properties();
        //properties.put("zookeeper.connect","localhost:2181");
        properties.put("bootstrap.servers", "192.168.201.190:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<String, String>(properties);

        for(int i = 0;i < 10;i++){
            Future<RecordMetadata> futureRecordMetadata =  producer.send(new ProducerRecord<String, String>("myTopic",Integer.toString(i),Integer.toString(i)));
            try {
                futureRecordMetadata.get(3, TimeUnit.SECONDS);
                System.out.println("发送的message:"+i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                if(e.getMessage().split(":")[0].split("\\.")[5].equals("TimeoutException")){
                    System.out.println("无法连接到服务器");
                }
                e.printStackTrace();
            } catch (TimeoutException e) {
                System.out.println("无法连接到服务器");
                e.printStackTrace();
            }
        }

        producer.close();

    }

}

 

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * @author liaoyubo
 * @version 1.0 2017/7/26
 * @description
 */
public class KafkaConsumerClientTest {

    public static void main(String [] args){

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.201.190:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Arrays.asList("myTopic","myTest"));
        while (true){
            ConsumerRecords<String,String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                //int partition = record.partition();
                String topic = record.topic();
                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
                for (PartitionInfo partitionInfo : partitionInfoList){
                    Node node = partitionInfo.leader();
                    System.out.println(node.host());
                    //获取存活的服务器
                    Node [] nodes = partitionInfo.replicas();
                }
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

}

 

0
0
分享到:
评论

相关推荐

    springboot 基于spring-kafka动态创建kafka消费者

    2. **配置Kafka**:在`application.yml`或`application.properties`中配置Kafka的相关信息,如服务器地址、消费者组ID等。 ```yaml spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-...

    kafka生产者消费者实例

    在 Kafka 实例中,首先需要设置和启动 Kafka 服务器,包括 ZooKeeper(协调服务)和 Kafka Broker(数据存储节点)。然后,我们可以编写 Java 或者其他语言的生产者代码,将数据发布到预定义的主题。例如,可以创建...

    kafka模拟生产者消费者(集群模式)实例

    在本文中,我们将深入探讨如何在集群模式下模拟Kafka的生产者和消费者。Kafka是一种分布式流处理平台,常用于大数据实时处理和消息传递。它由Apache开发,以其高吞吐量、低延迟和可扩展性而闻名。 首先,我们要理解...

    springboot整合kafka实现生产者和消费者

    在本文中,我们将深入探讨如何使用Spring Boot整合Apache Kafka,以构建一个生产者和消费者的示例。Apache Kafka是一个分布式流处理平台,常被用于构建实时数据管道和流应用。Spring Boot简化了Java开发,提供了开箱...

    Kafka中生产者和消费者java实现

    这就是使用Java在IDEA和Maven环境下实现Kafka生产者和消费者的基本步骤。通过这个基础,你可以根据实际需求进行更复杂的应用,例如设置回调函数、处理错误、调整消费者分组策略等。Kafka的强大之处在于其高吞吐量、...

    kafka生产者消费者Demo

    生产者API允许开发者向Kafka服务器发送消息,这些消息会被存储在特定的主题中。在配置中,“zookeeper.connect”是关键参数,它指定了ZooKeeper集群的地址,ZooKeeper是Kafka用来协调集群和服务发现的工具。修改这个...

    kafka大数据 生产者消费者实例

    在实际运行时,你需要确保Kafka服务已经启动,并正确配置了生产者和消费者的配置文件,比如设置正确的服务器地址和主题名。 总的来说,这个实例为你提供了一个基础的Kafka使用模型,你可以在此基础上根据实际需求...

    使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据

    5. **Kafka消费者**:在消费端,我们可以创建一个Kafka消费者,通过`@KafkaListener`注解监听特定主题。为了实现批量消费,可以使用`poll()`方法来获取一批记录,然后一次性处理。这种方式可以优化处理性能,尤其在...

    kafka集群搭建和使用Java写kafka生产者消费者

    在本文中,我们将深入探讨如何搭建Kafka集群以及如何使用Java编写Kafka的生产者和消费者。Kafka是由LinkedIn开发并贡献给Apache软件基金会的消息队列系统,它被广泛用于实时数据流处理和大数据分析。 ### Kafka集群...

    在linux中搭建kafka集群

    10. 一旦Zookeeper集群成功启动,你就可以继续配置和启动Kafka服务器了,这个过程涉及到创建broker配置,设置broker.id,配置Zookeeper连接,以及启动`kafka-server-start.sh`脚本。 搭建Kafka集群是一个涉及多个...

    windows下kafka_2.12-2.9.0.rar(含单机伪分布式配置)

    在实际操作中,用户还需要了解如何创建主题(topics)、生产者(producers)和消费者(consumers),以及如何使用Kafka命令行工具来管理和监控Kafka集群。此外,熟悉Kafka的副本复制、分区策略以及高可用性设置也是...

    kafka报文模拟工具

    在日常项目中,作为Kafka消费者,我们可以利用这款工具生成特定格式或内容的报文,以此来测试消费者的消费能力和异常处理机制。 在了解工具的使用前,我们需要先理解Kafka的基本概念。Kafka是一种分布式流处理平台...

    springboot整合kafka的发布/消费demo项目源码

    在Kafka中,生产者是发布消息到主题(Topic)的组件,而消费者则订阅这些主题并消费其中的消息。Kafka具有高吞吐量、低延迟以及容错性等特性,使得它成为大数据和实时数据处理的理想选择。 现在,我们将探讨如何在...

    kafka 可运行实例

    Kafka支持高可用性和容错性的消费者群组机制,当消费者加入或离开群组时,分区的分配会自动调整。 3. **主题和分区**:主题可以被分为多个分区,每个分区有唯一的标识符。分区内的消息是有序的,但是不同分区之间...

    kafka搭建windows版本单机环境

    在同一个命令行窗口中,启动Kafka服务器: ``` kafka-server-start.bat config/server.properties ``` ### 6. 创建主题 创建一个名为`my-topic`的主题,分区数为1,副本数为1: ``` kafka-topics.bat --create --...

    Kafka详细课程讲义

    **Kafka详细课程讲义** 本课程主要涵盖了Apache ...5. 如何处理Kafka的消费者挂掉或新消费者加入的情况? 通过学习以上章节,你可以深入了解Kafka的原理、配置、使用和优化,为实际项目中的数据流处理打下坚实基础。

    kafka搭建与使用.doc

    依次在 hdp-1、hdp-2、hdp-3 节点上启动 Kafka 服务器。 三、Kafka 指令的使用 1. 查看当前机器中的所有 topics 使用 kafka-topics.sh 命令可以查看当前机器中的所有 topics。 2. 创建 topics 使用 kafka-...

    kafka-2.12-2.8.2

    3. 启动:启动Zookeeper服务,然后启动Kafka服务器。 4. 创建主题:使用Kafka命令行工具创建主题,定义分区和副本数量。 5. 生产与消费:编写生产者和消费者程序,实现数据的发布和订阅。 三、Kafka使用场景 1. ...

    kafka直接解压可用

    1. **发布/订阅模型**:Kafka采用发布/订阅模型,生产者发布消息到主题(Topic),消费者则订阅这些主题以获取消息。 2. **分区与副本**:每个主题都可以被划分为多个分区(Partition),每个分区都有一个主副本和...

    kafka安装包-2.13-3.6.2

    Kafka支持两种消费模式:单消费者模式(每个分区只能有一个消费者)和消费者组模式(多个消费者可以组成一个组,共享主题的分区)。 5. **Partitions**: 主题被分成多个分区,分区内的消息按照顺序存储,且每个分区...

Global site tag (gtag.js) - Google Analytics