`

Kafka Consumer的底层API- SimpleConsumer

阅读更多

Kafka Consumer的底层API- SimpleConsumer

 1.Kafka提供了两套API给Consumer

  1. The high-level Consumer API
  2. The SimpleConsumer API     
第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情
  • 一个消息读取多次
  • 在一个处理过程中只消费Partition其中的一部分消息
  • 添加事务管理机制以保证消息被处理且仅被处理一次

2.使用SimpleConsumer有哪些弊端呢?

  • 必须在程序中跟踪offset值
  • 必须找出指定Topic Partition中的lead broker
  • 必须处理broker的变动

3.使用SimpleConsumer的步骤

  1. 从所有活跃的broker中找出哪个是指定Topic Partition中的leader broker
  2. 找出指定Topic Partition中的所有备份broker
  3. 构造请求
  4. 发送请求查询数据
  5. 处理leader broker变更

4.代码实例

 

package bonree.consumer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        // 最大读取消息数量
        long maxReads = Long.parseLong("3");
        // 要订阅的topic
        String topic = "mytopic";
        // 要查找的分区
        int partition = Integer.parseInt("0");
        // broker节点的ip
        List<String> seeds = new ArrayList<String>();
        seeds.add("192.168.4.30");
        seeds.add("192.168.4.31");
        seeds.add("192.168.4.32");
        // 端口
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic,
            int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // 获取指定Topic partition的元数据
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic,
                a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) {
                    break;
                }
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic,
                            a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset
                    : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: "
                            + currentOffset + " Expecting: " + readOffset);
                    continue;
                }

                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset())
                        + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) {
            consumer.close();
        }
    }

    public static long getLastOffset(SimpleConsumer consumer,
            String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo
                = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * @param a_oldLeader
     * @param a_topic
     * @param a_partition
     * @param a_port
     * @return String
     * @throws Exception 找一个leader broker
     */
    private String findNewLeader(String a_oldLeader,
            String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers,
            int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}

 

分享到:
评论

相关推荐

    kafka-clients-2.4.1-API文档-中英对照版.zip

    包含翻译后的API文档:kafka-clients-2.4.1-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.4.1; 标签:apache、kafka、clients、中英对照文档、jar包、java; 使用...

    kafka-schema-registry-client-6.2.2.jar

    mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....

    kafka-clients-2.0.1-API文档-中文版.zip

    包含翻译后的API文档:kafka-clients-2.0.1-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.0.1; 标签:apache、kafka、clients、中文文档、jar包、java; 使用方法:解压翻译后...

    kafka-clients-2.0.0-API文档-中文版.zip

    包含翻译后的API文档:kafka-clients-2.0.0-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.0.0; 标签:apache、kafka、clients、中文文档、jar包、java; 使用方法:解压翻译后...

    kafka-clients-2.4.1-API文档-中文版.zip

    包含翻译后的API文档:kafka-clients-2.4.1-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.4.1; 标签:apache、kafka、clients、中文文档、jar包、java; 使用方法:解压翻译后...

    kafka-clients-2.2.0-API文档-中文版.zip

    包含翻译后的API文档:kafka-clients-2.2.0-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.2.0; 标签:apache、kafka、clients、中文文档、jar包、java; 使用方法:解压翻译后...

    kafka-clients-2.0.0-API文档-中英对照版.zip

    包含翻译后的API文档:kafka-clients-2.0.0-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.0.0; 标签:apache、kafka、clients、中英对照文档、jar包、java; 使用...

    flink-connector-kafka-2.12-1.14.3-API文档-中英对照版.zip

    包含翻译后的API文档:flink-connector-kafka_2.12-1.14.3-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache.flink:flink-connector-kafka_2.12:1.14.3; 标签:apache、flink、connector、...

    flink-connector-kafka-base-2.11-1.10.0-API文档-中文版.zip

    包含翻译后的API文档:flink-connector-kafka-base_2.11-1.10.0-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.flink:flink-connector-kafka-base_2.11:1.10.0; 标签:flink、11、apache、base_2、...

    flink-connector-kafka-0.9-2.11-1.10.0-API文档-中文版.zip

    包含翻译后的API文档:flink-connector-kafka-0.9_2.11-1.10.0-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.flink:flink-connector-kafka-0.9_2.11:1.10.0; 标签:flink、0、11、apache、connector...

    最新版kafka kafka_2.12-2.5.1.tgz

    - 消费消息:`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning` 7. **最佳实践** - 为了保证高可用性,推荐至少配置 3 个 broker。 - 合理设置分区数和...

    kafka-clients-0.10.0.1-API文档-中文版.zip

    包含翻译后的API文档:kafka-clients-0.10.0.1-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:0.10.0.1; 标签:apache、clients、kafka、jar包、java、API文档、中文版; 使用...

    kafka-clients-2.0.1-API文档-中英对照版.zip

    包含翻译后的API文档:kafka-clients-2.0.1-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.0.1; 标签:apache、kafka、clients、中英对照文档、jar包、java; 使用...

    kafka-2.12-2.7.0.tar

    kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-...

    kafka-clients-0.10.1.1-API文档-中文版.zip

    包含翻译后的API文档:kafka-clients-0.10.1.1-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:0.10.1.1; 标签:apache、clients、kafka、jar包、java、API文档、中文版; 使用...

    kafka-eagle-bin-2.1.0.tar.gz

    4. 启动:执行`bin/kafka-eagle-start.sh`启动服务,使用`bin/kafka-eagle-stop.sh`停止服务。 5. 访问:在Web浏览器中输入`http://服务器IP:端口`,按照提示进行登录和操作。 ### 使用注意事项 1. 确保运行环境已...

    配置好的kafka_2.12-2.8.0 + SCRAM-SHA-256

    bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=your-password]' --entity-type users --entity-name your-user ``` 3. **配置客户端**:在...

    kafka-2.12-3.6.1.tgz

    /usr/local/kafka_2.12-3.6.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` 这将从主题的开头开始消费所有消息。 九、扩展与优化 Kafka支持集群部署,...

    flink-connector-kafka-2.12-1.14.3-API文档-中文版.zip

    包含翻译后的API文档:flink-connector-kafka_2.12-1.14.3-javadoc-API文档-中文(简体)版.zip 对应Maven信息:groupId:org.apache.flink,artifactId:flink-connector-kafka_2.12,version:1.14.3 使用方法:...

    kafka-schema-registry-client-3.2.0.jar

    kafka-schema-registry-client-3.2.0.jar包,亲测可用,在aliyun仓库内找不到,可以下载此jar包来进行手动安装

Global site tag (gtag.js) - Google Analytics