Kafka Consumer的底层API- SimpleConsumer
1.Kafka提供了两套API给Consumer
- The high-level Consumer API
- The SimpleConsumer API
第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情
- 一个消息读取多次
- 在一个处理过程中只消费Partition其中的一部分消息
- 添加事务管理机制以保证消息被处理且仅被处理一次
2.使用SimpleConsumer有哪些弊端呢?
- 从所有活跃的broker中找出哪个是指定Topic Partition中的leader broker
- 找出指定Topic Partition中的所有备份broker
- 构造请求
- 发送请求查询数据
- 处理leader broker变更
4.代码实例
package bonree.consumer; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.ErrorMapping; import kafka.common.TopicAndPartition; import kafka.javaapi.FetchResponse; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; public class SimpleExample { private List<String> m_replicaBrokers = new ArrayList<String>(); public SimpleExample() { m_replicaBrokers = new ArrayList<String>(); } public static void main(String args[]) { SimpleExample example = new SimpleExample(); // 最大读取消息数量 long maxReads = Long.parseLong("3"); // 要订阅的topic String topic = "mytopic"; // 要查找的分区 int partition = Integer.parseInt("0"); // broker节点的ip List<String> seeds = new ArrayList<String>(); seeds.add("192.168.4.30"); seeds.add("192.168.4.31"); seeds.add("192.168.4.32"); // 端口 int port = Integer.parseInt("9092"); try { example.run(maxReads, topic, partition, seeds, port); } catch (Exception e) { System.out.println("Oops:" + e); e.printStackTrace(); } } public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception { // 获取指定Topic partition的元数据 PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); if (metadata == null) { System.out.println("Can't find metadata for Topic and Partition. Exiting"); return; } if (metadata.leader() == null) { System.out.println("Can't find Leader for Topic and Partition. Exiting"); return; } String leadBroker = metadata.leader().host(); String clientName = "Client_" + a_topic + "_" + a_partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); int numErrors = 0; while (a_maxReads > 0) { if (consumer == null) { consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); } FetchRequest req = new FetchRequestBuilder().clientId(clientName) .addFetch(a_topic, a_partition, readOffset, 100000).build(); FetchResponse fetchResponse = consumer.fetch(req); if (fetchResponse.hasError()) { numErrors++; // Something went wrong! short code = fetchResponse.errorCode(a_topic, a_partition); System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); if (numErrors > 5) { break; } if (code == ErrorMapping.OffsetOutOfRangeCode()) { // We asked for an invalid offset. For simple case ask for // the last element to reset readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); continue; } consumer.close(); consumer = null; leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); continue; } numErrors = 0; long numRead = 0; for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { long currentOffset = messageAndOffset.offset(); if (currentOffset < readOffset) { System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset); continue; } readOffset = messageAndOffset.nextOffset(); ByteBuffer payload = messageAndOffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); numRead++; a_maxReads--; } if (numRead == 0) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } } } if (consumer != null) { consumer.close(); } } public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } /** * @param a_oldLeader * @param a_topic * @param a_partition * @param a_port * @return String * @throws Exception 找一个leader broker */ private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { for (int i = 0; i < 3; i++) { boolean goToSleep = false; PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); if (metadata == null) { goToSleep = true; } else if (metadata.leader() == null) { goToSleep = true; } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { // first time through if the leader hasn't changed give // ZooKeeper a second to recover // second time, assume the broker did recover before failover, // or it was a non-Broker issue // goToSleep = true; } else { return metadata.leader().host(); } if (goToSleep) { try { Thread.sleep(1000); } catch (InterruptedException ie) { } } } System.out.println("Unable to find new leader after Broker failure. Exiting"); throw new Exception("Unable to find new leader after Broker failure. Exiting"); } private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) { PartitionMetadata returnMetaData = null; loop: for (String seed : a_seedBrokers) { SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); List<String> topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { if (part.partitionId() == a_partition) { returnMetaData = part; break loop; } } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e); } finally { if (consumer != null) { consumer.close(); } } } if (returnMetaData != null) { m_replicaBrokers.clear(); for (kafka.cluster.Broker replica : returnMetaData.replicas()) { m_replicaBrokers.add(replica.host()); } } return returnMetaData; } }
相关推荐
包含翻译后的API文档:kafka-clients-2.4.1-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.4.1; 标签:apache、kafka、clients、中英对照文档、jar包、java; 使用...
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....
包含翻译后的API文档:kafka-clients-2.0.1-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.0.1; 标签:apache、kafka、clients、中文文档、jar包、java; 使用方法:解压翻译后...
包含翻译后的API文档:kafka-clients-2.0.0-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.0.0; 标签:apache、kafka、clients、中文文档、jar包、java; 使用方法:解压翻译后...
包含翻译后的API文档:kafka-clients-2.4.1-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.4.1; 标签:apache、kafka、clients、中文文档、jar包、java; 使用方法:解压翻译后...
包含翻译后的API文档:kafka-clients-2.2.0-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.2.0; 标签:apache、kafka、clients、中文文档、jar包、java; 使用方法:解压翻译后...
包含翻译后的API文档:kafka-clients-2.0.0-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.0.0; 标签:apache、kafka、clients、中英对照文档、jar包、java; 使用...
包含翻译后的API文档:flink-connector-kafka_2.12-1.14.3-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache.flink:flink-connector-kafka_2.12:1.14.3; 标签:apache、flink、connector、...
包含翻译后的API文档:flink-connector-kafka-base_2.11-1.10.0-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.flink:flink-connector-kafka-base_2.11:1.10.0; 标签:flink、11、apache、base_2、...
包含翻译后的API文档:flink-connector-kafka-0.9_2.11-1.10.0-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.flink:flink-connector-kafka-0.9_2.11:1.10.0; 标签:flink、0、11、apache、connector...
- 消费消息:`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning` 7. **最佳实践** - 为了保证高可用性,推荐至少配置 3 个 broker。 - 合理设置分区数和...
包含翻译后的API文档:kafka-clients-0.10.0.1-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:0.10.0.1; 标签:apache、clients、kafka、jar包、java、API文档、中文版; 使用...
包含翻译后的API文档:kafka-clients-2.0.1-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:org.apache.kafka:kafka-clients:2.0.1; 标签:apache、kafka、clients、中英对照文档、jar包、java; 使用...
kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-...
包含翻译后的API文档:kafka-clients-0.10.1.1-javadoc-API文档-中文(简体)版.zip; Maven坐标:org.apache.kafka:kafka-clients:0.10.1.1; 标签:apache、clients、kafka、jar包、java、API文档、中文版; 使用...
4. 启动:执行`bin/kafka-eagle-start.sh`启动服务,使用`bin/kafka-eagle-stop.sh`停止服务。 5. 访问:在Web浏览器中输入`http://服务器IP:端口`,按照提示进行登录和操作。 ### 使用注意事项 1. 确保运行环境已...
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=your-password]' --entity-type users --entity-name your-user ``` 3. **配置客户端**:在...
/usr/local/kafka_2.12-3.6.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` 这将从主题的开头开始消费所有消息。 九、扩展与优化 Kafka支持集群部署,...
包含翻译后的API文档:flink-connector-kafka_2.12-1.14.3-javadoc-API文档-中文(简体)版.zip 对应Maven信息:groupId:org.apache.flink,artifactId:flink-connector-kafka_2.12,version:1.14.3 使用方法:...
kafka-schema-registry-client-3.2.0.jar包,亲测可用,在aliyun仓库内找不到,可以下载此jar包来进行手动安装