[Kafka 13] Kafka Simple Consumer

 

In this example the broker host and port are handled separately: the leader host is taken from the partition metadata, while the port is simply the seed-broker port passed on the command line. In a pseudo-distributed Kafka cluster on a single machine, where every broker listens on a different port, the example therefore cannot run as written.

In practice the host and port must be kept together as a pair, i.e. the port should also come from the leader's metadata (metadata.leader().port()).
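Below is a minimal sketch of that pairing fix. It reuses the findLeader() helper and the SimpleConsumer constructor from the full listing that follows; the variable names (seedPort, leadHost, leadPort) are illustrative:

// Take BOTH host and port of the leader from the partition metadata,
// instead of reusing the seed-broker port from the command line.
PartitionMetadata metadata = findLeader(seedBrokers, seedPort, topic, partition);
kafka.cluster.Broker leader = metadata.leader();  // null checks omitted for brevity
String leadHost = leader.host();
int leadPort = leader.port();                     // paired with leadHost

SimpleConsumer consumer =
        new SimpleConsumer(leadHost, leadPort, 100000, 64 * 1024, clientName);

findNewLeader() needs the same treatment: it should hand back the new leader's host and port together (for example by returning the Broker object) rather than only the host.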

 

package kafka.examples.lowlevel;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.*;

public class KafkaLowLevelConsumer {
    // Command-line arguments:
    /*
    Maximum number of messages to read (so we don’t loop forever)
    Topic to read from
    Partition to read from
    One broker to use for Metadata lookup
    Port the brokers listen on
     */
    public static void main(String args[]) {
        KafkaLowLevelConsumer consumer = new KafkaLowLevelConsumer();

        // Maximum number of messages to read (so we don't loop forever)
        long maxReads = Long.parseLong(args[0]);

        // Topic to read from
        String topic = args[1];

        // Partition to read from (partition ids start at 0)
        int partition = Integer.parseInt(args[2]);


        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);

        // Port the seed brokers listen on; metadata for each topic and partition is kept in ZooKeeper under /brokers/topics/learn.topic.p8.r2
        int port = Integer.parseInt(args[4]);
        try {
            consumer.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> replicaBrokers = new ArrayList<String>();

    public KafkaLowLevelConsumer() {
        replicaBrokers = new ArrayList<String>();
    }

    public void run(long maxReads, String topic, int partition, List<String> seedBrokers, int port) throws Exception {

        // Fetch metadata for the given topic and partition; PartitionMetadata.leader() and replicas() return the leader and the replica brokers
        PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }

        // Make sure the partition currently has a leader broker
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }

        // Host of the leader broker (the port is not taken from the metadata here)
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + topic + "_" + partition;

        // Build the SimpleConsumer. Note the mismatch: leadBroker comes from the metadata,
        // but port is still the seed-broker port from the command line.
        // They should be paired, i.e. this should really use metadata.leader().port().
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);


        // Determine the offset at which to start reading
        long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(topic, partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            //Since the SimpleConsumer doesn't handle lead Broker failures, you have to write a bit of code to handle it.
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(topic, partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, topic, partition, port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                maxReads--;
            }
            // If nothing was read, sleep briefly before fetching again so we don't busy-loop
            // while waiting for new messages (or for ZooKeeper to finish a leader failover).
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    // Finding Starting Offset for Reads
    // Now define where to start reading data. Kafka includes two constants to help,
    // kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there,
    // kafka.api.OffsetRequest.LatestTime() will only stream new messages.
    // Don’t assume that offset 0 is the beginning offset, since messages age out of the log over time.
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        //Get a list of valid offsets (up to maxSize) before the given time.
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    //Since the SimpleConsumer doesn't handle lead Broker failures, you have to write a bit of code to handle it.
    //Here, once the fetch returns an error, we log the reason, close the consumer then try to figure out who the new leader is.
    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep;
            PartitionMetadata metadata = findLeader(replicaBrokers, port, topic, partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    // Finding the lead Broker for the given topic and partition, using the supplied brokers
    //The easiest way to do this is to pass in a set of known Brokers to your logic,
    // either via a properties file or the command line.
    // These don’t have to be all the Brokers in the cluster,
    // rather just a set where you can start looking for a live Broker to query for Leader information.
    // When called from findNewLeader(), seedBrokers is the cached replicaBrokers list.
    // PartitionMetadata.leader() and replicas() return the leader and replica brokers for the partition.
    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {

        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                /**
                 class SimpleConsumer(val host: String,
                 val port: Int,
                 val soTimeout: Int,
                 val bufferSize: Int,
                 val clientId: String)
                 */
                consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                //The call to topicsMetadata() asks the Broker you are connected to for all the details about the topic we are interested in
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    //The loop on partitionsMetadata iterates through all the partitions until we find the one we want. Once we find it, we can break out of all the loops.
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
                        + ", " + partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            replicaBrokers.clear();

            // Cache the replica brokers so findNewLeader() can query them later
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
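For reference, a hypothetical invocation (topic name, host, and port are only examples and must match your own cluster): java kafka.examples.lowlevel.KafkaLowLevelConsumer 10 learn.topic.p8.r2 0 localhost 9092 — that is, read at most 10 messages from partition 0 of learn.topic.p8.r2, using localhost:9092 as the seed broker for metadata lookup.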

Reference: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

 

 
