
[Big Data Kafka: the low-level consumer]


I. When to use this interface?

1) Read a message multiple times
2) Consume only a subset of the partitions in a topic in a process
3) Manage transactions to make sure a message is processed once and only once

 

II. Steps for using SimpleConsumer:

1) Find an active broker and find out which broker is the leader for your topic and partition
2) Determine who the replica brokers are for your topic and partition
3) Build the request defining what data you are interested in
4) Fetch the data, and identify and recover from leader changes

In other words: first, you must know which partition of which topic you want to read. Next, find the broker that is the leader for that partition, which also tells you which brokers hold replicas of the partition. Then, build the fetch request yourself and fetch the data. Finally, be prepared to detect and handle changes of the broker leader. A compressed sketch of how these steps map onto the API follows; the complete, runnable example is in Section III.
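The sketch below is a minimal illustration only, not part of the original post: it assumes the same seed broker hadoop0:9092, topic test, and partition 0 as the full example, starts reading at offset 0, and omits all error handling and leader failover (which Section III handles properly). The class name LowLevelSketch is made up for this illustration.

package kafkatest.kakfademo;

import java.util.Collections;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class LowLevelSketch {
    public static void main(String[] args) {
        // Steps 1 & 2: ask a seed broker for topic metadata -> leader and replicas
        SimpleConsumer lookup = new SimpleConsumer("hadoop0", 9092, 100000, 64 * 1024, "leaderLookup");
        kafka.javaapi.TopicMetadataResponse resp =
                lookup.send(new TopicMetadataRequest(Collections.singletonList("test")));
        PartitionMetadata partition0 = null;
        for (PartitionMetadata pm : resp.topicsMetadata().get(0).partitionsMetadata()) {
            if (pm.partitionId() == 0) {
                partition0 = pm;
            }
        }
        lookup.close();
        String leader = partition0.leader().host();

        // Step 3: build a FetchRequest against the leader, starting at offset 0
        String clientName = "Client_test_0";
        SimpleConsumer consumer = new SimpleConsumer(leader, 9092, 100000, 64 * 1024, clientName);
        FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                .addFetch("test", 0, 0L, 100000).build();

        // Step 4: fetch the data and walk the returned message set
        FetchResponse fetchResponse = consumer.fetch(req);
        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet("test", 0)) {
            System.out.println("offset " + messageAndOffset.offset());
        }
        consumer.close();
    }
}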

 

 

III. The code:

package kafkatest.kakfademo;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;

import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = 10;
        String topicName = "test";
        int partition = 0;
        List<String> seeds = new ArrayList<String>();
        seeds.add("hadoop0");
        int port = 9092;
        try {
            example.run(maxReads, topicName, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition,
            List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if
            // large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    // Issue an OffsetRequest to get the earliest or latest available offset
    // for the partition, depending on whichTime.
    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    // After a fetch error, poll the known replica brokers until a new leader
    // for the partition shows up, or give up after three attempts.
    private String findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    // Ask each seed broker for topic metadata and return the metadata of the
    // requested partition; also remember the replica brokers for failover.
    private PartitionMetadata findLeader(List<String> a_seedBrokers,
            int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + ", "
                        + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
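The consumer only prints something if the test topic already contains data. Below is a minimal producer sketch for pushing a few test messages; it is not part of the original post. It assumes the broker hadoop0:9092 from the example and that the kafka-clients jar shipped with the Kafka distribution is on the classpath; the class name TestDataProducer is made up for this illustration.

package kafkatest.kakfademo;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop0:9092");   // seed broker from the example
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10; i++) {
            // send() is asynchronous; close() below flushes pending records
            producer.send(new ProducerRecord<String, String>("test", "message-" + i));
        }
        producer.close();
    }
}

Run it once before starting SimpleExample, or produce a few messages with kafka-console-producer.sh against the same topic instead.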

 

 

IV. Consumption results:

(The consumed messages were shown as screenshots in the original post; they are not reproduced here.)