
A Compendium of Common Kafka Operations
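This post collects a few self-contained examples of common Kafka (0.9.0.1) operations: a low-level consumer built on the SimpleConsumer API (leader lookup, offset requests, fetching, offset commit), two admin/producer/consumer programs that manage topics through AdminUtils/ZkUtils and exchange messages with the new clients API, and the Maven pom that ties everything together.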

package anyec;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadata;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
/**
 * Low-level Kafka consumer based on the SimpleConsumer API: finds the partition
 * leader, fetches messages from a given offset, and handles leader failover.
 *
 * @author anyec
 */
public class KafkaSimpleConsumer {
	static Logger logger=LoggerFactory.getLogger(KafkaSimpleConsumer.class);
	private List<String> replicaBrokers = new ArrayList<String>();

	public KafkaSimpleConsumer() {
		replicaBrokers = new ArrayList<String>();
	}

	public static void main(String args[]) {
		KafkaSimpleConsumer example = new KafkaSimpleConsumer();
		// maximum number of messages to read
		long maxReads = Long.parseLong("3");
		// topic to read from
		String topic = "anytopic";
		// partition to read from
		int partition = Integer.parseInt("0");
		// seed broker hostnames
		List<String> seeds = new ArrayList<String>();
		seeds.add("ldap.anyec.cn");

		// broker port
		int port = Integer.parseInt("9092");
		try {
			example.run(maxReads, topic, partition, seeds, port);
		} catch (Exception e) {
			logger.error("error:" + e);
			e.printStackTrace();
		}
	}
	/**
	 * Reads up to maxReads messages from the given topic partition.
	 *
	 * @param maxReads    maximum number of messages to read
	 * @param topic       topic name
	 * @param partition   partition id
	 * @param seedBrokers seed broker hostnames used to locate the partition leader
	 * @param port        broker port
	 * @throws Exception if no leader can be found
	 */
	public void run(long maxReads, String topic, int partition, List<String> seedBrokers, int port) throws Exception {
		// look up metadata for the given topic partition
		PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
		if (metadata == null) {
			logger.error("Can't find metadata for Topic and Partition. Exiting");
			return;
		}
		if (metadata.leader() == null) {
			logger.error("Can't find Leader for Topic and Partition. Exiting");
			return;
		}
		String leadBroker = metadata.leader().host();
		String clientName = "Client_" + topic + "_" + partition;

		SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
		long readOffset = getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
		int numErrors = 0;
		while (maxReads > 0) {
			if (consumer == null) {
				consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
			}
			FetchRequest req = new FetchRequestBuilder().clientId(clientName)
					.addFetch(topic, partition, readOffset, 100000).build();
			FetchResponse fetchResponse = consumer.fetch(req);

			if (fetchResponse.hasError()) {
				numErrors++;
				short code = fetchResponse.errorCode(topic, partition);
				logger.info("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
				if (numErrors > 5)
					break;
				if (code == ErrorMapping.OffsetOutOfRangeCode()) {
					readOffset = getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(),
							clientName);
					continue;
				}
				consumer.close();
				consumer = null;
				leadBroker = findNewLeader(leadBroker, topic, partition, port);
				continue;
			}
			numErrors = 0;

			long numRead = 0;
			for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
				long currentOffset = messageAndOffset.offset();
				if (currentOffset < readOffset) {
					logger.info("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
					continue;
				}

				readOffset = messageAndOffset.nextOffset();
				ByteBuffer payload = messageAndOffset.message().payload();

				byte[] bytes = new byte[payload.limit()];
				payload.get(bytes);
				logger.info(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
				numRead++;
				maxReads--;
			}

			if (numRead == 0) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		if (consumer != null)
			consumer.close();
	}
	/**
	 * Looks up an offset for the partition via an OffsetRequest.
	 *
	 * @param consumer   connected SimpleConsumer
	 * @param topic      topic name
	 * @param partition  partition id
	 * @param whichTime  kafka.api.OffsetRequest.EarliestTime() or LatestTime()
	 * @param clientName client id reported to the broker
	 * @return the requested offset, or 0 on error
	 */
	public static long getOffset(SimpleConsumer consumer, String topic, int partition, long whichTime,
			String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
				kafka.api.OffsetRequest.CurrentVersion(), clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}

	public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {

		return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
	}

	public static long getEarliestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {

		return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
	}

	/**
	 * Note: kafka.api.OffsetRequest.CurrentVersion() is a request protocol version,
	 * not a timestamp, so the lookup below is not meaningful as written. Use
	 * getLastOffset()/getEarliestOffset() for log offsets, or an OffsetFetchRequest
	 * to read a consumer group's committed offset.
	 */
	public static long getCurrentOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {

		return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.CurrentVersion(), clientName);
	}

	/**
	 * Finds a new leader broker for the partition after the old leader fails.
	 *
	 * @param oldLeader host of the failed leader
	 * @param topic     topic name
	 * @param partition partition id
	 * @param port      broker port
	 * @return host of the new leader
	 * @throws Exception if no new leader is found after retrying
	 * @author anyec
	 */
	private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
		for (int i = 0; i < 3; i++) {
			boolean goToSleep = false;
			PartitionMetadata metadata = findLeader(replicaBrokers, port, topic, partition);
			if (metadata == null) {
				goToSleep = true;
			} else if (metadata.leader() == null) {
				goToSleep = true;
			} else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
				goToSleep = true;
			} else {
				return metadata.leader().host();
			}
			if (goToSleep) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		logger.info("Unable to find new leader after Broker failure. Exiting");
		throw new Exception("Unable to find new leader after Broker failure. Exiting");
	}

	private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
		PartitionMetadata returnMetaData = null;
		loop: for (String seed : seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
				List<String> topics = Collections.singletonList(topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						if (part.partitionId() == partition) {
							returnMetaData = part;
							break loop;
						}
					}
				}
			} catch (Exception e) {
				logger.error("Error communicating with Broker [" + seed + "] to find Leader for [" + topic + ", "
						+ partition + "] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}
		if (returnMetaData != null) {
			replicaBrokers.clear();
			for (BrokerEndPoint replica : returnMetaData.replicas()) {
				replicaBrokers.add(replica.host());
			}
		}
		return returnMetaData;
	}

	/**
	 * Commits an offset for the topic partition via an OffsetCommitRequest.
	 * With versionID 0 the offset is stored in ZooKeeper; version 1 or higher
	 * stores it in Kafka's __consumer_offsets topic.
	 */
	public short saveOffsetInKafka(SimpleConsumer simpleConsumer, int partition, String topic, String kafkaClientId, long offset, short errorCode) throws Exception {
		short versionID = 0;
		int correlationId = 0;
		try {
			TopicAndPartition tp = new TopicAndPartition(topic, partition);
			// the two trailing arguments are the commit and expiration timestamps
			// (errorCode is reused here as a placeholder value)
			OffsetAndMetadata offsetMetaAndErr = new OffsetAndMetadata(new OffsetMetadata(offset, OffsetMetadata.NoMetadata()), errorCode, errorCode);
			Map<TopicAndPartition, OffsetAndMetadata> mapForCommitOffset = new HashMap<>();
			mapForCommitOffset.put(tp, offsetMetaAndErr);
			kafka.javaapi.OffsetCommitRequest offsetCommitReq = new kafka.javaapi.OffsetCommitRequest(kafkaClientId,mapForCommitOffset, correlationId, kafkaClientId, versionID);
			OffsetCommitResponse offsetCommitResp = simpleConsumer.commitOffsets(offsetCommitReq);
			return (Short) offsetCommitResp.errors().get(tp);
		} catch (Exception e) {
			 logger.error("Error when commiting Offset to Kafka: " +
			 e.getMessage(), e);
			throw e;
		}
	}
}
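A minimal usage sketch for the offset helpers above, reusing the placeholder host, port and topic from main (the client id "offsetProbe" is illustrative):

SimpleConsumer sc = new SimpleConsumer("ldap.anyec.cn", 9092, 100000, 64 * 1024, "offsetProbe");
long earliest = KafkaSimpleConsumer.getEarliestOffset(sc, "anytopic", 0, "offsetProbe");
long latest = KafkaSimpleConsumer.getLastOffset(sc, "anytopic", 0, "offsetProbe");
System.out.println("anytopic[0] offsets: " + earliest + " .. " + latest);
sc.close();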


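The next listing, KafkaAdminMain, creates the topic through AdminUtils if it does not already exist, produces 100000 keyed messages with the new KafkaProducer, and then reads them back in a single poll with a KafkaConsumer: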
package anyec;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import kafka.admin.AdminUtils;
//import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import scala.collection.Map;

public class KafkaAdminMain {
	static String topicName = "uuu88";

	public static void main(String[] args) {
		createTopics();
		sendMsg(topicName);
		receiveMsg(topicName);
	}

	public static void listTopics() {
		ZkUtils zk = getZkClient();
		// getAllTopics() returns every topic registered in ZooKeeper (ZkUtils, 0.9.x)
		System.out.println(zk.getAllTopics());
	}

	public static ZkUtils getZkClient() {
		// ZkUtils.apply(...) wires up the ZkClient with Kafka's string serializer;
		// without it AdminUtils would write Java-serialized nodes that brokers cannot read
		return ZkUtils.apply("localhost:12181", 300 * 1000, 300 * 1000, false);
	}

	public static void createTopics() {
		ZkUtils zk = getZkClient();
		boolean exist = AdminUtils.topicExists(zk, topicName);
		if (!exist) {
			AdminUtils.createTopic(zk, topicName, 1, 1, new Properties());
//			AdminUtils.createTopic(zk, topicName, 1, 1, new Properties(),RackAwareMode.Enforced$.MODULE$);
		}
		Map<String, Properties> map = AdminUtils.fetchAllEntityConfigs(zk, topicName);
	}

	public static void deleteTopics() {
		ZkUtils zk = getZkClient();
		boolean exist = AdminUtils.topicExists(zk, topicName);
		if (exist) {
			AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zk, topicName);
			AdminUtils.deleteTopic(zk, topicName);
		}
	}

	public static Producer<String, String> getProducer() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "kafka.anyec.cn:9092");
		props.put("acks", "0");
		props.put("retries", 1);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 1024*1024);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> producer = new KafkaProducer(props);
		return producer;
	}

	public static Consumer<String, String> getConsumer() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "kafka.anyec.cn:9092");
		
		/* consumer group id */
		props.put("group.id", "test");
		/* enable automatic offset commits */
		props.put("enable.auto.commit", "true");
		/* auto-commit interval, in milliseconds */
		props.put("auto.commit.interval.ms", "1000");
		/* session timeout, in milliseconds */
		props.put("session.timeout.ms", "30000");
		/* the string deserializers can normally be used as-is */
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	     
	
		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		// subscribe to a list of topics here if desired, e.g.:
		// consumer.subscribe(Arrays.asList("test01", "mytopic"));
		return consumer;
	}

	public static void sendMsg(String topic) {
		Producer<String, String> producer = getProducer();
//		List list = producer.partitionsFor(topic);
		for (int i = 0; i < 100000; i++) {
			ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,i%3+"", "v" + i);
			
				try {
					producer.send(record).get(100, TimeUnit.MILLISECONDS);
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (ExecutionException e) {
					e.printStackTrace();
				} catch (TimeoutException e) {
					e.printStackTrace();
				}
			
		}
		producer.flush();
		producer.close();
	}
	static Callback callback=new Callback() {
		
		@Override
		public void onCompletion(RecordMetadata metadata, Exception e) {
			if(e!=null){
				System.out.println(e.getMessage());
			}
			
		}
	};
	public static void receiveMsg(String topic) {
		Consumer<String, String> consumer = getConsumer();
		consumer.subscribe(Arrays.asList(topic));
		ConsumerRecords<String, String> records= consumer.poll(10000);
		int count = records.count();
		Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
		for(ConsumerRecord<String, String> record:records){
			System.out.println(record.key()+" "+record.value());
		}
		consumer.commitSync();
		consumer.close();
	}
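
	/**
	 * A minimal sketch of an asynchronous variant of sendMsg: it reuses the callback
	 * field above instead of blocking on get(...). The method name and message count
	 * are illustrative only; onCompletion runs on the producer's I/O thread once the
	 * broker responds (or an exception is raised).
	 */
	public static void sendMsgAsync(String topic) {
		Producer<String, String> producer = getProducer();
		for (int i = 0; i < 1000; i++) {
			producer.send(new ProducerRecord<String, String>(topic, i % 3 + "", "v" + i), callback);
		}
		producer.flush();
		producer.close();
	}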
}


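KafkaAdminMain2 is a variant of the same program: its producer waits for all in-sync replicas to acknowledge each record (acks=all) and uses a larger linger window and buffer, and its consumer polls in an endless loop instead of a single pass: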
package anyec;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import kafka.admin.AdminUtils;
//import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import scala.collection.Map;

public class KafkaAdminMain2 {
	static String topicName = "uuu88";

	public static void main(String[] args) {
//		createTopics();
//		sendMsg(topicName);
		receiveMsg(topicName);
	}

	public static void listTopics() {
		ZkUtils zk = getZkClient();
		// getAllTopics() returns every topic registered in ZooKeeper (ZkUtils, 0.9.x)
		System.out.println(zk.getAllTopics());
	}

	public static ZkUtils getZkClient() {
		// ZkUtils.apply(...) wires up the ZkClient with Kafka's string serializer;
		// without it AdminUtils would write Java-serialized nodes that brokers cannot read
		return ZkUtils.apply("localhost:12181", 300 * 1000, 300 * 1000, false);
	}

	public static void createTopics() {
		ZkUtils zk = getZkClient();
		boolean exist = AdminUtils.topicExists(zk, topicName);
		if (!exist) {
			AdminUtils.createTopic(zk, topicName, 1, 1, new Properties());
//			AdminUtils.createTopic(zk, topicName, 1, 1, new Properties(),RackAwareMode.Enforced$.MODULE$);
		}
		Map<String, Properties> map = AdminUtils.fetchAllEntityConfigs(zk, topicName);
	}

	public static void deleteTopics() {
		ZkUtils zk = getZkClient();
		boolean exist = AdminUtils.topicExists(zk, topicName);
		if (exist) {
			AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zk, topicName);
			AdminUtils.deleteTopic(zk, topicName);
		}
	}

	public static Producer<String, String> getProducer() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "kafka.anyec.cn:9092");
		props.put("acks", "all");
		props.put("retries", 2);
		props.put("batch.size", 16384);
		props.put("linger.ms", 30);
		props.put("buffer.memory", 30*1024*1024);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> producer = new KafkaProducer(props);
		return producer;
	}

	public static Consumer<String, String> getConsumer() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "kafka.anyec.cn:9092");
		
		/* consumer group id */
		props.put("group.id", "test");
		/* enable automatic offset commits */
		props.put("enable.auto.commit", "true");
		/* auto-commit interval, in milliseconds */
		props.put("auto.commit.interval.ms", "1000");
		/* session timeout, in milliseconds */
		props.put("session.timeout.ms", "30000");
		/* the string deserializers can normally be used as-is */
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	     
	
		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		// subscribe to a list of topics here if desired, e.g.:
		// consumer.subscribe(Arrays.asList("test01", "mytopic"));
		return consumer;
	}

	public static void sendMsg(String topic) {
		Producer<String, String> producer = getProducer();
//		List list = producer.partitionsFor(topic);
		for (int i = 0; i < 100000; i++) {
			ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "v" + i);
			
				try {
					producer.send(record).get(100, TimeUnit.MILLISECONDS);
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (ExecutionException e) {
					e.printStackTrace();
				} catch (TimeoutException e) {
					e.printStackTrace();
				}
			
		}
		producer.flush();
		producer.close();
	}
	static Callback callback=new Callback() {
		
		@Override
		public void onCompletion(RecordMetadata metadata, Exception e) {
			if(e!=null){
				System.out.println(e.getMessage());
			}
			
		}
	};
	public static void receiveMsg(String topic) {
		Consumer<String, String> consumer = getConsumer();
		consumer.subscribe(Arrays.asList(topic));

		// poll forever; the explicit commitSync() is redundant while enable.auto.commit=true,
		// but becomes the commit path if auto-commit is turned off in getConsumer()
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(10000);
			for (ConsumerRecord<String, String> record : records) {
				System.out.println(record.key() + " " + record.value());
			}
			consumer.commitSync();
		}
	}
}


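Both programs build against the following Maven pom: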
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>anyec</groupId>
	<artifactId>anyec</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.9.0.1</version>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.9.0.1</version>
		</dependency>

	</dependencies>
	<build>
		<sourceDirectory>src</sourceDirectory>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.5.1</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
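kafka-clients supplies the new KafkaProducer/KafkaConsumer used above, while the kafka_2.10 core artifact provides SimpleConsumer, AdminUtils and ZkUtils (and pulls in the zkclient library the admin code imports).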
