`
gaojingsong
  • 浏览: 1217725 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

【大数据Kafka之 high-level--Consumer 】

阅读更多

一、特点:

不用关心offset, 会自动的读zookeeper中该Consumer group的last offset

 

二、注意事项

1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,

   所以consumer数不要大于partition数 

2. 如果consumer比partition少,一个consumer会对应于多个partitions,

    这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 

    最好partiton数目是consumer数目的整数倍,所以partition数目很重要,

    比如取24,就很容易设定consumer数目 

3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,

    kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同 

4. 增减consumer,broker,partition会导致rebalance,

     所以rebalance后consumer对应的partition会发生变化 

5. High-level接口中获取不到数据的时候是会block的 

 

三、代码如下:

package kafkatest.kakfademo;

 

import java.io.UnsupportedEncodingException;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

 

public class ConsumerDemo1 {

public static void main(String[] args) {

ConsumerDemo1 demo = new ConsumerDemo1();

demo.test();

}

 

@SuppressWarnings("rawtypes")

public void test() {

String topicName = "test";

int numThreads = 1;

Properties properties = new Properties();

properties.put("zookeeper.connect", "hadoop0:2181");// 声明zk

properties.put("group.id", "group--demo");// 必须要使用别的组名称,

// 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据

 

ConsumerConnector consumer = Consumer

.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMap.put(topicName, numThreads); // 一次从主题中获取一个数据

Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer

.createMessageStreams(topicCountMap);

// 获取每次接收到的这个数据

List<KafkaStream<byte[], byte[]>> streams = messageStreams

.get(topicName);

 

// now launch all the threads

ExecutorService executor = Executors.newFixedThreadPool(numThreads);

 

// now create an object to consume the messages

//

int threadNumber = 0;

for (final KafkaStream stream : streams) {

executor.execute(new ConsumerMsgTask(stream, threadNumber));

threadNumber++;

}

}

 

class ConsumerMsgTask implements Runnable {

 

private KafkaStream m_stream;

private int m_threadNumber;

 

public ConsumerMsgTask(KafkaStream stream, int threadNumber) {

m_threadNumber = threadNumber;

m_stream = stream;

}

 

public void run() {

ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

long offset = 0;

try {

while (it.hasNext())

offset = it.next().offset();

byte[] bytes = it.next().message();

 

String msg = new String(bytes, "UTF-8");

 

System.out.print("offset: " + offset + ",msg:" + msg);

System.out.println("Shutting down Thread: " + m_threadNumber);

} catch (UnsupportedEncodingException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

 

}

 

 

四、实验验证



 

 



 

 



 

  • 大小: 84.7 KB
  • 大小: 9.3 KB
  • 大小: 17.7 KB
0
1
分享到:
评论

相关推荐

    kafka-schema-registry-client-6.2.2.jar

    mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....

    大数据Kafka入门--理论+实践

    Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序。它最初是由LinkedIn公司开发,之后成为...总体而言,Kafka凭借其高性能、高可靠性和高可用性的特性,已经成为了大数据领域不可或缺的组件之一。

    kafka-2.12-2.7.0.tar

    kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-2.7.0.tar kafka_2.12-...

    kafka-2.12-3.6.1.tgz

    /usr/local/kafka_2.12-3.6.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning ``` 这将从主题的开头开始消费所有消息。 九、扩展与优化 Kafka支持集群部署,...

    kafka-clients-2.4.1-API文档-中文版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    kafka-clients-2.0.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...

    kafka-clients-2.0.0-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...

    kafka-clients-2.4.1-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    kafka-clients-0.10.0.1-API文档-中文版.zip

    赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...

    kafka-clients-2.0.1-API文档-中文版.zip

    赠送jar包:kafka-clients-2.0.1.jar; 赠送原API文档:kafka-clients-2.0.1-javadoc.jar; 赠送源代码:kafka-clients-2.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.1.pom; 包含翻译后的API文档...

    kafka-clients-2.2.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.2.0.jar; 赠送原API文档:kafka-clients-2.2.0-javadoc.jar; 赠送源代码:kafka-clients-2.2.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.2.0.pom; 包含翻译后的API文档...

    kafka_2.12-2.4.1.zip

    - 使用`./kafka-console-producer.sh --broker-list localhost:9092 --topic test`和`./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning`进行消息的生产和消费,操作...

    kafka-eagle-bin-2.1.0.tar.gz

    4. 启动:执行`bin/kafka-eagle-start.sh`启动服务,使用`bin/kafka-eagle-stop.sh`停止服务。 5. 访问:在Web浏览器中输入`http://服务器IP:端口`,按照提示进行登录和操作。 ### 使用注意事项 1. 确保运行环境已...

    kafka_2.11-2.2.2.tgz

    Kafka提供了丰富的命令行工具,如`kafka-topics.sh`、`kafka-consumer-groups.sh`等,用于查看和管理Topics、Partitions、Consumer Groups等。 十一、注意事项 1. Kafka默认使用9092端口,确保没有其他服务占用。 ...

    最新版kafka kafka_2.12-2.5.1.tgz

    - 消费消息:`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning` 7. **最佳实践** - 为了保证高可用性,推荐至少配置 3 个 broker。 - 合理设置分区数和...

    kafka_2.12-3.3.1.tgz

    - `bin`目录:包含了启动和管理Kafka服务的脚本,如`kafka-server-start.sh`、`kafka-console-producer.sh`和`kafka-console-consumer.sh`。 - `config`目录:包含默认配置文件,如`server.properties`(配置Kafka ...

    最新版windows kafka_2.12-2.4.1.tgz

    9. Kafka提供了一些内置的命令行工具,如`kafka-topics.sh`和`kafka-consumer-groups.sh`,用于检查主题信息和消费者组状态。此外,还可以使用Kafka的管理界面Kafka Manager或者Kafka监控工具如Kafka Connect、Kafka...

    kafka-2.13-3.5.1.tgz

    在“kafka-2.13-3.5.1.tgz”这个压缩包中,包含了Kafka的一个特定版本——3.5.1,该版本支持Scala 2.13的构建。下面我们将深入探讨Kafka的核心概念、功能以及如何使用这个版本。 1. **Kafka的基本概念** - **发布...

    kafka-python-2.0.2.tar.gz

    《Python中的Kafka库:kafka-python-2.0.2深入解析》 在Python编程环境中,Kafka作为分布式消息系统的接口,对于实时数据处理和流处理应用至关重要。`kafka-python`是Python社区开发的一个非常流行的Kafka客户端库...

Global site tag (gtag.js) - Google Analytics