package anyec;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadata;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * Low-level ("simple") Kafka consumer sample: locates the leader broker of one
 * topic partition, fetches messages from it and logs each message.
 *
 * @author anyec
 */
public class KafkaSimpleConsumer {

    static final Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class);

    /** Socket timeout passed to every {@link SimpleConsumer} created here. */
    private static final int SO_TIMEOUT_MS = 100000;
    /** Socket receive buffer size passed to every {@link SimpleConsumer} created here. */
    private static final int BUFFER_SIZE = 64 * 1024;
    /** Fetch size passed to each fetch request. */
    private static final int FETCH_SIZE = 100000;
    /** Pause between polls when nothing was read, and between leader lookups. */
    private static final long BACKOFF_MS = 1000;

    /** Hosts of the replicas reported by the most recent successful {@link #findLeader} call. */
    private final List<String> replicaBrokers = new ArrayList<String>();

    public KafkaSimpleConsumer() {
    }

    public static void main(String args[]) {
        KafkaSimpleConsumer example = new KafkaSimpleConsumer();
        // Maximum number of messages to read.
        long maxReads = 3L;
        // Topic to consume.
        String topic = "anytopic";
        // Partition to consume.
        int partition = 0;
        // Seed broker hosts used for the initial metadata lookup.
        List<String> seeds = new ArrayList<String>();
        seeds.add("ldap.anyec.cn");
        // Broker port.
        int port = 9092;
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            // Pass the exception itself so the logger records the stack trace.
            logger.error("error:" + e, e);
        }
    }

    /**
     * Reads up to {@code maxReads} messages from the given topic partition,
     * starting at the earliest available offset, and logs each one.
     *
     * @param maxReads    maximum number of messages to read
     * @param topic       topic to read from
     * @param partition   partition id to read from
     * @param seedBrokers broker hosts used to look up the partition leader
     * @param port        port used for every broker connection
     * @throws Exception if no new leader can be found after a fetch error, or a
     *                   broker call fails
     */
    public void run(long maxReads, String topic, int partition, List<String> seedBrokers, int port)
            throws Exception {
        // Look up the metadata of the requested topic partition.
        PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
        if (metadata == null) {
            logger.error("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            logger.error("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + topic + "_" + partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, SO_TIMEOUT_MS, BUFFER_SIZE, clientName);
        try {
            long readOffset = getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(),
                    clientName);
            int numErrors = 0;
            while (maxReads > 0) {
                if (consumer == null) {
                    // Reconnect after a leader change.
                    consumer = new SimpleConsumer(leadBroker, port, SO_TIMEOUT_MS, BUFFER_SIZE, clientName);
                }
                FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                        .addFetch(topic, partition, readOffset, FETCH_SIZE).build();
                FetchResponse fetchResponse = consumer.fetch(req);

                if (fetchResponse.hasError()) {
                    numErrors++;
                    short code = fetchResponse.errorCode(topic, partition);
                    logger.info("Error fetching data from the Broker:{} Reason: {}", leadBroker, code);
                    if (numErrors > 5) {
                        break;
                    }
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // The requested offset is invalid; restart from the latest offset.
                        readOffset = getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(),
                                clientName);
                        continue;
                    }
                    consumer.close();
                    consumer = null;
                    leadBroker = findNewLeader(leadBroker, topic, partition, port);
                    continue;
                }
                numErrors = 0;

                long numRead = 0;
                for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                    if (maxReads <= 0) {
                        // Honour the limit even when the fetched batch holds more messages.
                        break;
                    }
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffset) {
                        logger.info("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                        continue;
                    }
                    readOffset = messageAndOffset.nextOffset();
                    logger.info("{}: {}", currentOffset, decodePayload(messageAndOffset.message().payload()));
                    numRead++;
                    maxReads--;
                }

                if (numRead == 0) {
                    try {
                        Thread.sleep(BACKOFF_MS);
                    } catch (InterruptedException ie) {
                        // Restore the flag and stop: continuing would spin on an interrupted thread.
                        Thread.currentThread().interrupt();
                        logger.warn("Interrupted while waiting for new messages. Exiting");
                        break;
                    }
                }
            }
        } finally {
            // Close the connection on every exit path, including exceptions.
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /** Decodes a message payload as UTF-8 text; returns {@code null} when there is no payload. */
    private static String decodePayload(ByteBuffer payload) {
        if (payload == null) {
            return null;
        }
        // remaining() is the number of readable bytes regardless of the buffer position.
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Asks the broker for one offset of the partition relative to {@code whichTime}.
     *
     * @param consumer   connection to the partition leader
     * @param topic      topic name
     * @param partition  partition id
     * @param whichTime  time selector, e.g. {@code kafka.api.OffsetRequest.EarliestTime()}
     *                   or {@code kafka.api.OffsetRequest.LatestTime()}
     * @param clientName client id sent with the request
     * @return the first offset returned by the broker, or 0 when the response
     *         reports an error or contains no offsets
     */
    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long whichTime,
            String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
                kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            logger.error("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        if (offsets == null || offsets.length == 0) {
            logger.error("Broker returned no offsets for [{}, {}]", topic, partition);
            return 0;
        }
        return offsets[0];
    }

    /** Returns {@link #getOffset} for {@code kafka.api.OffsetRequest.LatestTime()}. */
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
        return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
    }

    /** Returns {@link #getOffset} for {@code kafka.api.OffsetRequest.EarliestTime()}. */
    public static long getEarliestOffset(SimpleConsumer consumer, String topic, int partition,
            String clientName) {
        return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    }

    /**
     * Returns {@link #getOffset} using {@code kafka.api.OffsetRequest.CurrentVersion()}
     * as the time selector.
     *
     * <p>NOTE(review): {@code CurrentVersion()} is also what {@link #getOffset} passes as
     * the request version, so using it as a time selector here looks unintended.
     * Behaviour is left unchanged; confirm what "current offset" should mean.
     */
    public static long getCurrentOffset(SimpleConsumer consumer, String topic, int partition,
            String clientName) {
        return getOffset(consumer, topic, partition, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    }

    /**
     * Looks for a new leader broker among the known replicas, trying up to three times.
     *
     * @param oldLeader host of the leader that just failed
     * @param topic     topic name
     * @param partition partition id
     * @param port      broker port
     * @return host of the new leader
     * @throws Exception if no leader is found after three attempts, or the thread
     *                   is interrupted while waiting between attempts
     * @author anyec
     */
    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
        for (int attempt = 0; attempt < 3; attempt++) {
            PartitionMetadata metadata = findLeader(replicaBrokers, port, topic, partition);
            // On the first attempt an unchanged leader is not accepted; the loop waits and
            // looks again. From the second attempt on, the old leader is accepted.
            boolean leaderKnown = metadata != null && metadata.leader() != null;
            boolean sameLeaderOnFirstTry = leaderKnown && attempt == 0
                    && oldLeader.equalsIgnoreCase(metadata.leader().host());
            if (leaderKnown && !sameLeaderOnFirstTry) {
                return metadata.leader().host();
            }
            try {
                Thread.sleep(BACKOFF_MS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new Exception("Interrupted while looking for a new leader", ie);
            }
        }
        logger.info("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    /**
     * Queries the seed brokers in order until one returns metadata for the
     * requested partition, then records that partition's replica hosts in
     * {@link #replicaBrokers}.
     *
     * @return the partition metadata, or {@code null} if no seed broker supplied it
     */
    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, port, SO_TIMEOUT_MS, BUFFER_SIZE, "leaderLookup");
                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                // A failing seed broker is not fatal; log it and try the next one.
                logger.error("Error communicating with Broker [{}] to find Leader for [{}, {}] Reason: {}",
                        seed, topic, partition, e.toString(), e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        if (returnMetaData != null) {
            replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }

    /**
     * Commits {@code offset} for the topic partition through the given consumer.
     *
     * @param simpleConsumer connection used to send the commit request
     * @param partition      partition id
     * @param topic          topic name
     * @param kafkaClientId  used both as the group id and the client id of the request
     * @param offset         offset to commit
     * @param errorCode      passed as the second and third {@link OffsetAndMetadata}
     *                       constructor arguments
     * @return the error code the broker reported for this partition
     * @throws Exception if the commit fails or the response has no entry for the partition
     */
    public short saveOffsetInKafka(SimpleConsumer simpleConsumer, int partition, String topic,
            String kafkaClientId, long offset, short errorCode) throws Exception {
        short versionID = 0;
        int correlationId = 0;
        try {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            // NOTE(review): in Kafka 0.9 the 2nd/3rd constructor arguments appear to be the
            // commit and expire timestamps, not error codes; confirm before relying on this.
            OffsetAndMetadata offsetMetaAndErr = new OffsetAndMetadata(
                    new OffsetMetadata(offset, OffsetMetadata.NoMetadata()), errorCode, errorCode);
            Map<TopicAndPartition, OffsetAndMetadata> mapForCommitOffset = new HashMap<>();
            mapForCommitOffset.put(tp, offsetMetaAndErr);
            kafka.javaapi.OffsetCommitRequest offsetCommitReq = new kafka.javaapi.OffsetCommitRequest(
                    kafkaClientId, mapForCommitOffset, correlationId, kafkaClientId, versionID);
            OffsetCommitResponse offsetCommitResp = simpleConsumer.commitOffsets(offsetCommitReq);
            Object status = offsetCommitResp.errors().get(tp);
            if (status == null) {
                // Fail with a clear message instead of an unboxing NullPointerException.
                throw new IllegalStateException("Offset commit response has no status for " + tp);
            }
            return (Short) status;
        } catch (Exception e) {
            logger.error("Error when commiting Offset to Kafka: " + e.getMessage(), e);
            throw e;
        }
    }
}
package anyec;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import kafka.admin.AdminUtils;
//import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import scala.collection.Map;

/**
 * Sample driver: creates a topic through ZooKeeper, sends 100000 keyed
 * messages to it, then polls once and prints what was received.
 */
public class KafkaAdminMain {

    static String topicName = "uuu88";

    /** ZooKeeper connect string used for topic administration. */
    private static final String ZK_URL = "localhost:12181";
    /** ZooKeeper session timeout in milliseconds. */
    private static final int ZK_SESSION_TIMEOUT_MS = 30 * 1000;
    /** ZooKeeper connection timeout in milliseconds. */
    private static final int ZK_CONNECTION_TIMEOUT_MS = 300 * 1000;

    public static void main(String[] args) {
        createTopics();
        sendMsg(topicName);
        receiveMsg(topicName);
    }

    /**
     * Ensures {@link #topicName} exists.
     *
     * <p>NOTE(review): despite its name this method has always behaved exactly like
     * {@link #createTopics()} and does not list anything; behaviour is preserved.
     */
    public static void listTopics() {
        createTopics();
    }

    /**
     * Opens a ZooKeeper connection for Kafka administration.
     *
     * <p>Built with {@code ZkUtils.apply} so the client uses Kafka's string serializer;
     * a plain {@code ZkClient} serializes znode data with Java serialization, which
     * brokers cannot read. The caller must {@code close()} the returned instance.
     */
    public static ZkUtils getZkClient() {
        return ZkUtils.apply(ZK_URL, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, false);
    }

    /** Creates {@link #topicName} with 1 partition and replication factor 1 if it does not exist. */
    public static void createTopics() {
        ZkUtils zk = getZkClient();
        try {
            if (!AdminUtils.topicExists(zk, topicName)) {
                AdminUtils.createTopic(zk, topicName, 1, 1, new Properties());
            }
            // NOTE(review): the second argument looks like it should be an entity type rather
            // than a topic name, and the result was never used; call kept as in the original.
            AdminUtils.fetchAllEntityConfigs(zk, topicName);
        } finally {
            zk.close();
        }
    }

    /** Deletes {@link #topicName} and its consumer-group info from ZooKeeper if the topic exists. */
    public static void deleteTopics() {
        ZkUtils zk = getZkClient();
        try {
            if (AdminUtils.topicExists(zk, topicName)) {
                AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zk, topicName);
                AdminUtils.deleteTopic(zk, topicName);
            }
        } finally {
            zk.close();
        }
    }

    /** Builds a String/String producer for kafka.anyec.cn:9092 with acks=0. */
    public static Producer<String, String> getProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.anyec.cn:9092");
        props.put("acks", "0");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 1024 * 1024);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(props);
    }

    /** Builds a String/String consumer in group "test" with auto-commit enabled; not yet subscribed. */
    public static Consumer<String, String> getConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.anyec.cn:9092");
        // Consumer group id.
        props.put("group.id", "test");
        // Commit offsets automatically.
        props.put("enable.auto.commit", "true");
        // Auto-commit interval, in milliseconds.
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout, in milliseconds.
        props.put("session.timeout.ms", "30000");
        // Keys and values are plain strings.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Sends 100000 messages "v0".."v99999" to {@code topic}, keyed "0", "1" or "2"
     * ({@code i % 3}), waiting up to 100 ms for each send to complete.
     *
     * @param topic topic to send to
     */
    public static void sendMsg(String topic) {
        Producer<String, String> producer = getProducer();
        try {
            for (int i = 0; i < 100000; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>(topic, i % 3 + "", "v" + i);
                try {
                    producer.send(record).get(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Restore the flag and stop sending instead of ignoring the interrupt.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                } catch (ExecutionException | TimeoutException e) {
                    // A single failed or slow send does not abort the run.
                    e.printStackTrace();
                }
            }
            producer.flush();
        } finally {
            producer.close();
        }
    }

    /** Send callback that prints the error message of a failed send. */
    static Callback callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                System.out.println(e.getMessage());
            }
        }
    };

    /**
     * Subscribes to {@code topic}, polls once for up to 10 seconds, prints
     * "key value" for every record received and commits the offsets.
     *
     * @param topic topic to read from
     */
    public static void receiveMsg(String topic) {
        Consumer<String, String> consumer = getConsumer();
        try {
            consumer.subscribe(Arrays.asList(topic));
            ConsumerRecords<String, String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + " " + record.value());
            }
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }
}
package anyec;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import kafka.admin.AdminUtils;
//import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import scala.collection.Map;

/**
 * Sample driver whose {@code main} consumes {@link #topicName} forever and prints
 * every record. Topic administration and producer helpers are also provided.
 */
public class KafkaAdminMain2 {

    static String topicName = "uuu88";

    /** ZooKeeper connect string used for topic administration. */
    private static final String ZK_URL = "localhost:12181";
    /** ZooKeeper session timeout in milliseconds. */
    private static final int ZK_SESSION_TIMEOUT_MS = 30 * 1000;
    /** ZooKeeper connection timeout in milliseconds. */
    private static final int ZK_CONNECTION_TIMEOUT_MS = 300 * 1000;

    public static void main(String[] args) {
        receiveMsg(topicName);
    }

    /**
     * Ensures {@link #topicName} exists.
     *
     * <p>NOTE(review): despite its name this method has always behaved exactly like
     * {@link #createTopics()} and does not list anything; behaviour is preserved.
     */
    public static void listTopics() {
        createTopics();
    }

    /**
     * Opens a ZooKeeper connection for Kafka administration.
     *
     * <p>Built with {@code ZkUtils.apply} so the client uses Kafka's string serializer;
     * a plain {@code ZkClient} serializes znode data with Java serialization, which
     * brokers cannot read. The caller must {@code close()} the returned instance.
     */
    public static ZkUtils getZkClient() {
        return ZkUtils.apply(ZK_URL, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, false);
    }

    /** Creates {@link #topicName} with 1 partition and replication factor 1 if it does not exist. */
    public static void createTopics() {
        ZkUtils zk = getZkClient();
        try {
            if (!AdminUtils.topicExists(zk, topicName)) {
                AdminUtils.createTopic(zk, topicName, 1, 1, new Properties());
            }
            // NOTE(review): the second argument looks like it should be an entity type rather
            // than a topic name, and the result was never used; call kept as in the original.
            AdminUtils.fetchAllEntityConfigs(zk, topicName);
        } finally {
            zk.close();
        }
    }

    /** Deletes {@link #topicName} and its consumer-group info from ZooKeeper if the topic exists. */
    public static void deleteTopics() {
        ZkUtils zk = getZkClient();
        try {
            if (AdminUtils.topicExists(zk, topicName)) {
                AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zk, topicName);
                AdminUtils.deleteTopic(zk, topicName);
            }
        } finally {
            zk.close();
        }
    }

    /** Builds a String/String producer for kafka.anyec.cn:9092 with acks=all. */
    public static Producer<String, String> getProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.anyec.cn:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        props.put("linger.ms", 30);
        props.put("buffer.memory", 30 * 1024 * 1024);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(props);
    }

    /** Builds a String/String consumer in group "test" with auto-commit enabled; not yet subscribed. */
    public static Consumer<String, String> getConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.anyec.cn:9092");
        // Consumer group id.
        props.put("group.id", "test");
        // Commit offsets automatically.
        props.put("enable.auto.commit", "true");
        // Auto-commit interval, in milliseconds.
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout, in milliseconds.
        props.put("session.timeout.ms", "30000");
        // Keys and values are plain strings.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Sends 100000 un-keyed messages "v0".."v99999" to {@code topic}, waiting up to
     * 100 ms for each send to complete.
     *
     * @param topic topic to send to
     */
    public static void sendMsg(String topic) {
        Producer<String, String> producer = getProducer();
        try {
            for (int i = 0; i < 100000; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "v" + i);
                try {
                    producer.send(record).get(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Restore the flag and stop sending instead of ignoring the interrupt.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                } catch (ExecutionException | TimeoutException e) {
                    // A single failed or slow send does not abort the run.
                    e.printStackTrace();
                }
            }
            producer.flush();
        } finally {
            producer.close();
        }
    }

    /** Send callback that prints the error message of a failed send. */
    static Callback callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                System.out.println(e.getMessage());
            }
        }
    };

    /**
     * Subscribes to {@code topic} and loops forever: polls for up to 10 seconds,
     * prints "key value" for every record and commits the offsets. Only returns
     * by exception, in which case the consumer is closed first.
     *
     * @param topic topic to read from
     */
    public static void receiveMsg(String topic) {
        Consumer<String, String> consumer = getConsumer();
        try {
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key() + " " + record.value());
                }
                consumer.commitSync();
            }
        } finally {
            // The loop only exits by exception; release the connection when it does.
            consumer.close();
        }
    }
}
<!-- Maven build descriptor for the anyec project (artifact anyec:anyec:0.0.1-SNAPSHOT). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>anyec</groupId>
  <artifactId>anyec</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <!-- Both Kafka artifacts are pinned to the same release, 0.9.0.1. -->
  <dependencies>
    <!-- Presumably supplies the org.apache.kafka.clients producer/consumer API; confirm. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.1</version>
    </dependency>
    <!-- Presumably the Kafka core build for Scala 2.10 (per the artifact suffix); confirm. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.9.0.1</version>
    </dependency>
  </dependencies>
  <build>
    <!-- Sources are read from src/ instead of Maven's default source directory. -->
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <!-- Compile with Java 1.7 source and target levels. -->
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
相关推荐
### Kafka的常用命令集锦 ...以上就是Kafka常用的命令集锦,这些命令涵盖了Kafka的基本管理和操作流程,对于快速上手Kafka具有重要的参考价值。在实际使用过程中,可以根据具体需求进一步探索更多高级特性。
### Kafka常用命令归纳 #### 一、环境设置与概述 本文档主要针对Kafka的日常运维及开发过程中经常使用的命令进行归纳整理。所涉及的命令适用于以下环境: - **Zookeeper地址**: localhost:2181 - **Bootstrap ...
### Kafka 常用操作命令解析 #### 一、启动Kafka服务 **命令:** ```bash kafka-server-start.sh config/server.properties ``` **解释:** 此命令用于启动Kafka服务。其中`kafka-server-start.sh`是启动脚本,`...
### Kafka 3.2 常用命令详解 #### 一、启动 ZooKeeper 服务 在启动 Kafka 之前,必须先启动 ZooKeeper 服务。ZooKeeper 为 Kafka 提供了集群协调服务。 ##### 操作步骤: 1. **打开命令行窗口**: - 打开 cmd ...
以上是 Kafka 中一些常用的命令及其参数介绍,通过这些命令可以方便地管理和操作 Kafka 集群。在实际工作中,根据具体需求选择合适的命令进行操作是非常重要的。此外,还可以结合其他工具如 Kafka Manager 或 ...
本文主要总结了Apache Kafka的相关命令和操作,包括安装、服务启停、Topic管理、消息生产和消费以及一些实用工具。Kafka是一个分布式流处理平台,它被广泛用于实时数据管道和流应用。 1. **Kafka的安装与服务启停**...
日积月累,收录Kafka各种命令,会持续更新。 在0.9.0.0之后的Kafka,出现了几个新变动,一个是在Server端增加了GroupCoordinator这个角色,另一个较大的变动是将topic的offset信息由之前存储在ZooKeeper上改为...
Kafka常见运维命令文档,Kafka常见运维命令文档,Kafka常见运维命令文档
了解并熟练运用这些命令,可以帮助你更好地管理和监控Kafka集群,确保系统的稳定运行。在实际环境中,你可能还需要结合Zookeeper、JMX监控、日志分析等多种工具来全面掌握Kafka集群的健康状况。同时,对于大数据处理...
kafka:查看topic列表、创建topic、删除topic、消费topic、消费topic并指定消费组、消费组详情、消费组列表、设置偏移量
以下是一些Kafka的常用功能点,将根据提供的文件名进行详细解释。 1. **创建Topic** 创建Kafka Topic是使用Kafka的第一步。Topic是消息的逻辑存储单元,可以理解为数据库中的表。在Kafka中,你可以使用命令行工具`...
本人积累的一些Kafka调试的常用命令,主要包含:启动Kafka、创建Topic、 查看topic列表、创建生产者、创建消费者、修改分区数、删除Topic、自带生产者性能测试
Kafka常用命令操作包括查看所有topic、查看topic详情、停止和启动Kafka等。 kafka集群搭建需要完成以下步骤: 1. 准备工作,关闭防火墙和配置hosts和hostname。 2. Java环境配置,解压安装Java安装包和配置环境...
除了这些基础操作,Kafka 还提供了丰富的 API 和工具,包括 Java、Python、Scala 等语言的客户端库,以及用于管理、监控和调试的工具。Kafka 还可以与其他大数据组件,如 Hadoop、Spark、Flink 集成,构建复杂的数据...
五、常用的Kafka操作命令 1、查询Linux操作系统版本信息 输入"uname -a ",可显示电脑以及操作系统的相关信息;输入"cat /proc/version",说明正在运行的内核版本;输入"cat /etc/issue", 显示的是发行版本信息;...
它的命令行工具是管理 Kafka 集群和操作其组件的关键部分。以下是对 Kafka 常用命令行的详细说明: 1. **查看 Topic 详细信息**: 使用 `kafka-topics.sh` 命令行工具,配合 `--zookeeper` 参数指定 ZooKeeper 地址...
Scala是编写Kafka服务端和客户端程序的常用语言,因此两者在大数据领域的结合十分紧密。 Scala安装包: Scala2.11.4是Scala的一个版本,它的主要特点是提供了更好的类型推断和性能优化。Scala的安装过程通常包括...
#### 四、Kafka基本操作 - **生产者发送消息**: - 创建生产者实例。 - 设置消息发送的配置参数。 - 发送消息至指定的主题。 - **消费者消费消息**: - 创建消费者实例。 - 设置订阅的主题。 - 消费消息并...