1)Read a message multiple times
2)Consume only a subset of the partitions in a topic in a process
3)Manage transactions to make sure a message is processed once and only once
1)Find an active Broker and find out which Broker is the leader for your topic and partition
2)Determine who the replica Brokers are for your topic and partition
3)Build the request defining what data you are interested in
4)Fetch the data ,Identify and recover from leader changes
然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker
最终,还要注意需要识别和处理broker leader的改变
package kafkatest.kakfademo;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
public class SimpleExample {
public static void main(String args[]) {
SimpleExample example = new SimpleExample();
long maxReads = 10;
String topicName = "test";
int partition = 0;
List<String> seeds = new ArrayList<String>();
int port = 9092;
try {
example.run(maxReads, topicName, partition, seeds, port);
} catch (Exception e) {
System.out.println("Oops:" + e);
private List<String> m_replicaBrokers = new ArrayList<String>();
public SimpleExample() {
m_replicaBrokers = new ArrayList<String>();
public void run(long a_maxReads, String a_topic, int a_partition,
List<String> a_seedBrokers, int a_port) throws Exception {
// find the meta data about the topic and partition we are interested in
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
if (metadata == null) {
.println("Can't find metadata for Topic and Partition. Exiting");
if (metadata.leader() == null) {
.println("Can't find Leader for Topic and Partition. Exiting");
String leadBroker = metadata.leader().host();
String clientName = "Client_" + a_topic + "_" + a_partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
100000, 64 * 1024, clientName);
long readOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.EarliestTime(), clientName);
int numErrors = 0;
while (a_maxReads > 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port, 100000,
64 * 1024, clientName);
// Note: this fetchSize of 100000 might need to be increased if
// large batches are written to Kafka
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
// Something went wrong!
short code = fetchResponse.errorCode(a_topic, a_partition);
System.out.println("Error fetching data from the Broker:"
+ leadBroker + " Reason: " + code);
if (numErrors > 5)
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask for
// the last element to reset
readOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(), clientName);
consumer = null;
leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
System.out.println("Found an old offset: " + currentOffset
+ " Expecting: " + readOffset);
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
+ ": " + new String(bytes, "UTF-8"));
if (numRead == 0) {
try {
} catch (InterruptedException ie) {
if (consumer != null)
public static long getLastOffset(SimpleConsumer consumer, String topic,
int partition, long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
.println("Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition));
return 0;
long[] offsets = response.offsets(topic, partition);
return offsets[0];
private String findNewLeader(String a_oldLeader, String a_topic,
int a_partition, int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
&& i == 0) {
// first time through if the leader hasn't changed give
// ZooKeeper a second to recover
// second time, assume the broker did recover before failover,
// or it was a non-Broker issue
goToSleep = true;
} else {
return metadata.leader().host();
if (goToSleep) {
try {
} catch (InterruptedException ie) {
.println("Unable to find new leader after Broker failure. Exiting");
throw new Exception(
"Unable to find new leader after Broker failure. Exiting");
private PartitionMetadata findLeader(List<String> a_seedBrokers,
int a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
loop: for (String seed : a_seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData = part;
break loop;
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + seed
+ "] to find Leader for [" + a_topic + ", "
+ a_partition + "] Reason: " + e);
} finally {
if (consumer != null)
if (returnMetaData != null) {
for (BrokerEndPoint replica : returnMetaData.replicas()) {
return returnMetaData;
《Python中的Kafka库:kafka-python-2.0.2深入解析》 在Python编程环境中,Kafka作为分布式消息系统的接口,对于实时数据处理和流处理应用至关重要。`kafka-python`是Python社区开发的一个非常流行的Kafka客户端库
