`
m635674608
  • 浏览: 5028450 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Kafka详解五、Kafka Consumer的底层API- SimpleConsumer

    博客分类:
  • MQ
 
阅读更多
Kafka提供了两套API给Consumer
  1. The high-level Consumer API
  2. The SimpleConsumer API     

第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情

  • 一个消息读取多次
  • 在一个处理过程中只消费Partition其中的一部分消息
  • 添加事务管理机制以保证消息被处理且仅被处理一次

使用SimpleConsumer有哪些弊端呢?

  • 必须在程序中跟踪offset值
  • 必须找出指定Topic Partition中的lead broker
  • 必须处理broker的变动

使用SimpleConsumer的步骤

  1. 从所有活跃的broker中找出哪个是指定Topic Partition中的leader broker
  2. 找出指定Topic Partition中的所有备份broker
  3. 构造请求
  4. 发送请求查询数据
  5. 处理leader broker变更
代码实例:
package bonree.consumer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {
private List<String> m_replicaBrokers = new ArrayList<String>();

public SimpleExample() {
m_replicaBrokers = new ArrayList<String>();
}

public static void main(String args[]) {
SimpleExample example = new SimpleExample();
// 最大读取消息数量
long maxReads = Long.parseLong("3");
// 要订阅的topic
String topic = "mytopic";
// 要查找的分区
int partition = Integer.parseInt("0");
// broker节点的ip
List<String> seeds = new ArrayList<String>();
seeds.add("192.168.4.30");
seeds.add("192.168.4.31");
seeds.add("192.168.4.32");
// 端口
int port = Integer.parseInt("9092");
try {
example.run(maxReads, topic, partition, seeds, port);
} catch (Exception e) {
System.out.println("Oops:" + e);
e.printStackTrace();
}
}

public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
// 获取指定Topic partition的元数据
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
System.out.println("Can't find metadata for Topic and Partition. Exiting");
return;
}
if (metadata.leader() == null) {
System.out.println("Can't find Leader for Topic and Partition. Exiting");
return;
}
String leadBroker = metadata.leader().host();
String clientName = "Client_" + a_topic + "_" + a_partition;

SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
int numErrors = 0;
while (a_maxReads > 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
}
FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
numErrors++;
// Something went wrong!
short code = fetchResponse.errorCode(a_topic, a_partition);
System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
if (numErrors > 5)
break;
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask for
// the last element to reset
readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
continue;
}
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
continue;
}
numErrors = 0;

long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
continue;
}

readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();

byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
numRead++;
a_maxReads--;
}

if (numRead == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
if (consumer != null)
consumer.close();
}

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);

if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}

/**
 * @param a_oldLeader
 * @param a_topic
 * @param a_partition
 * @param a_port
 * @return String
 * @throws Exception
 *             找一个leader broker
 */
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// first time through if the leader hasn't changed give
// ZooKeeper a second to recover
// second time, assume the broker did recover before failover,
// or it was a non-Broker issue
//
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
System.out.println("Unable to find new leader after Broker failure. Exiting");
throw new Exception("Unable to find new leader after Broker failure. Exiting");
}

private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
loop: for (String seed : a_seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData = part;
break loop;
}
}
}
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
if (returnMetaData != null) {
m_replicaBrokers.clear();
for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
}
}
return returnMetaData;
}
}

http://www.tuicool.com/articles/j6ZZnaI
 
分享到:
评论

相关推荐

    consumer-groups-get-utils.tar.gz

    《Kafka消费者组管理工具详解——consumer-groups-get-utils.tar.gz深度剖析》 在大数据处理领域,Apache Kafka作为一款高效、可扩展的实时流处理平台,扮演着至关重要的角色。其强大的消息传递机制使得数据能够在...

    kafka_2.11-2.2.2.tgz

    Kafka提供了丰富的命令行工具,如`kafka-topics.sh`、`kafka-consumer-groups.sh`等,用于查看和管理Topics、Partitions、Consumer Groups等。 十一、注意事项 1. Kafka默认使用9092端口,确保没有其他服务占用。 ...

    最新版kafka kafka_2.12-2.5.1.tgz

    **Kafka 2.5.1 知识点详解** Kafka 是一个分布式流处理平台,由 Apache 软件基金会开发,广泛应用于大数据实时处理、日志收集、消息系统等多个领域。`kafka_2.12-2.5.1` 是 Kafka 的一个特定版本,针对 Scala 2.12 ...

    SpringBoot使用Kafka详解含完整代码

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(topics = "testTopic") public void listen(ConsumerRecord, ?&gt; record) { logger.info("Received ...

    1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试

    作者提供的其他相关文章,如 Java API 的使用、Kafka 的关键概念详解、分区和副本介绍,以及 Kafka 监控工具 Kafka-Eagle 的使用,将更深入地探讨 Kafka 的功能和操作。 总之,Apache Kafka 是一个强大的工具,适用...

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...

    kafka配置安装详解

    ### Kafka配置安装详解 #### 一、环境搭建与配置 Kafka是一款开源的消息队列中间件,被广泛应用于大数据处理领域。本篇文章将详细介绍如何在本地环境中安装并配置Kafka,以及进行基本的操作演示。 ##### 环境要求...

    kafka_2.11-2.2.1--.zip

    《Apache Kafka 2.2.1:分布式流处理平台详解》 Apache Kafka 是一个高度可扩展的开源消息系统,主要用于构建实时数据管道和流应用程序。Kafka 的核心特性包括发布订阅模式、高吞吐量、持久化存储以及容错性。在本...

    大数据kafka的api详细介绍和使用,包含kafka的安装部署

    **Kafka API详解与使用** Kafka是一款由LinkedIn开发并贡献给Apache的开源分布式消息系统,主要用于处理实时数据流。其强大的性能、高吞吐量和可扩展性使其成为大数据领域的重要组件。本文将深入探讨Kafka的API,...

    kafka-0.10.1.0 全集(API,程序win+linux)

    **Kafka API详解** 1. **Producer API**: 生产者API允许开发者将消息发布到Kafka主题。在0.10.1.0版本中,生产者支持批量发送,提高了效率。同时,引入了幂等性生产者,解决了重复消息的问题,保证了数据的一致性。...

    kafka可视化工具--kafkatool

    **Kafka工具详解——Kafkatool** Kafka作为一个分布式流处理平台,广泛应用于大数据实时处理和消息传递。然而,管理Kafka集群和操作其组件(如topics、partitions、offsets等)可能会变得复杂,这时就需要一些可视...

    Python库 | confluent_kafka-1.5.0-cp36-cp36m-win_amd64.whl

    《Python中的Confluent Kafka库详解》 在Python编程领域,Kafka作为一款强大的分布式流处理平台,被广泛应用于实时数据处理和消息传递。而Confluent Kafka是Apache Kafka的商业化实现,它提供了额外的企业级功能,...

    kafka eagle 1.4.8安装包kafka eagle 1.4.8

    **Kafka Eagle 1.4.8 安装详解** Kafka Eagle是一款由国内开发者智能洛里(smartloli)开源的Kafka监控系统,它为Apache Kafka提供了一种直观且功能丰富的管理界面,帮助用户更好地监控、管理和优化Kafka集群。在...

    kafka3.2常用命令

    ### Kafka 3.2 常用命令详解 #### 一、启动 ZooKeeper 服务 在启动 Kafka 之前,必须先启动 ZooKeeper 服务。ZooKeeper 为 Kafka 提供了集群协调服务。 ##### 操作步骤: 1. **打开命令行窗口**: - 打开 cmd ...

    kafka_2.11-0.11.0.3.zip

    《Kafka 0.11.0.3在CentOS 7.0系统中的安装与使用详解》 Apache Kafka是一款高性能、分布式的消息中间件,它主要用于处理实时数据流。Kafka_2.11-0.11.0.3是针对Java 2.11版本的一个发行版,适用于Linux环境,特别...

    kafka_2.12-1.0.0.zip

    Kafka提供了一些工具来监控和管理集群,如`kafka-topics.sh`用于管理主题,`kafka-consumer-groups.sh`用于查看消费者组状态,`kafka-log-dirs.sh`用于检查日志状态。 总结,Kafka 2.12-1.0.0版本在性能、稳定性和...

    kafka_2.10-0.10.1.0

    《Kafka 2.10-0.10.1.0在Windows环境下的应用与配置详解》 Kafka是一款高性能、分布式的消息中间件,它主要用于处理实时数据流。在这个版本,即Kafka 2.10-0.10.1.0,主要面向的是Java 2.10 SDK,提供了稳定且高效...

    win32下的kafkac++ api

    《Windows环境下基于librdkafka的C++ Kafka API详解》 在Windows系统下,Kafka的C++ API实现主要依赖于librdkafka库。这个库不仅提供了与Apache Kafka交互的接口,还支持多种操作系统,包括但不限于Linux、macOS...

    大数据技术之Kafka详解

    大数据技术之Kafka详解 本文将对Kafka的概念、架构、特点和应用场景进行详细的解释。 Kafka的定义 Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。 消息队列...

Global site tag (gtag.js) - Google Analytics