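Background overview
In Kafka 0.9 and earlier, consumer offsets were stored in ZooKeeper, which performs poorly under frequent reads and writes. From 0.10 on, topic-partition offsets are stored in a dedicated internal Kafka topic.
Managing and monitoring Kafka topics and partition offsets is essential. Popular open-source tools such as KafkaOffsetMonitor and kafka-manager were built for the older layout in which offsets lived in ZooKeeper, when the Kafka consumer had no API for reading them. From Kafka 0.10.1.1 on, the consumer provides APIs for reading topic-partition offsets (the KafkaClient API can also be called; Kafka's admin API is written in Scala).
Against a cluster configured with Kafka SASL authentication, these open-source tools have to be modified and recompiled; the stock downloads cannot connect to the cluster. KafkaOffsetMonitor and kafka-manager are both written in Scala. Most of the code in this section is written in Java, with a small part calling Scala interfaces from Java. If needed, the next section will upgrade KafkaOffsetMonitor to add Kafka security support.
Kafka SASL security configuration is not covered in this section.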
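KafkaConsumer interface description
KafkaConsumer interface
1) Map<String, List<PartitionInfo>> listTopics() returns partition information for all topics
2) List<PartitionInfo> partitionsFor(String topic) returns the partition list of the given topic
3) Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) returns the beginning offset of every partition in the given collection
4) Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) returns the end offset of every partition in the given collection (that is, the LogEndOffset)
KafkaConsumer is not thread-safe, so the close method below takes a lock. The methods listed above are enough to obtain topic and partition offset information.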
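Because the consumer is not thread-safe, one option is to route every call, not only close, through the same lock. The following is a minimal sketch of that idea using only the JDK; `LockGuarded` and its `call` method are hypothetical names introduced for illustration, and the delegate would be the `KafkaConsumer` in this article's setting.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/** Serializes every call to a non-thread-safe delegate through one lock (hypothetical helper). */
final class LockGuarded<T> {
    private final T delegate; // e.g. a KafkaConsumer in the article's setting
    private final ReentrantLock lock = new ReentrantLock();

    LockGuarded(T delegate) {
        this.delegate = delegate;
    }

    /** Runs the action while holding the lock and returns its result. */
    <R> R call(Function<? super T, ? extends R> action) {
        lock.lock();
        try {
            return action.apply(delegate);
        } finally {
            lock.unlock(); // always released, even if the action throws
        }
    }
}
```

A caller would wrap the consumer once and then express each query as a lambda passed to `call`, so that two threads never touch the consumer at the same time.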
package com.sunshine.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import com.sunshine.boot.MainBootApplication;

/**
 * Kafka metadata queries.
 *
 * @author oy
 */
public class KafkaConsumeMeta {

    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final ReentrantLock lock = new ReentrantLock();

    public KafkaConsumeMeta(String servers) {
        this(null, servers);
    }

    public KafkaConsumeMeta(String topic, String servers) {
        this(topic, servers, false);
    }

    public KafkaConsumeMeta(String servers, boolean security) {
        this(null, servers, security);
    }

    public KafkaConsumeMeta(String topic, String servers, boolean security) {
        Properties props = getDefaultProperties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // Security settings
        if (security) {
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
        }
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    public Properties getDefaultProperties() {
        Properties props = new Properties();
        // Kafka servers are set by the constructor
        // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.178:9092");
        // Group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "90000");
        // props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30000");
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "670000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Load every application property whose key starts with "kafka"
        Map<String, String> map = MainBootApplication.getAllPropertiesByPrefix("kafka");
        for (Map.Entry<String, String> entry : map.entrySet()) {
            props.put(entry.getKey(), entry.getValue());
        }
        return props;
    }

    /**
     * Returns partition information for all topics.
     * key = topic, value = List<PartitionInfo>
     */
    public Map<String, List<PartitionInfo>> getAllTopicPartiions() {
        return consumer.listTopics();
    }

    /**
     * Returns the partition list of the given topic.
     */
    public List<PartitionInfo> getPartitionInfo(String topic) {
        return consumer.partitionsFor(topic);
    }

    /**
     * Converts a PartitionInfo into a TopicPartition.
     */
    public TopicPartition transforTopicPartition(PartitionInfo info) {
        return new TopicPartition(info.topic(), info.partition());
    }

    /**
     * Converts a list of PartitionInfo into TopicPartition objects.
     */
    public Collection<TopicPartition> transforTopicPartition(List<PartitionInfo> list) {
        List<TopicPartition> result = new ArrayList<TopicPartition>();
        for (PartitionInfo info : list) {
            result.add(transforTopicPartition(info));
        }
        return result;
    }

    /**
     * Returns the beginning offset of each partition.
     */
    public Map<TopicPartition, Long> getBeginningOffsets(Collection<TopicPartition> topicPartition) {
        return consumer.beginningOffsets(topicPartition);
    }

    /**
     * Returns the end offset of each partition.
     */
    public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> topicPartition) {
        return consumer.endOffsets(topicPartition);
    }

    /**
     * @return the consumer
     */
    public KafkaConsumer<Integer, String> getConsumer() {
        return consumer;
    }

    public OffsetAndMetadata getCurrentOffsetAndMetadata(TopicPartition tp) {
        return consumer.committed(tp);
    }

    public void close() {
        // The consumer is not thread-safe, so close it under the lock
        lock.lock();
        try {
            if (consumer != null) {
                consumer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
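Topic consumer-group offset information
For management and monitoring we need to know how far a given consumer group has consumed in each partition of a given topic, which shows how well the consumers are keeping up.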
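The quantity that matters here is the lag of each partition: the log end offset minus the group's current offset. The arithmetic can be sketched with plain JDK maps; `LagCalculator` is a hypothetical helper, partitions are keyed by their integer id instead of `TopicPartition`, and a partition without a committed offset is counted from 0, which mirrors the null handling in the code of this section.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: per-partition consumer lag from two offset maps. */
final class LagCalculator {

    /** lag = log end offset - committed offset; a missing commit is treated as offset 0. */
    static Map<Integer, Long> lagByPartition(Map<Integer, Long> logEndOffsets,
                                             Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), e.getValue() - committed);
        }
        return lag;
    }

    /** Sums the lag over all partitions of the topic. */
    static long totalLag(Map<Integer, Long> lagByPartition) {
        long sum = 0;
        for (long v : lagByPartition.values()) {
            sum += v;
        }
        return sum;
    }
}
```

With end offsets of 100 and 50 on partitions 0 and 1 and a single commit at offset 80 on partition 0, the per-partition lag is 20 and 50, and the total is 70.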
package com.sunshine.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.admin.AdminClient;
import kafka.admin.AdminClient.ConsumerSummary;
import kafka.common.TopicAndPartition;
import kafka.coordinator.GroupOverview;
import scala.Option;
import scala.collection.JavaConverters;

public class KafkaConsumeGroupMetaByAdminClient {

    private final AdminClient client;
    private final String servers;
    private final boolean security;
    private final String groupId;
    private final String topic;
    private KafkaConsumer<String, String> consumer;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final ReentrantLock lock = new ReentrantLock();

    public KafkaConsumeGroupMetaByAdminClient(String servers, String topic, String groupId, boolean security) {
        Properties properties = new Properties();
        String deserializer = StringDeserializer.class.getName();
        // Kafka servers
        properties.put(ExtConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.put(ExtConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ExtConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ExtConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "500000");
        properties.put(ExtConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
        properties.put(ExtConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        if (security) {
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.mechanism", "PLAIN");
        }
        this.security = security;
        this.servers = servers;
        this.groupId = groupId;
        this.topic = topic;
        this.client = AdminClient.create(properties);
    }

    public Set<GroupOverview> listAllConsumerGroups() {
        scala.collection.immutable.List<GroupOverview> list = client.listAllConsumerGroupsFlattened();
        List<GroupOverview> result = JavaConverters.asJavaListConverter(list).asJava();
        return new HashSet<GroupOverview>(result);
    }

    /**
     * Uses Kafka's built-in AdminClient to describe group consumption:
     * every topic and partition consumed under the same group name.
     */
    public List<CounsumeGroupMode> getConsumeGroup() {
        lock.lock();
        try {
            // Kept inside the try block so the lock is released even if this call throws
            Option<scala.collection.immutable.List<ConsumerSummary>> option = client.describeConsumerGroup(groupId);
            if (option.isEmpty()) {
                log.error("Consumer group " + groupId + " is rebalancing.");
                return Collections.emptyList();
            }
            List<ConsumerSummary> consumers = JavaConverters.asJavaListConverter(option.get()).asJava();
            KafkaConsumer<String, String> consumer = getConsumer();
            List<CounsumeGroupMode> result = new ArrayList<>();
            for (ConsumerSummary consumerSummary : consumers) {
                List<TopicPartition> tps = JavaConverters.asJavaListConverter(consumerSummary.assignment()).asJava();
                List<TopicAndPartition> taps = new ArrayList<>();
                Map<TopicAndPartition, Long> partitionOffsets = new HashMap<TopicAndPartition, Long>();
                for (TopicPartition tp : tps) {
                    TopicAndPartition topicAndPartition = new TopicAndPartition(tp.topic(), tp.partition());
                    taps.add(topicAndPartition);
                    OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
                    // No committed offset yet: count from 0
                    partitionOffsets.put(topicAndPartition, (offsetAndMetadata == null) ? 0L : offsetAndMetadata.offset());
                }
                String owner = consumerSummary.clientId() + "_" + consumerSummary.clientHost();
                result.addAll(describeTopicPartition(groupId, taps, partitionOffsets, owner));
            }
            return result;
        } catch (Exception e) {
            log.error("Failed to describe consumer group " + groupId, e);
            return Collections.emptyList();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns, for every partition of the configured topic, the beginning offset,
     * the largest offset, and the offset currently consumed by the group.
     */
    public List<ExtPartitionInfo> getTopicPartitionsAllOffset() {
        List<ExtPartitionInfo> result = new ArrayList<ExtPartitionInfo>();
        lock.lock();
        try {
            KafkaConsumer<String, String> consumer = getConsumer();
            List<PartitionInfo> pis = consumer.partitionsFor(this.topic);
            List<TopicPartition> tps = transforTopicPartition(pis);
            Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps); // marker 1
            for (PartitionInfo pi : pis) {
                TopicPartition tp = new TopicPartition(pi.topic(), pi.partition());
                // Option 1 for the current position: read the committed offset
                // OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
                // long currentOffset = (offsetAndMetadata == null) ? 0 : offsetAndMetadata.offset();
                // Option 2 for the current position: read the consumer position
                long currentOffset = getCurrentOffset(pi.topic(), pi.partition());
                // The endOffsets value is used throughout. Marker 1 runs first, so a value
                // fetched at marker 2 would be greater than or equal to the one from marker 1.
                // long logendOffset = getLogEndOffset(pi.topic(), pi.partition()); // marker 2
                ExtPartitionInfo epi = new ExtPartitionInfo(pi.topic(), pi.partition(), pi.leader(), pi.replicas(),
                        pi.inSyncReplicas(), beginOffsets.get(tp), endOffsets.get(tp), currentOffset,
                        endOffsets.get(tp));
                result.add(epi);
            }
            Collections.sort(result);
        } catch (Exception e) {
            log.error("Failed to query offsets for topic " + topic, e);
        } finally {
            lock.unlock();
        }
        return result;
    }

    private List<TopicPartition> transforTopicPartition(List<PartitionInfo> list) {
        List<TopicPartition> result = new ArrayList<TopicPartition>();
        for (PartitionInfo info : list) {
            result.add(transforTopicPartition(info));
        }
        return result;
    }

    private TopicPartition transforTopicPartition(PartitionInfo info) {
        return new TopicPartition(info.topic(), info.partition());
    }

    private List<CounsumeGroupMode> describeTopicPartition(String group, List<TopicAndPartition> topicPartitions,
            Map<TopicAndPartition, Long> partitionOffsets, String owner) {
        List<CounsumeGroupMode> rs = new ArrayList<>();
        for (TopicAndPartition tap : topicPartitions) {
            long logEndOffset = getLogEndOffset(tap.topic(), tap.partition());
            long currentOffset = partitionOffsets.get(tap);
            rs.add(new CounsumeGroupMode(group, tap.topic(), tap.partition(), currentOffset, logEndOffset, owner));
        }
        return rs;
    }

    /**
     * Returns the offset the group is currently at for the given topic partition.
     */
    private long getCurrentOffset(String topic, int partition) {
        KafkaConsumer<String, String> consumer = getConsumer();
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.assign(Arrays.asList(topicPartition));
        return consumer.position(topicPartition);
    }

    /**
     * Returns the log end offset of the given topic partition by seeking to the end.
     */
    private long getLogEndOffset(String topic, int partition) {
        KafkaConsumer<String, String> consumer = getConsumer();
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.assign(Arrays.asList(topicPartition));
        consumer.seekToEnd(Arrays.asList(topicPartition));
        return consumer.position(topicPartition);
    }

    public KafkaConsumer<String, String> getConsumer() {
        if (consumer == null) {
            consumer = createNewConsumer();
        }
        return consumer;
    }

    public List<Node> listAllBrokers() {
        return JavaConverters.asJavaListConverter(client.bootstrapBrokers()).asJava();
    }

    public void close() {
        lock.lock();
        try {
            client.close();
            if (consumer != null) {
                consumer.close();
            }
        } catch (Exception e) {
            log.error("Failed to close Kafka clients", e);
        } finally {
            lock.unlock();
        }
    }

    private KafkaConsumer<String, String> createNewConsumer() {
        Properties properties = new Properties();
        String deserializer = StringDeserializer.class.getName();
        properties.put(ExtConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);
        if (groupId != null) {
            properties.put(ExtConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        }
        properties.put(ExtConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ExtConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // New consumer API: "latest" is the default. When a brand-new group is used to query
        // the current position of a topic partition, set this to "earliest" instead.
        properties.put(ExtConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ExtConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "500000");
        properties.put(ExtConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
        properties.put(ExtConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        if (security) {
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.mechanism", "PLAIN");
        }
        return new KafkaConsumer<String, String>(properties);
    }

    public class CounsumeGroupMode implements Comparable<CounsumeGroupMode> {

        private String group;
        private String topic;
        private int partition;
        private long current_offset;
        private long log_eng_offset;
        private long LAG;
        private String owner;

        public CounsumeGroupMode(String group, String topic, int partition, long current_offset,
                long log_eng_offset, String owner) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.current_offset = current_offset;
            this.log_eng_offset = log_eng_offset;
            this.LAG = log_eng_offset - current_offset;
            this.owner = owner;
        }

        public String getGroup() { return group; }
        public void setGroup(String group) { this.group = group; }
        public String getTopic() { return topic; }
        public void setTopic(String topic) { this.topic = topic; }
        public int getPartition() { return partition; }
        public void setPartition(int partition) { this.partition = partition; }
        public long getCurrent_offset() { return current_offset; }
        public void setCurrent_offset(long current_offset) { this.current_offset = current_offset; }
        public long getLog_eng_offset() { return log_eng_offset; }
        public void setLog_eng_offset(long log_eng_offset) { this.log_eng_offset = log_eng_offset; }
        public long getLAG() { return LAG; }
        public void setLAG(long lAG) { LAG = lAG; }
        public String getOwner() { return owner; }
        public void setOwner(String owner) { this.owner = owner; }

        /** Orders entries by partition number. */
        @Override
        public int compareTo(CounsumeGroupMode o) {
            return Integer.compare(this.partition, o.partition);
        }

        /** Added so that equality matches hashCode, which the original overrode on its own. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof CounsumeGroupMode)) {
                return false;
            }
            CounsumeGroupMode other = (CounsumeGroupMode) obj;
            return partition == other.partition && Objects.equals(group, other.group)
                    && Objects.equals(topic, other.topic) && Objects.equals(owner, other.owner);
        }

        @Override
        public int hashCode() {
            return Objects.hash(group, topic, partition, owner);
        }
    }
}
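ExtPartitionInfo is an extended version of PartitionInfo that adds the topic's total message count, the amount already consumed, and the amount still waiting to be consumed.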
package com.sunshine.kafka;

import org.apache.kafka.common.Node;

/**
 * PartitionInfo extended with offset and consumption statistics.
 *
 * @author oy
 */
public class ExtPartitionInfo implements Comparable<ExtPartitionInfo> {

    // ======== topic partition ========
    private final String topic;
    private final int partition;

    // ======== topic data volume ========
    private final long beginOffsets;
    private final long endOffsets;
    private final long pcounts;

    // ======== leader, replica and ISR nodes ========
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final String leaderStr;
    private final String relicasStr;
    private final String inSyncReplicasStr;

    // ======== consumption statistics ========
    private long currentOffset; // offset currently consumed by the group
    private long logendOffset;  // largest offset in the partition
    private long lag;           // messages still waiting to be consumed

    public ExtPartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas,
            long beginOffsets, long endOffsets) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
        this.replicas = replicas;
        this.inSyncReplicas = inSyncReplicas;
        this.beginOffsets = beginOffsets;
        this.endOffsets = endOffsets;
        // Counts from offset 0. Use (endOffsets - beginOffsets) to count only retained messages.
        this.pcounts = endOffsets;
        // Guard against a partition that currently has no leader
        this.leaderStr = (leader == null) ? "none" : leader.toString();
        this.relicasStr = fmtNodeIds(replicas);
        this.inSyncReplicasStr = fmtNodeIds(inSyncReplicas);
    }

    public ExtPartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas,
            long beginOffsets, long endOffsets, long currentOffset, long logendOffset) {
        this(topic, partition, leader, replicas, inSyncReplicas, beginOffsets, endOffsets);
        this.currentOffset = currentOffset;
        this.logendOffset = logendOffset;
        this.lag = logendOffset - currentOffset;
    }

    @Override
    public String toString() {
        return String.format(
                "Partition(topic = %s, partition = %d, beginOffset=%d, endOffset=%d, counts=%d, leader = %s, replicas = %s, isr = %s)",
                topic, partition, beginOffsets, endOffsets, pcounts,
                leader == null ? "none" : leader.id(), fmtNodeIds(replicas), fmtNodeIds(inSyncReplicas));
    }

    /* Extract the node ids from each item in the array and format for display */
    private String fmtNodeIds(Node[] nodes) {
        StringBuilder b = new StringBuilder("[");
        for (int i = 0; i < nodes.length - 1; i++) {
            b.append(nodes[i].id());
            b.append(',');
        }
        if (nodes.length > 0) {
            // The last id takes no trailing comma
            b.append(nodes[nodes.length - 1].id());
        }
        b.append("]");
        return b.toString();
    }

    /** Orders entries by partition number. */
    @Override
    public int compareTo(ExtPartitionInfo arg0) {
        return Integer.compare(this.partition, arg0.partition);
    }

    public long getPcounts() { return pcounts; }
    public String getTopic() { return topic; }
    public int getPartition() { return partition; }
    public Node getLeader() { return leader; }
    public Node[] getReplicas() { return replicas; }
    public Node[] getInSyncReplicas() { return inSyncReplicas; }
    public long getBeginOffsets() { return beginOffsets; }
    public long getEndOffsets() { return endOffsets; }
    public String getLeaderStr() { return leaderStr; }
    public String getRelicasStr() { return relicasStr; }
    public String getInSyncReplicasStr() { return inSyncReplicasStr; }
    public long getCurrentOffset() { return currentOffset; }
    public long getLogendOffset() { return logendOffset; }
    public long getLag() { return lag; }
}
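`fmtNodeIds` renders the replica and ISR broker ids as a bracketed, comma-separated list. A compact alternative, sketched here under the assumption that plain `int` ids stand in for `Node.id()`, is the JDK's `StringJoiner`; `NodeIdFormat` is a hypothetical name and is not part of the article's code.

```java
import java.util.StringJoiner;

/** Hypothetical helper: formats broker ids as a bracketed list. */
final class NodeIdFormat {

    /** Joins the ids with commas between "[" and "]"; an empty array yields "[]". */
    static String format(int[] nodeIds) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (int id : nodeIds) {
            joiner.add(Integer.toString(id));
        }
        return joiner.toString();
    }
}
```

The joiner places the separator only between elements, so the first and last ids need no special handling.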
The unit tests are as follows:
package com.sunshine.kafka.test;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.sunshine.boot.MainBootApplication;
import com.sunshine.kafka.KafkaConsumeMeta;
import com.sunshine.kafka.KafkaJavaClient;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:/spring/applicationContext.xml")
//@SpringBootTest(classes=MainBootApplication.class)
public class KafkaMetaTest {

    public static final String HTTP = "http://127.0.0.1:8080/kmeta/api/kafka/meta/topic";

    public HttpClient client = HttpClients.createDefault();

    private String servers = "192.168.2.178:9092";

    @Before
    public void init() {
        // Point the JVM at the JAAS file used for SASL authentication
        URL url = MainBootApplication.class.getClassLoader().getResource("kafka_client_jaas.conf");
        System.setProperty("java.security.auth.login.config", url.getPath());
    }

    @Test
    public void testKafkaMetaList() throws Exception {
        long btime = System.currentTimeMillis();
        //String httpURL = HTTP + "/430100-passrec";
        String httpURL = HTTP + "/test";
        HttpPost post = new HttpPost(httpURL);
        List<NameValuePair> params = new ArrayList<NameValuePair>();
        params.add(new BasicNameValuePair("servers", servers));
        params.add(new BasicNameValuePair("security", "true"));
        post.setEntity(new UrlEncodedFormEntity(params));
        HttpResponse response = client.execute(post);
        String sendPostResult = EntityUtils.toString(response.getEntity());
        long ttime = System.currentTimeMillis() - btime;
        System.out.println("Query time (ms): " + ttime + "--- sendPostResult:" + sendPostResult);
    }

    @Test
    public void testTopicOffset() throws Exception {
        long btime = System.currentTimeMillis();
        KafkaConsumeMeta kafkaMeta = new KafkaConsumeMeta(servers, true);
        try {
            List<PartitionInfo> list = kafkaMeta.getPartitionInfo("topic1");
            for (PartitionInfo p : list) {
                System.out.println("Topic: " + p.topic() + "-->" + p.toString());
            }
            // Copy into a typed list instead of using a raw cast
            List<TopicPartition> partitions = new ArrayList<TopicPartition>(kafkaMeta.transforTopicPartition(list));
            Map<TopicPartition, Long> offsets = kafkaMeta.getEndOffsets(partitions);
            for (Entry<TopicPartition, Long> entry : offsets.entrySet()) {
                TopicPartition tp = entry.getKey();
                System.out.println(tp.topic() + "-" + tp.partition() + "---" + entry.getValue());
            }
        } finally {
            kafkaMeta.close();
        }
        long ttime = System.currentTimeMillis() - btime;
        System.out.println("Query time (ms): " + ttime);
    }

    @Test
    public void testTopicPartition() throws Exception {
        long btime = System.currentTimeMillis();
        KafkaConsumeMeta kafkaMeta = new KafkaConsumeMeta(servers, true);
        try {
            List<PartitionInfo> list = kafkaMeta.getPartitionInfo("topic1");
            for (PartitionInfo p : list) {
                System.out.println("Topic: " + p.topic() + "-->" + p.toString());
            }
        } finally {
            kafkaMeta.close();
        }
        long ttime = System.currentTimeMillis() - btime;
        System.out.println("Query time (ms): " + ttime);
    }

    @Test
    public void testConsumGroup() throws Exception {
        // Exceptions propagate so that a failure is reported instead of swallowed
        KafkaJavaClient kcg = new KafkaJavaClient(servers, true);
        //Node node = new Node(0,"68.28.6.104",9094);
        List<ListGroupsResponse.Group> result = kcg.listGroups(kcg.getLoadedNode());
        //List<ListGroupsResponse.Group> result = kcg.listGroups(node);
        for (ListGroupsResponse.Group group : result) {
            System.out.println(group.groupId() + "--" + group.protocolType());
        }
        for (Node node : kcg.findAllBrokers()) {
            System.out.println(node.toString());
        }
        System.out.println("coordinator:" + kcg.findCoordinator("sc_veh_group3").toString());
        System.out.println("consumer groups:" + kcg.listAllConsumerGroups());
        System.out.println("describe group:" + kcg.describeGroup("sc_veh_group3"));
    }
}
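`testKafkaMetaList` posts two form parameters, `servers` and `security`, to the REST endpoint. For readers without Apache HttpClient at hand, the request body can be built with the JDK alone. This is a sketch under assumptions: `FormBody` and `metaRequestBody` are hypothetical names, and the `URLEncoder.encode(String, Charset)` overload requires Java 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: builds an application/x-www-form-urlencoded body. */
final class FormBody {

    /** Encodes each key and value and joins the pairs with "&", keeping the map's iteration order. */
    static String encode(Map<String, String> params) {
        StringJoiner body = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            body.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                    + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return body.toString();
    }

    /** Body with the two parameters the test sends: servers and security. */
    static String metaRequestBody(String servers, boolean security) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("servers", servers);
        params.put("security", Boolean.toString(security));
        return encode(params);
    }
}
```

For the server address used in the test, `metaRequestBody("192.168.2.178:9092", true)` yields `servers=192.168.2.178%3A9092&security=true`; the colon is percent-encoded while dots and digits pass through unchanged.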
package com.sunshine.kafka.test;

import java.net.URL;
import java.util.List;
import java.util.Set;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.sunshine.boot.MainBootApplication;
import com.sunshine.kafka.ExtPartitionInfo;
import com.sunshine.kafka.KafkaConsumeGroupMetaByAdminClient;
import com.sunshine.kafka.KafkaConsumeGroupMetaByAdminClient.CounsumeGroupMode;

import kafka.coordinator.GroupOverview;

@RunWith(SpringJUnit4ClassRunner.class)
public class KafkaConsumeGroupMetaByAdminClientTest {

    private String servers = "192.168.121.200:9092";

    @Before
    public void init() {
        // Point the JVM at the JAAS file used for SASL authentication
        URL url = MainBootApplication.class.getClassLoader().getResource("kafka_client_jaas.conf");
        System.setProperty("java.security.auth.login.config", url.getPath());
    }

    @Test
    public void testListAllConsumeGroup() {
        KafkaConsumeGroupMetaByAdminClient client = new KafkaConsumeGroupMetaByAdminClient(servers, "", "", true);
        try {
            Set<GroupOverview> set = client.listAllConsumerGroups();
            for (GroupOverview go : set) {
                System.out.println(go.groupId() + "-" + go.protocolType());
            }
        } finally {
            client.close();
        }
    }

    @Test
    public void testGetConsumeGroup() {
        KafkaConsumeGroupMetaByAdminClient client = new KafkaConsumeGroupMetaByAdminClient(servers, "", "g1", true);
        try {
            List<CounsumeGroupMode> list = client.getConsumeGroup();
            for (CounsumeGroupMode cgm : list) {
                System.out.println("GROUP:" + cgm.getGroup() + "-TOPIC:" + cgm.getTopic()
                        + "-PARTITION:" + cgm.getPartition() + "-CURRENTOFFSET:" + cgm.getCurrent_offset()
                        + "-LOGENDOFFSET:" + cgm.getLog_eng_offset() + "-LAG:" + cgm.getLAG());
            }
        } finally {
            client.close();
        }
    }

    @Test
    public void testGetTopicPartitionOffset() {
        long btime = System.currentTimeMillis();
        KafkaConsumeGroupMetaByAdminClient client =
                new KafkaConsumeGroupMetaByAdminClient(servers, "440100_PASS", "group1", true);
        try {
            List<ExtPartitionInfo> list = client.getTopicPartitionsAllOffset();
            for (ExtPartitionInfo epi : list) {
                System.out.println("-TOPIC:" + epi.getTopic() + "-PARTITION:" + epi.getPartition()
                        + "-CURRENTOFFSET:" + epi.getCurrentOffset() + "-LOGENDOFFSET:" + epi.getLogendOffset()
                        + "-LAG:" + epi.getLag() + "-BEGINOFFSET:" + epi.getBeginOffsets()
                        + "-ENDOFFSET:" + epi.getEndOffsets());
            }
        } finally {
            client.close();
        }
        long ttime = System.currentTimeMillis() - btime;
        System.out.println("Query time (ms): " + ttime);
    }
}
The microservice is built with Spring Boot and is reachable at http://localhost:8080/kmeta ; the UI looks like this:
The complete code can be downloaded as an attachment.