yangyangmyself

Kafka Monitoring

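Background
    In Kafka 0.9 and earlier, consumer offsets were generally stored in ZooKeeper, which performs poorly under frequent reads and writes. From 0.10 onward, topic-partition offsets are stored in a dedicated internal Kafka topic (`__consumer_offsets`).
    Managing and monitoring Kafka topics and partition offsets is essential. Popular open-source tools such as KafkaOffsetMonitor and kafka-manager were built when offsets lived in ZooKeeper and the Kafka consumer offered no API for reading them. From Kafka 0.10.1.1 onward, the consumer provides APIs for reading topic-partition offsets (you can also call the KafkaClient API; Kafka's admin API is written in Scala).
    For clusters configured with Kafka SASL authentication, these open-source tools need code changes and a rebuild; the stock downloads cannot connect to the cluster. KafkaOffsetMonitor and kafka-manager are both written in Scala. The programs in this post are mostly written in Java, with a small part calling Scala interface methods from Java. If necessary, the next post will upgrade KafkaOffsetMonitor to add Kafka security support.
    Kafka SASL security configuration is not covered in this post.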

 

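KafkaConsumer API overview
KafkaConsumer methods
1) Map<String, List<PartitionInfo>> listTopics(): returns partition information for all topics
2) List<PartitionInfo> partitionsFor(String topic): returns the partition list of the given topic
3) Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions): returns the beginning offset of each given partition
4) Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions): returns the end offset of each given partition (the LogEndOffset)
KafkaConsumer is not thread-safe, so the close method below takes a lock. The methods above are enough to read topic and partition offset information.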

 

package com.sunshine.kafka;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import com.sunshine.boot.MainBootApplication;
/**
 * Kafka metadata (topics, partitions, and offsets)
 * @author oy
 *
 */
public class KafkaConsumeMeta {
	
	private final KafkaConsumer<Integer, String> consumer;
	
	private final String topic;
	
	private final ReentrantLock lock = new ReentrantLock();
	
	public KafkaConsumeMeta(String servers) {       
       this(null, servers);
    }
	
	public Properties getDefaultProperties(){
		Properties props = new Properties();
        // kafka server
        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.178:9092");
        // group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "90000");
        //props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30000");
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "670000");
        
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        
        // load props
        Map<String, String> map = MainBootApplication.getAllPropertiesByPrefix("kafka");
        for(Map.Entry<String, String> entry : map.entrySet()){
        	props.put(entry.getKey(), entry.getValue());
        }
    
        return props;
	}
	
	public KafkaConsumeMeta(String topic, String servers) {  
        Properties props = getDefaultProperties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

	public KafkaConsumeMeta(String servers, boolean security) {
		this(null, servers, security);
	}
	
	public KafkaConsumeMeta(String topic, String servers, boolean security) {
        Properties props = getDefaultProperties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // Security: enable SASL/PLAIN authentication
        if(security){
	        props.put("security.protocol", "SASL_PLAINTEXT");
	        props.put("sasl.mechanism", "PLAIN");
        }
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }
	
	/**
	 * Lists partition information for all topics.
	 * key=topic
	 * value=List<PartitionInfo>
	 * @return map from topic name to its partition list
	 */
	public Map<String, List<PartitionInfo>> getAllTopicPartiions(){
		return consumer.listTopics();
	}
	
	/**
	 * Lists partition information for the given topic.
	 * @param topic topic name
	 * @return partition list of the topic
	 */
	public List<PartitionInfo> getPartitionInfo(String topic){
		return consumer.partitionsFor(topic);
	} 
	
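	/**
	 * Converts a PartitionInfo into the TopicPartition key used by the offset APIs.
	 * @param info partition metadata
	 * @return the matching TopicPartition
	 */
	public TopicPartition transforTopicPartition(PartitionInfo info){
		return new TopicPartition(info.topic(),info.partition());
	}
	
	/**
	 * Converts a list of PartitionInfo into TopicPartition keys.
	 * @param list partition metadata
	 * @return the matching TopicPartition collection
	 */
	public Collection<TopicPartition> transforTopicPartition(List<PartitionInfo> list){
		List<TopicPartition> result = new ArrayList<TopicPartition>();
		for(PartitionInfo info : list){
			result.add(transforTopicPartition(info));
		}
		return result;
	}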
	
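	/**
	 * Returns the beginning offset of each partition.
	 * Accepts any Collection, matching KafkaConsumer.beginningOffsets.
	 * @param topicPartition partitions to query
	 * @return map from partition to its beginning offset
	 */
	public Map<TopicPartition, Long> getBeginningOffsets(Collection<TopicPartition> topicPartition){
		return consumer.beginningOffsets(topicPartition);
	}
	
	
	/**
	 * Returns the end offset of each partition.
	 * Accepts any Collection, matching KafkaConsumer.endOffsets.
	 * @param topicPartition partitions to query
	 * @return map from partition to its end offset
	 */
	public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> topicPartition){
		return consumer.endOffsets(topicPartition);
	}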
	
	/**
	 * @return the consumer
	 */
	public KafkaConsumer<Integer, String> getConsumer() {
		return consumer;
	}
	
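	/**
	 * Returns the offset last committed by this consumer's group for the partition.
	 * @return the committed offset, or null if the group has not committed one
	 */
	public OffsetAndMetadata getCurrentOffsetAndMetadata(TopicPartition tp){
		return consumer.committed(tp);
	}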
	
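	public void close(){
		// KafkaConsumer is not thread-safe. This lock only serializes close() calls;
		// callers sharing the instance across threads must serialize the other methods too.
		lock.lock();
		try {
			if(consumer!=null)
				consumer.close();
		} catch (Exception e) {
			e.printStackTrace();
		} finally{
			lock.unlock();
		}
	}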
}
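
Because the class above locks only close(), two threads could still call the query methods on the same KafkaConsumer at once. One way to close that gap is to send every call through the same lock. The following is a minimal sketch of that idea, not the author's code: UnsafeClient is a hypothetical stand-in for KafkaConsumer so the example runs without the Kafka library, and GuardedClient and GuardedClientDemo are names invented for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/** Hypothetical stand-in for a non-thread-safe client such as KafkaConsumer. */
class UnsafeClient {
    private boolean closed = false;
    private long calls = 0;

    /** Unsynchronized read-modify-write; unsafe when called concurrently. */
    long nextCall() {
        if (closed) throw new IllegalStateException("client is closed");
        return ++calls;
    }

    void close() { closed = true; }
}

/** Routes every access, not only close(), through one lock. */
class GuardedClient {
    private final UnsafeClient client = new UnsafeClient();
    private final ReentrantLock lock = new ReentrantLock();

    /** Runs the action while holding the lock and returns its result. */
    <T> T withClient(Function<UnsafeClient, T> action) {
        lock.lock();
        try {
            return action.apply(client);
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            client.close();
        } finally {
            lock.unlock();
        }
    }
}

class GuardedClientDemo {
    /** Starts `threads` workers making `perThread` guarded calls each; returns the number of calls counted. */
    static long run(int threads, int perThread) {
        GuardedClient guarded = new GuardedClient();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    guarded.withClient(UnsafeClient::nextCall);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        // One extra call reads the counter; subtract it from the total.
        long total = guarded.withClient(UnsafeClient::nextCall) - 1;
        guarded.close();
        return total;
    }
}
```

With the real consumer, the same pattern would mean calling listTopics(), endOffsets() and the rest inside withClient rather than handing out the raw consumer through a getter.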

 

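Topic consumer-group offset information
For management and monitoring, we need the consumption position of a given consumer group on each partition of a given topic, which shows how well the consumers are keeping up.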
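
The quantity a monitor usually reports per partition is the lag: log end offset minus the group's committed offset. Below is a small sketch of that arithmetic using plain maps keyed by partition number, so it runs without a Kafka cluster. LagCalculator and its method names are hypothetical. Treating a missing committed offset as 0 mirrors the null check in the listing that follows, while clamping negative values to 0 is an added assumption for the case where the end offset was read before a later commit.

```java
import java.util.Map;
import java.util.TreeMap;

class LagCalculator {
    /**
     * Computes lag = end offset - committed offset for every partition in endOffsets.
     * A partition with no committed offset is treated as committed at 0.
     * Negative results are clamped to 0 (assumption, not in the article's code).
     */
    static Map<Integer, Long> lagByPartition(Map<Integer, Long> endOffsets,
                                             Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>(); // sorted by partition number
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    /** Sums the per-partition lag into a single figure for the whole topic. */
    static long totalLag(Map<Integer, Long> lag) {
        long total = 0;
        for (long value : lag.values()) {
            total += value;
        }
        return total;
    }
}
```

A lag that keeps growing between polls suggests the group consumes more slowly than producers write; a lag that stays near zero suggests it is keeping up.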

 

package com.sunshine.kafka;

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sunshine.boot.MainBootApplication;

import kafka.admin.AdminClient;
import kafka.admin.AdminClient.ConsumerSummary;
import kafka.common.TopicAndPartition;
import kafka.coordinator.GroupOverview;
import scala.Option;
import scala.collection.JavaConverters;

public class KafkaConsumeGroupMetaByAdminClient {
	
	private final AdminClient client ;
	
	private final String servers;
	
	private final boolean security;
	
	private final String groupId;
	
	private final String topic;
	
	private KafkaConsumer<String, String> consumer;
	
	private Logger log = LoggerFactory.getLogger(getClass());
	
	private ReentrantLock lock = new ReentrantLock();
	
	public KafkaConsumeGroupMetaByAdminClient(String servers, String topic, String groupId, boolean security){
		Properties properties = new Properties();
		String deserializer = StringDeserializer.class.getName();
		// kafka servers
		properties.put(ExtConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
		properties.put(ExtConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		properties.put(ExtConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
		properties.put(ExtConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "500000");
		properties.put(ExtConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
		properties.put(ExtConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
		if (security) {
			properties.put("security.protocol", "SASL_PLAINTEXT");
			properties.put("sasl.mechanism", "PLAIN");
		}
		this.security = security;
		this.servers = servers;
		this.groupId = groupId;
		this.topic = topic;
		this.client = AdminClient.create(properties);
	}
	
	public Set<GroupOverview> listAllConsumerGroups() {
		scala.collection.immutable.List<GroupOverview> list = client.listAllConsumerGroupsFlattened();
		List<GroupOverview> result = JavaConverters.asJavaListConverter(list).asJava();
		return new HashSet<GroupOverview>(result);
	}
	/**
	 * Uses Kafka's built-in AdminClient to describe the consumer group:
	 * every topic partition consumed under this group id, with its offsets.
	 * @return one entry per assigned partition; empty if the group cannot be described
	 */
	public List<CounsumeGroupMode> getConsumeGroup() {
		lock.lock();
		try{
			// Call inside try so the lock is released even if the request fails
			Option<scala.collection.immutable.List<ConsumerSummary>> option = client.describeConsumerGroup(groupId);
			if(option.isEmpty()){
				log.error("Consumer group "+groupId+" is rebalancing.");
			} else {
				List<ConsumerSummary> consumers = JavaConverters.asJavaListConverter(option.get()).asJava();
				KafkaConsumer<String, String> consumer = getConsumer();
		    	List<CounsumeGroupMode> result = new ArrayList<>();
		    	for(ConsumerSummary consumerSummary : consumers){
		    		List<TopicPartition> tps = JavaConverters.asJavaListConverter(consumerSummary.assignment()).asJava();
		    		List<TopicAndPartition> taps = new ArrayList<>();
		    		Map<TopicAndPartition, Long> partitionOffsets = new HashMap<TopicAndPartition, Long>();
		    		for(TopicPartition tp : tps){
		    			TopicAndPartition topicAndPartition = new TopicAndPartition(tp.topic(), tp.partition());
		    			taps.add(topicAndPartition);
		    			// No committed offset yet: report 0
		    			OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
		    			partitionOffsets.put(topicAndPartition, (offsetAndMetadata == null)?0:offsetAndMetadata.offset());
		    		}
		    		List<CounsumeGroupMode> t = describeTopicPartition(groupId, taps, partitionOffsets,consumerSummary.clientId()+"_"+consumerSummary.clientHost());
		    		result.addAll(t);
		    	}
		    	return result;
			}
		} catch(Exception e){
			log.error("Failed to describe consumer group "+groupId, e);
		} finally{
			lock.unlock();
		}
		return Collections.emptyList();
	}
	
	/**
	 * Returns, for every partition of the configured topic, the beginning offset,
	 * the end (maximum) offset, and the current group's consume offset.
	 * @return partitions sorted by partition number
	 */
	public List<ExtPartitionInfo> getTopicPartitionsAllOffset(){
		List<ExtPartitionInfo> result = new ArrayList<ExtPartitionInfo>();
		// Acquire the lock before try so finally never unlocks a lock that was not taken
		lock.lock();
		try{
			KafkaConsumer<String, String> consumer = getConsumer();
			List<PartitionInfo> pis = consumer.partitionsFor(this.topic);
			List<TopicPartition> tps = transforTopicPartition(pis);
			Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps);
			Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps); //1
			for (PartitionInfo pi : pis) {
				TopicPartition _tp = new TopicPartition(pi.topic(), pi.partition());
				
				// Option 1 for reading the current position of the topic partition
				//OffsetAndMetadata offsetAndMetadata = consumer.committed(_tp); 
				//long currentOffset = (offsetAndMetadata == null)?0:offsetAndMetadata.offset();
				// Option 2 for reading the current position of the topic partition
				long currentOffset = getCurrentOffset(pi.topic(), pi.partition());
				
				// Use the endOffsets value throughout.
				// Marker 1 runs first, so a value read at marker 2 would be greater than or equal to the value read at marker 1
				//long logendOffset = getLogEndOffset(pi.topic(), pi.partition()); //2
				
				ExtPartitionInfo epi = new ExtPartitionInfo(pi.topic(), pi.partition(), pi.leader(), pi.replicas(),
						pi.inSyncReplicas(), beginOffsets.get(_tp), endOffsets.get(_tp), currentOffset, endOffsets.get(_tp));
				result.add(epi);
			}
			Collections.sort(result);
		} catch(Exception e){
			log.error("Failed to read offsets for topic "+this.topic, e);
		} finally{
			lock.unlock();
		}
		return result;
	}
	
	private List<TopicPartition> transforTopicPartition(List<PartitionInfo> list){
		List<TopicPartition> result = new ArrayList<TopicPartition>();
		for(PartitionInfo info : list){
			result.add(transforTopicPartition(info));
		}
		return result;
	} 
	
	
	private TopicPartition transforTopicPartition(PartitionInfo info){
		return new TopicPartition(info.topic(),info.partition());
	}
	
	private List<CounsumeGroupMode> describeTopicPartition(String group, List<TopicAndPartition> topicPartitions,
    		Map<TopicAndPartition, Long> partitionOffsets,String owner){
    	List<CounsumeGroupMode> rs = new ArrayList<>();
    	for(TopicAndPartition tap : topicPartitions){
    		long logEndOffset = getLogEndOffset(tap.topic(), tap.partition());
    		long currentOffset = partitionOffsets.get(tap);
    		CounsumeGroupMode cgm = new CounsumeGroupMode(group, tap.topic(), tap.partition(), 
    				currentOffset, logEndOffset, owner);
    		rs.add(cgm);
    	}
    	return rs;
	}
	
	/**
	 * Returns the group's current consume position for a topic partition.
	 * With no committed offset, position() falls back to auto.offset.reset.
	 * @param topic
	 * @param partition
	 * @return
	 */
	private long getCurrentOffset(String topic, int partition){
		KafkaConsumer<String, String> consumer = getConsumer();
		TopicPartition topicPartition = new TopicPartition(topic, partition);
		consumer.assign(Arrays.asList(topicPartition));
		return consumer.position(topicPartition);
	}
	
	/**
	 * Returns the log end offset of a topic partition by seeking to the end.
	 */
	private long getLogEndOffset(String topic, int partition){
		KafkaConsumer<String, String> consumer = getConsumer();
		TopicPartition topicPartition = new TopicPartition(topic, partition);
		consumer.assign(Arrays.asList(topicPartition));
		consumer.seekToEnd(Arrays.asList(topicPartition));
		return consumer.position(topicPartition);
	}
	
	public KafkaConsumer<String, String> getConsumer(){
		if (consumer == null)
			consumer = createNewConsumer();
		return consumer;
	}
	
	public List<Node> listAllBrokers() {
		return JavaConverters.asJavaListConverter(client.bootstrapBrokers()).asJava();
	}
	
    public void close() {
    	lock.lock();
        try {
			client.close();
			if (consumer != null) 
			  consumer.close();
		} catch (Exception e) {
			e.printStackTrace();
		} finally{
			lock.unlock();
		}
    }
    
	private KafkaConsumer<String, String> createNewConsumer() {
		Properties properties = new Properties();
		String deserializer = StringDeserializer.class.getName();
		properties.put(ExtConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);
		if(groupId != null)
			properties.put(ExtConsumerConfig.GROUP_ID_CONFIG, this.groupId);
		properties.put(ExtConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		properties.put(ExtConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
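		// New consumer API
		// When a new group queries the current position of a topic partition, set this to earliest; the default is latest.
		// Valid values are earliest, latest, and none; the original "lastest" is rejected as an invalid configuration.
		properties.put(ExtConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");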
		properties.put(ExtConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "500000");
		properties.put(ExtConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
		properties.put(ExtConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
		if (security) {
			properties.put("security.protocol", "SASL_PLAINTEXT");
			properties.put("sasl.mechanism", "PLAIN");
		}
		return new KafkaConsumer<String, String>(properties);
	}

    public class CounsumeGroupMode implements Comparable<CounsumeGroupMode>{
		public String getGroup() {
			return group;
		}
		public void setGroup(String group) {
			this.group = group;
		}
		public String getTopic() {
			return topic;
		}
		public void setTopic(String topic) {
			this.topic = topic;
		}
		public int getPartition() {
			return partition;
		}

		public void setPartition(int partition) {
			this.partition = partition;
		}
		public long getCurrent_offset() {
			return current_offset;
		}

		public void setCurrent_offset(long current_offset) {
			this.current_offset = current_offset;
		}
	
		public long getLog_eng_offset() {
			return log_eng_offset;
		}

		public void setLog_eng_offset(long log_eng_offset) {
			this.log_eng_offset = log_eng_offset;
		}
		public long getLAG() {
			return LAG;
		}
		public void setLAG(long lAG) {
			LAG = lAG;
		}

		public String getOwner() {
			return owner;
		}

		public void setOwner(String owner) {
			this.owner = owner;
		}
		private String group;
    	private String topic;
    	private int partition;
    	private long current_offset;
    	private long log_eng_offset;
    	private long LAG;
    	private String owner;
		public CounsumeGroupMode(String group, String topic, int partition,
				long current_offset, long log_eng_offset, String owner) {
			this.group = group;
			this.topic = topic;
			this.partition = partition;
			this.current_offset = current_offset;
			this.log_eng_offset = log_eng_offset;
			LAG = (log_eng_offset-current_offset);
			this.owner = owner;
		}
				
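		@Override
		public int compareTo(CounsumeGroupMode o) {
			// Order by partition number
			return Integer.compare(this.partition, o.partition);
		}
		
		@Override
		public boolean equals(Object obj) {
			// Defined together with hashCode() so both use the same fields
			if (this == obj) return true;
			if (!(obj instanceof CounsumeGroupMode)) return false;
			CounsumeGroupMode other = (CounsumeGroupMode) obj;
			return partition == other.partition
					&& java.util.Objects.equals(group, other.group)
					&& java.util.Objects.equals(topic, other.topic)
					&& java.util.Objects.equals(owner, other.owner);
		}
		
		@Override
		public int hashCode() {
			// Null-safe, unlike calling hashCode() on each field directly
			return java.util.Objects.hash(group, topic, partition, owner);
		}		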
    }
 
}

 

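ExtPartitionInfo extends PartitionInfo with the topic's total data volume, the volume already consumed, and the volume still waiting to be consumed.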
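
The three figures can be derived from three offsets per partition: beginning, end, and the group's current position. The sketch below shows one way to do the arithmetic; PartitionStats is a hypothetical class, not part of the author's code. Note one deliberate difference: it counts retained messages as end minus beginning, whereas the author's class keeps that formula commented out and reports the end offset itself as the count.

```java
final class PartitionStats {
    final long retained;   // messages currently in the log: end - begin
    final long consumed;   // retained messages before the group's position
    final long remaining;  // messages still to be consumed (the lag)

    PartitionStats(long beginOffset, long endOffset, long currentOffset) {
        if (beginOffset < 0 || endOffset < beginOffset) {
            throw new IllegalArgumentException(
                "expected 0 <= beginOffset <= endOffset, got " + beginOffset + " and " + endOffset);
        }
        // Clamp the position into [begin, end]: a committed offset can point
        // below the beginning offset once retention has deleted old segments.
        long position = Math.min(Math.max(currentOffset, beginOffset), endOffset);
        this.retained = endOffset - beginOffset;
        this.consumed = position - beginOffset;
        this.remaining = endOffset - position;
    }

    @Override
    public String toString() {
        return "retained=" + retained + ", consumed=" + consumed + ", remaining=" + remaining;
    }
}
```

By construction, consumed plus remaining always equals retained, which is a handy sanity check when displaying the values side by side.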

 

package com.sunshine.kafka;

import org.apache.kafka.common.Node;
/**
 * Extended partition information: PartitionInfo plus offset and consumption statistics
 * @author oy
 *
 */
public class ExtPartitionInfo implements Comparable<ExtPartitionInfo>{
	
	//======== Topic partition ===========
	private final String topic;
    private final int partition;
    //======== Topic data volume statistics ============
	private final long beginOffsets;
	private final long endOffsets;
	private final long pcounts;
	//======= Replica and ISR (in-sync replica) node information ====
	private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
	private final String leaderStr;
	private final String relicasStr;
	private final String inSyncReplicasStr;
	//======== Consumption statistics ==========
	private  long currentOffset;// offset currently consumed by the group
	private  long logendOffset; // maximum offset of the partition
	private  long lag;// messages still waiting to be consumed
	

	public ExtPartitionInfo(String topic, int partition, Node leader,
			Node[] replicas, Node[] inSyncReplicas, long beginOffsets,
			long endOffsets) {
		this.topic = topic;
		this.partition = partition;
		this.leader = leader;
		this.replicas = replicas;
		this.inSyncReplicas = inSyncReplicas;
		this.beginOffsets = beginOffsets;
		this.endOffsets = endOffsets;
		// The end offset is reported as the count (total ever written);
		// the retained count would be (endOffsets-beginOffsets).
		//this.pcounts = (endOffsets-beginOffsets);
		this.pcounts = endOffsets;
		// A partition may temporarily have no leader; avoid a NullPointerException
		this.leaderStr = (leader == null) ? "none" : leader.toString();
		this.relicasStr = fmtNodeIds(replicas);
		this.inSyncReplicasStr = fmtNodeIds(inSyncReplicas);
	}
	
	public ExtPartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas,
			long beginOffsets, long endOffsets, long currentOffset, long logendOffset) {
		// Reuse the first constructor for the shared fields
		this(topic, partition, leader, replicas, inSyncReplicas, beginOffsets, endOffsets);
		this.currentOffset = currentOffset;
		this.logendOffset = logendOffset;
		this.lag = (logendOffset-currentOffset);
	}

	@Override
    public String toString() {
        return String.format("Partition(topic = %s, partition = %d, beginOffset=%d, endOffset=%d, counts=%d, leader = %s, replicas = %s, isr = %s)",
                             topic,
                             partition,
                             beginOffsets,
                             endOffsets,
                             pcounts,
                             leader == null ? "none" : leader.id(),
                             fmtNodeIds(replicas),
                             fmtNodeIds(inSyncReplicas));
    }

    /* Extract the node ids from each item in the array and format for display, e.g. [1,2,3] */
    private String fmtNodeIds(Node[] nodes) {
        StringBuilder b = new StringBuilder("[");
        if (nodes != null) {
            for (int i = 0; i < nodes.length; i++) {
                // Separator goes between ids only, so there is no trailing comma
                if (i > 0)
                    b.append(',');
                b.append(nodes[i].id());
            }
        }
        b.append("]");
        return b.toString();
    }

	@Override
	public int compareTo(ExtPartitionInfo arg0) {
		// Order by partition number, ascending
		return Integer.compare(this.partition, arg0.partition);
	}
    
	/**
	 * @return the pcounts
	 */
	public long getPcounts() {
		return pcounts;
	}

	/**
	 * @return the topic
	 */
	public String getTopic() {
		return topic;
	}

	/**
	 * @return the partition
	 */
	public int getPartition() {
		return partition;
	}

	/**
	 * @return the leader
	 */
	public Node getLeader() {
		return leader;
	}

	/**
	 * @return the replicas
	 */
	public Node[] getReplicas() {
		return replicas;
	}

	/**
	 * @return the inSyncReplicas
	 */
	public Node[] getInSyncReplicas() {
		return inSyncReplicas;
	}

	/**
	 * @return the beginOffsets
	 */
	public long getBeginOffsets() {
		return beginOffsets;
	}

	/**
	 * @return the endOffsets
	 */
	public long getEndOffsets() {
		return endOffsets;
	}
	
	public String getLeaderStr() {
		return leaderStr;
	}

	public String getRelicasStr() {
		return relicasStr;
	}

	public String getInSyncReplicasStr() {
		return inSyncReplicasStr;
	}

	public long getCurrentOffset() {
		return currentOffset;
	}

	public long getLogendOffset() {
		return logendOffset;
	}

	public long getLag() {
		return lag;
	}
}

 

The unit tests are as follows:

 

package com.sunshine.kafka.test;

import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.sunshine.boot.MainBootApplication;
import com.sunshine.kafka.KafkaConsumeMeta;
import com.sunshine.kafka.KafkaJavaClient;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:/spring/applicationContext.xml")
//@SpringBootTest(classes=MainBootApplication.class)
public class KafkaMetaTest {
	
	public static final String HTTP = "http://127.0.0.1:8080/kmeta/api/kafka/meta/topic";

	public HttpClient client = HttpClients.createDefault();
	
	private String servers = "192.168.2.178:9092";
	
	@Before
	public void init(){
		URL url = MainBootApplication.class.getClassLoader().getResource("kafka_client_jaas.conf");
		System.setProperty("java.security.auth.login.config", url.getPath());
	}
	
	@Test
	public void testKafkaMetaList()throws Exception{
		long btime = System.currentTimeMillis();
		//String httpURL = HTTP  + "/430100-passrec";
		String httpURL = HTTP  + "/test";
		HttpPost post = new HttpPost(httpURL);
		List<NameValuePair> params=new ArrayList<NameValuePair>();
		params.add(new BasicNameValuePair("servers",servers));
		params.add(new BasicNameValuePair("security","true"));
		post.setEntity(new UrlEncodedFormEntity(params));
		HttpResponse response = client.execute(post);
		String sendPostResult = EntityUtils.toString(response.getEntity());
		long etime = System.currentTimeMillis();
		long ttime = (etime - btime);
		System.out.println("Query time (ms): " + ttime + " --- sendPostResult: " + sendPostResult);
	}
	
	@Test
	public void testTopicOffset()throws Exception{
		long btime = System.currentTimeMillis();
		KafkaConsumeMeta kafkaMeta = new KafkaConsumeMeta(servers, true);
		try {
			List<PartitionInfo> list = kafkaMeta.getPartitionInfo("topic1");
			for(PartitionInfo p : list){
				System.out.println("Topic: "+p.topic() + "-->" + p.toString());
			}
			// Copy into a typed list instead of using a raw (List) cast
			Map<TopicPartition, Long> offsets = kafkaMeta.getEndOffsets(new ArrayList<TopicPartition>(kafkaMeta.transforTopicPartition(list)));
			for(Entry<TopicPartition, Long> entry : offsets.entrySet()){
				TopicPartition tp = entry.getKey() ;
				System.out.println(tp.topic() + "-" + tp.partition() + "---" + entry.getValue());
			}
		} finally {
			// Release the consumer's network resources
			kafkaMeta.close();
		}
		long etime = System.currentTimeMillis();
		long ttime = (etime - btime);
		System.out.println("Query time (ms): " + ttime);
	}
	
	@Test
	public void testTopicPartition()throws Exception{
		long btime = System.currentTimeMillis();
		KafkaConsumeMeta kafkaMeta = new KafkaConsumeMeta(servers, true);
		try {
			List<PartitionInfo> list = kafkaMeta.getPartitionInfo("topic1");
			for(PartitionInfo p : list){
				System.out.println("Topic: "+p.topic() + "-->" + p.toString());
			}
		} finally {
			kafkaMeta.close();
		}
		long etime = System.currentTimeMillis();
		long ttime = (etime - btime);
		System.out.println("Query time (ms): " + ttime);
	}
	
	@Test
	public void testConsumGroup()throws Exception{
		try {
    		KafkaJavaClient kcg = new KafkaJavaClient(servers,true);
			//Node node = new Node(0,"68.28.6.104",9094);
    		List<ListGroupsResponse.Group>  result = kcg.listGroups(kcg.getLoadedNode());
    		//List<ListGroupsResponse.Group>  result = kcg.listGroups(node);
			for(ListGroupsResponse.Group group : result){
				System.out.println(group.groupId() + "--" + group.protocolType());
			}
			for(Node node : kcg.findAllBrokers()){
				System.out.println(node.toString());
			}
			System.out.println("coordinator:" + kcg.findCoordinator("sc_veh_group3").toString());
			System.out.println("consumer groups:" + kcg.listAllConsumerGroups());
			System.out.println("describe group:" + kcg.describeGroup("sc_veh_group3"));
    	} catch (Exception e) {
			e.printStackTrace();
		} catch (Throwable e) {
			e.printStackTrace();
		}
	}
}

 

package com.sunshine.kafka.test;

import java.net.URL;
import java.util.List;
import java.util.Set;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.sunshine.boot.MainBootApplication;
import com.sunshine.kafka.ExtPartitionInfo;
import com.sunshine.kafka.KafkaConsumeGroupMetaByAdminClient;
import com.sunshine.kafka.KafkaConsumeGroupMetaByAdminClient.CounsumeGroupMode;

import kafka.coordinator.GroupOverview;

@RunWith(SpringJUnit4ClassRunner.class)
public class KafkaConsumeGroupMetaByAdminClientTest {
	
	private String servers = "192.168.121.200:9092";
	
	@Before
	public void init(){
		URL url = MainBootApplication.class.getClassLoader().getResource("kafka_client_jaas.conf");
		System.setProperty("java.security.auth.login.config", url.getPath());
	}
	
	@Test
	public void testListAllConsumeGroup(){
		KafkaConsumeGroupMetaByAdminClient client = new KafkaConsumeGroupMetaByAdminClient(servers,"","", true);
		Set<GroupOverview> set = client.listAllConsumerGroups();
		for (GroupOverview go : set) {
			System.out.println(go.groupId() + "-" + go.protocolType());
		}
	}
	
	@Test
	public void testGetConsumeGroup(){
		KafkaConsumeGroupMetaByAdminClient client = new KafkaConsumeGroupMetaByAdminClient(servers,"","g1", true);
		List<CounsumeGroupMode> list = client.getConsumeGroup();
		for(CounsumeGroupMode cgm : list){
			System.out.println("GROUP:"+cgm.getGroup() + "-TOPIC:" + cgm.getTopic() + "-PARTITION:" + cgm.getPartition() + 
					"-CURRENTOFFSET:" + cgm.getCurrent_offset() + "-LOGENDOFFSET:" + cgm.getLog_eng_offset() + "-LAG:" + 
					cgm.getLAG());
		}
	}
	
	@Test
	public void testGetTopicPartitionOffset(){
		long btime = System.currentTimeMillis();
		KafkaConsumeGroupMetaByAdminClient client = new KafkaConsumeGroupMetaByAdminClient(servers,"440100_PASS","group1", true);
		List<ExtPartitionInfo> list = client.getTopicPartitionsAllOffset();
		for(ExtPartitionInfo epi : list){
			System.out.println("-TOPIC:" + epi.getTopic() + "-PARTITION:" + epi.getPartition() + 
					"-CURRENTOFFSET:" + epi.getCurrentOffset() + "-LOGENDOFFSET:" + epi.getLogendOffset() + "-LAG:" + 
					epi.getLag() + "-BEGINOFFSET:" + epi.getBeginOffsets() + "-ENDOFFSET:" + epi.getEndOffsets());
		}
		long etime = System.currentTimeMillis();
		long ttime = (etime - btime);
		System.out.println("Query time (ms): " + ttime);
	}
}

 

The project is a microservice built with Spring Boot, served at http://localhost:8080/kmeta. A screenshot of the UI follows:

 

 

The complete code can be downloaded from the attachment.

 
