对于kafka的consumer接口,提供两种版本,
high-level
一种high-level版本,比较简单不用关心offset, 会自动的读zookeeper中该Consumer group的last offset
参考, https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
不过要注意一些注意事项,对于多个partition和多个consumer
1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
5. High-level接口中获取不到数据的时候是会block的
import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerGroupExample {private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector,注意下面对conf的配置 createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); } public void run(int a_numThreads) { // 创建并发的consumers Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建Streams List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // 每个线程对应于一个KafkaStream // now launch all the threads// executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages// int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(args[3]); ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } }
SimpleConsumer
另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口
参考, https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
什么时候用这个接口?
- Read a message multiple times
- Consume only a subset of the partitions in a topic in a process
- Manage transactions to make sure a message is processed once and only once
当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦
所以不是一定要用,最好别用
- You must keep track of the offsets in your application to know where you left off consuming.
- You must figure out which Broker is the lead Broker for a topic and partition
- You must handle Broker leader changes
使用SimpleConsumer的步骤:
- Find an active Broker and find out which Broker is the leader for your topic and partition
- Determine who the replica Brokers are for your topic and partition
- Build the request defining what data you are interested in
- Fetch the data
- Identify and recover from leader changes
首先,你必须知道读哪个topic的哪个partition
然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker
再者,自己去写request并fetch数据
最终,还要注意需要识别和处理broker leader的改变
逐步来看,
Finding the Lead Broker for a Topic and Partition
思路就是,遍历每个broker,取出该topic的metadata,然后再遍历其中的每个partition metadata,如果找到我们要找的partition就返回
根据返回的PartitionMetadata.leader().host()找到leader broker
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) { PartitionMetadata returnMetaData = null; loop: for (String seed : a_seedBrokers) { //遍历每个broker SimpleConsumer consumer = null; try { //创建Simple Consumer, //class SimpleConsumer(val host: String,val port: Int,val soTimeout: Int // ,val bufferSize: Int,val clientId: String) consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); List<String> topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); // kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); //发送TopicMetadata Request请求 List<TopicMetadata> metaData = resp.topicsMetadata(); //取到Topic的Metadata for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) {//遍历每个partition的metadata if (part.partitionId() == a_partition) { //确认是否是我们要找的partition returnMetaData = part; break loop; //找到就返回 } } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e); } finally { if (consumer != null) consumer.close(); } } return returnMetaData; }
Finding Starting Offset for Reads
request主要的信息就是 Map <TopicAndPartition, PartitionOffsetRequestInfo>
TopicAndPartition就是对topic和partition信息的封装
PartitionOffsetRequestInfo的定义
case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
其中参数time,表示where to start reading data,两个取值
kafka.api.OffsetRequest.EarliestTime(),the beginning of the data in the logs
kafka.api.OffsetRequest.LatestTime(),will only stream new messages
不要认为起始的offset一定是0,因为messages会过期,被删除
另外一个参数不清楚什么含义,代码中取的是1
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); //build offset fetch request info kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName); OffsetResponse response = consumer.getOffsetsBefore(request); //取到offsets if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) ); return 0; } long[] offsets = response.offsets(topic, partition); //取到的一组offset return offsets[0]; //取第一个开始读 }
Reading the Data
首先在FetchRequest上加上Fetch,指明topic,partition,开始的offset,读取的大小
如果producer在写入很大的message时,也许这里指定的1000000是不够的,会返回an empty message set,这时需要增加这个值,直到得到一个非空的message set。
// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.// Setting the replicaId incorrectly will cause the brokers to behave incorrectly. FetchRequest req = new FetchRequestBuilder() .clientId(clientName) .addFetch(a_topic, a_partition, readOffset, 100000) // 1000000bytes .build(); FetchResponse fetchResponse = consumer.fetch(req); if (fetchResponse.hasError()) { // See Error Handling } numErrors = 0; long numRead = 0; for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { long currentOffset = messageAndOffset.offset(); if (currentOffset < readOffset) { // 必要判断,因为对于compressed message,会返回整个block,所以可能包含old的message System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset); continue; } readOffset = messageAndOffset.nextOffset(); // 获取下一个readOffset ByteBuffer payload = messageAndOffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); numRead++; } if (numRead == 0) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } }
Error Handling
if (fetchResponse.hasError()) { numErrors++; // Something went wrong! short code = fetchResponse.errorCode(a_topic, a_partition); System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); if (numErrors > 5) break; if (code == ErrorMapping.OffsetOutOfRangeCode()) { // 处理offset非法的问题,用最新的offset // We asked for an invalid offset. For simple case ask for the last element to reset readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); continue; } consumer.close(); consumer = null; leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); // 更新leader broker continue; }
没有特别的逻辑,只是重新调用findLeader获取leader broker
并且防止在切换过程中,取不到leader信息,加上sleep逻辑
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { for (int i = 0; i < 3; i++) { boolean goToSleep = false; PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); if (metadata == null) { goToSleep = true; } else if (metadata.leader() == null) { goToSleep = true; } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { // first time through if the leader hasn't changed give ZooKeeper a second to recover// second time, assume the broker did recover before failover, or it was a non-Broker issue// goToSleep = true; } else { return metadata.leader().host(); } if (goToSleep) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } } } System.out.println("Unable to find new leader after Broker failure. Exiting"); throw new Exception("Unable to find new leader after Broker failure. Exiting"); }
Full Source Code
package com.test.simple; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.ErrorMapping; import kafka.common.TopicAndPartition; import kafka.javaapi.*; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; public class SimpleExample {public static void main(String args[]) { SimpleExample example = new SimpleExample(); long maxReads = Long.parseLong(args[0]); String topic = args[1]; int partition = Integer.parseInt(args[2]); List<String> seeds = new ArrayList<String>(); seeds.add(args[3]); int port = Integer.parseInt(args[4]); try { example.run(maxReads, topic, partition, seeds, port); } catch (Exception e) { System.out.println("Oops:" + e); e.printStackTrace(); } } private List<String> m_replicaBrokers = new ArrayList<String>(); public SimpleExample() { m_replicaBrokers = new ArrayList<String>(); } public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception { // find the meta data about the topic and partition we are interested in// PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); if (metadata == null) { System.out.println("Can't find metadata for Topic and Partition. Exiting"); return; } if (metadata.leader() == null) { System.out.println("Can't find Leader for Topic and Partition. Exiting"); return; } String leadBroker = metadata.leader().host(); String clientName = "Client_" + a_topic + "_" + a_partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); int numErrors = 0; while (a_maxReads > 0) { if (consumer == null) { consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); } FetchRequest req = new FetchRequestBuilder() .clientId(clientName) .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka .build(); FetchResponse fetchResponse = consumer.fetch(req); if (fetchResponse.hasError()) { numErrors++; // Something went wrong! short code = fetchResponse.errorCode(a_topic, a_partition); System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); if (numErrors > 5) break; if (code == ErrorMapping.OffsetOutOfRangeCode()) { // We asked for an invalid offset. For simple case ask for the last element to reset readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); continue; } consumer.close(); consumer = null; leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); continue; } numErrors = 0; long numRead = 0; for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { long currentOffset = messageAndOffset.offset(); if (currentOffset < readOffset) { System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset); continue; } readOffset = messageAndOffset.nextOffset(); ByteBuffer payload = messageAndOffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); numRead++; a_maxReads--; } if (numRead == 0) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } } } if (consumer != null) consumer.close(); } public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) ); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { for (int i = 0; i < 3; i++) { boolean goToSleep = false; PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); if (metadata == null) { goToSleep = true; } else if (metadata.leader() == null) { goToSleep = true; } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { // first time through if the leader hasn't changed give ZooKeeper a second to recover// second time, assume the broker did recover before failover, or it was a non-Broker issue// goToSleep = true; } else { return metadata.leader().host(); } if (goToSleep) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } } } System.out.println("Unable to find new leader after Broker failure. Exiting"); throw new Exception("Unable to find new leader after Broker failure. Exiting"); } private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) { PartitionMetadata returnMetaData = null; loop: for (String seed : a_seedBrokers) { SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); List<String> topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { if (part.partitionId() == a_partition) { returnMetaData = part; break loop; } } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e); } finally { if (consumer != null) consumer.close(); } } if (returnMetaData != null) { m_replicaBrokers.clear(); for (kafka.cluster.Broker replica : returnMetaData.replicas()) { m_replicaBrokers.add(replica.host()); } } return returnMetaData; } }
相关推荐
1. **创建Consumer实例**:首先,我们需要配置一个`Properties`对象,设置必要的参数如bootstrap servers、group id等,然后使用`KafkaConsumer`类的构造函数创建消费者实例。 2. **订阅主题**:消费者通过调用`...
《Go-consumergroup:构建基于Golang的Kafka消费者库》 在现代软件开发中,消息队列系统如Apache Kafka扮演着至关重要的角色,它提供了高效、可靠的异步通信能力。而Go语言以其简洁的语法和高性能特性,成为了编写...
【标题】"kafka_hdfs_consumer"涉及到的关键技术是将数据从Kafka消费并存储到HDFS(Hadoop Distributed File System)中。这个过程通常在大数据处理和流处理场景下非常常见,它允许实时或近实时的数据从消息队列流向...
4. 消费消息:创建消费者实例,订阅主题,使用KafkaConsumer接口获取并处理消息。 四、最佳实践 1. 数据保留策略:设置合理的保留时间和大小,避免数据过量积累占用磁盘空间。 2. 集群扩展:根据业务需求,适时...
"php_rdkafka"是PHP与librdkafka库的接口,librdkafka是一个高效的C语言实现的Kafka客户端。这个库的核心优势在于其性能和稳定性。而"php-kafka-consumer"在此基础上添加了与Zookeeper的交互功能,Zookeeper是Apache...
4. **动态创建消费者**:在Spring Boot中,我们通常通过监听器接口(如`KafkaListener`)来创建消费者。然而,如果你需要动态创建消费者,可以使用`@KafkaListener`配合`@ConditionalOnProperty`注解,根据特定的...
KafkaConsumer, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords, String> records = consumer.poll(Duration.ofMillis(100)); ...
- KafkaConsumer:是创建消费者实例的静态接口。 - Consumer:是消费者结构,处理消费相关的逻辑。 - KafkaConsumerImpl:是消费者实现的具体封装。 librdkafka还提供了消费者回调机制,如ConsumeCb和EventCb,它们...
Kafka插件则为这些功能提供了接口和便利。 ### Kafka插件的类型 1. **生产者插件**: 这类插件允许应用程序作为Kafka的数据生产者,将消息发布到Kafka主题。例如,Java或Python的Kafka生产者库,它们提供了API来...
KafkaConsumer的API和配置细节可以参考KafkaConsumer相关文档。消费者的迭代器会返回ConsumerRecords,这是简单命名元组,用于暴露基本消息属性,如topic、partition等。 开发者在使用kafka-python时需要注意版本...
KafkaConsumer, String> consumer = new KafkaConsumer(consumerProps); ``` 订阅感兴趣的主题,然后开始循环消费: ```java consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords, ...
标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...
这个库提供了Producer和Consumer接口,使得我们能够方便地编写生产消息和消费消息的Java应用。在"Kafka-java-demo"中,你将看到如何使用这些接口来实现一个简单的生产者和消费者示例。 【Kafka Producer】 Kafka...
这里,我们创建了`KafkaConsumer`类,配置了消费组ID和自动重置偏移量的策略。`StartConsuming`方法启动消费过程,当接收到消息时,会调用传入的`handleMessage`回调函数。 现在,我们可以在主程序中创建生产者和...
consumer = KafkaConsumer('my-topic', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest') for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message....
KafkaConsumer, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords, String> records = consumer.poll(100); for (ConsumerRecord, ...
KafkaConsumer, String> consumer = new KafkaConsumer(props); ``` 2. **订阅 Topic**: ```java consumer.subscribe(Arrays.asList("my-topic")); ``` 3. **读取消息**: ```java while (true) { ...
这些API为开发者提供了在Python中操作Kafka的便捷接口,允许他们实现生产者和消费者的逻辑,包括消息的发布、订阅、提交位移以及查询offset等功能。通过熟练掌握这些API,开发人员可以在Python环境中高效地利用Kafka...
KafkaConsumer, String> consumer = new KafkaConsumer(consumerProps); ``` 接下来,订阅主题并启动消费循环: ```java consumer.subscribe(Collections.singletonList("my-topic")); while (true) { ...
Consumer, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords, String> records = consumer.poll(100); for (ConsumerRecord, ...