`

kafka获得最新partition offset

阅读更多

kafka获得partition下标,需要用到kafka的simpleconsumer

 

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.Map.Entry;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.SimpleConsumer;

public class KafkaOffsetTools {

	public static void main(String[] args) {
		// 读取kafka最新数据
		// Properties props = new Properties();
		// props.put("zookeeper.connect",
		// "192.168.6.18:2181,192.168.6.20:2181,192.168.6.44:2181,192.168.6.237:2181,192.168.6.238:2181/kafka-zk");
		// props.put("zk.connectiontimeout.ms", "1000000");
		// props.put("group.id", "dirk_group");
		//
		// ConsumerConfig consumerConfig = new ConsumerConfig(props);
		// ConsumerConnector connector =
		// Consumer.createJavaConsumerConnector(consumerConfig);

		String topic = "dirkz";
		String seed = "118.26.148.18";
		int port = 9092;
		if (args.length >= 3) {
			topic = args[0];
			seed = args[1];
			port = Integer.valueOf(args[2]);
		}
		List<String> seeds = new ArrayList<String>();
		seeds.add(seed);
		KafkaOffsetTools kot = new KafkaOffsetTools();

		TreeMap<Integer,PartitionMetadata> metadatas = kot.findLeader(seeds, port, topic);
		
		int sum = 0;
		
		for (Entry<Integer,PartitionMetadata> entry : metadatas.entrySet()) {
			int partition = entry.getKey();
			String leadBroker = entry.getValue().leader().host();
			String clientName = "Client_" + topic + "_" + partition;
			SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000,
					64 * 1024, clientName);
			long readOffset = getLastOffset(consumer, topic, partition,
					kafka.api.OffsetRequest.LatestTime(), clientName);
			sum += readOffset;
			System.out.println(partition+":"+readOffset);
			if(consumer!=null)consumer.close();
		}
		System.out.println("总和:"+sum);

	}

	public KafkaOffsetTools() {
//		m_replicaBrokers = new ArrayList<String>();
	}

//	private List<String> m_replicaBrokers = new ArrayList<String>();

	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
				whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
				requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
				clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out
					.println("Error fetching data Offset Data the Broker. Reason: "
							+ response.errorCode(topic, partition));
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
//		long[] offsets2 = response.offsets(topic, 3);
		return offsets[0];
	}

	private TreeMap<Integer,PartitionMetadata> findLeader(List<String> a_seedBrokers,
			int a_port, String a_topic) {
		TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
		loop: for (String seed : a_seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
						"leaderLookup"+new Date().getTime());
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						map.put(part.partitionId(), part);
//						if (part.partitionId() == a_partition) {
//							returnMetaData = part;
//							break loop;
//						}
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", ] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}
//		if (returnMetaData != null) {
//			m_replicaBrokers.clear();
//			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
//				m_replicaBrokers.add(replica.host());
//			}
//		}
		return map;
	}

}

 

2
2
分享到:
评论
4 楼 dsxwjhf 2016-11-07  
Good job !!
3 楼 SherJamYu 2016-09-04  
15899231727 写道
新手讨厌不贴整个工程代码的大侠

需要什么工程?maven把库加载了不就行了,还要手把手教hello world吗
2 楼 15899231727 2016-09-01  
新手讨厌不贴整个工程代码的大侠
1 楼 liubey 2015-06-16  
看zk不行吗

相关推荐

    kafka中partition和消费者对应关系1

    3. **消费状态监控**:通过`kafka-consumer-groups.sh --describe`命令可以查看消费者组的消费状态,包括每个Partition的当前偏移量(CURRENT-OFFSET)、日志结束偏移量(LOG-END-OFFSET)、滞后量(LAG)以及消费者...

    kafka tool offset explorer 2.2

    通过此工具,我们可以查看每个分区(partition)的最新offset、最小offset以及每个消费者组的committed offset,这对于监控消费者的状态和数据处理进度至关重要。 该工具的用户界面简洁明了,分为多个主要部分:...

    springboot中如何实现kafa指定offset消费

    在 Kafka 中,每个主题(Topic)存在多个分区(Partition),每个分区自己维护一个偏移量(Offset)。我们的目标是实现 Kafka Consumer 指定 Offset 消费。在这里,我们使用 Consumer--&gt;Partition 一对一的消费模型...

    RdKafka::KafkaConsumer使用实例

    假设我们希望从最新的偏移量开始,我们可以使用`rd_kafka_offset_store()`存储当前的高水印: ```cpp rd_kafka_topic_partition_list_t *offsets = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_...

    kafka可视化工具--kafkatool

    4. **查看与管理Offsets**:Kafkatool允许用户查看每个partition的最小和最大offset,以及consumer group的当前offset。用户还可以手动设置或重置offset,这对于调试和测试是非常有用的。 5. **创建与删除Topic**:...

    kafka原理文档1

    *Offset:Offset是消息在Partition中的唯一编号,用于标识消息的顺序。 五、日志策略 kafka的日志策略主要包括以下几个方面: *日志保留策略:kafka使用保留策略来删除陈旧的消息,避免磁盘被占满。 *日志压缩...

    kafka集群部署文档(部署,运维,FAQ)

    - **Partition**:Topic下的子单元,每个Partition内的消息是有序的,并且拥有唯一的偏移量(offset)标识每一则消息。 - **消息存储策略**:Kafka集群会保存所有未过期的消息,用户可以根据需求设置消息的过期时间,...

    Kafka工作原理详解

    ### Kafka工作原理详解 #### 一、Kafka的角色与组件 ...通过对Topic、Partition、Offset等概念的理解,以及对Zookeeper在Kafka中的关键作用的认识,可以帮助我们更好地掌握Kafka的工作原理及其应用场景。

    kafka存储机制.docx

    Kafka 文件存储机制可以分为四个步骤:topic 中 partition 存储分布、partition 中文件存储方式、partition 中 segment 文件存储结构、在 partition 中如何通过 offset 查找 message。 Topic、Partition、Segment ...

    kafka-manager-1.3.0.8.zip

    latestOffset值表示Partition中的最新消息的Offset,这在监控数据流入和追踪消息消费进度时非常有用。通过Kafka Manager,用户可以实时查看各个Partition的latestOffset,从而了解数据流的实时状态。 更值得一提的...

    kafka简介

    - **Offset管理**: Kafka不提供额外的索引机制来存储Offset,这是因为Kafka假设消息的读取是顺序的。Consumer需要自己管理Offset,以便追踪已经读取的消息。 **2. 分布式特性** - **Partition分布**: Kafka将一个...

    kafka集群搭建与使用

    每个 partition 中的消息都有一个唯一的编号,称之为 offset,用来唯一标示某个分区中的 message。 Kafka 的性能和数据保留 Kafka 的性能与保留的数据量的大小没有关系,因此保存大量的数据(日志信息)不会有什么...

    Kafka简介及使用PHP处理Kafka消息

    partition 中的每条消息都会被分配一个有序的 id(offset)。 3. Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。 4. Producers:消息和数据生产者,向 Kafka 的一个 topic...

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...

    kafka-java-demo 基于java的kafka生产消费者示例

    主题可以被分成多个分区(Partition),每个分区可以有多个副本(Replica)以实现数据冗余和高可用性。 【多线程与Kafka】 在"Kafka-java-demo"中,你可能会看到如何利用多线程来并行处理生产或消费任务,提升数据...

    kafka基础学习ppt

    2. **指定Partition**:客户端可以选择特定的Partition来发送消息,如果没有指定,则Kafka会根据配置自动选择一个Partition。 3. **消息发送**:一旦选择了Partition,消息就会被发送到该Partition,并分配一个唯一...

    kafkatool的window64版本

    6. **集群监控**:Kafkatool还可以显示Kafka集群的整体状态,包括Brokers的状态、Partition的状态等,帮助运维人员及时发现并解决问题。 7. **配置管理**:对于Kafka的配置文件,Kafkatool也提供了便捷的编辑和管理...

    kafka读取写入数据

    Kafka 的数据模型主要包括 Topic 和 Partition。Topic 是消息的主题,用户可以根据业务需求创建不同的 Topic。Partition 是 Topic 的物理分片,每个 Partition 是一个有序且不可变的消息序列,消息按照生产顺序添加...

    kafka eagle 1.4.8安装包kafka eagle 1.4.8

    Kafka Eagle是一款由国内开发者智能洛里(smartloli)开源的Kafka监控系统,它为Apache Kafka提供了一种直观且功能丰富的管理界面,帮助用户更好地监控、管理和优化Kafka集群。在1.4.8版本中,该系统继续提升了用户...

Global site tag (gtag.js) - Google Analytics