若要转载,请标明出处,谢谢!
http://raising.iteye.com/blog/2252456
=========================================================
首先,容我吐一口老血。。。。。。
kafka算是很麻烦的一件事儿,起因是最近需要采集大量的数据,原先是只用了典型的high-level Consumer的API,最经典的不过如下:
Properties props = new Properties(); props.put("zookeeper.connect", "xxxx:2181"); props.put("zookeeper.connectiontimeout.ms", "1000000"); props.put("group.id", "test_group"); props.put("zookeeper.session.timeout.ms", "40000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("test", new Integer(1)); //key--topic Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); StringBuffer sb = new StringBuffer(); while(it.hasNext()) { try { String msg = new String(it.next().message(), "utf-8").trim(); System.out.println("receive:" + msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } }
这是典型的kafka消费端消费数据的代码,但可以看出这是十分典型的单线程消费。在本地玩玩熟悉kafka还行,(就跟入门java学会写main方法打印hello world一样~~~~),问题是学的东西必须真正应用到实际中,你不可能只在单线程采集里原地打转吧。。so,多线程采集迫在眉急啊!!
本人研究卡夫卡多线程消费还是耗了一段时间的,希望把过程尽可能完整地记录下来,以便各位同行有需要可以参考。。
首先,最好理解kafka的基本原理和一些基本概念,阅读官网文档很有必要,这样才会有一个比较清晰的概念,而不是跟无头苍蝇一样乱撞——出了错去网上查是灰常痛苦滴!!
http://kafka.apache.org/documentation.html
好了,大概说下卡夫卡的“分区·”的概念吧:
这张图比较清晰地描述了“分区”的概念,对于某一个topic的消息来说,我们可以把这组消息发送给若干个分区,就相当于一组消息分发一样。
分区、Offset、消费线程、group.id的关系
1)一组(类)消息通常由某个topic来归类,我们可以把这组消息“分发”给若干个分区(partition),每个分区的消息各不相同;
2)每个分区都维护着他自己的偏移量(Offset),记录着该分区的消息此时被消费的位置;
3)一个消费线程可以对应若干个分区,但一个分区只能被具体某一个消费线程消费;
4)group.id用于标记某一个消费组,每一个消费组都会被记录他在某一个分区的Offset,即不同consumer group针对同一个分区,都有“各自”的偏移量。
说完概念,必须要注意的一点是,必须确认卡夫卡的server.properties里面的一个属性num.partitions必须被设置成大于1的值,否则消费端再怎么折腾,也用不了多线程哦。我这里的环境下,该属性值被设置成10了。
重构一下上述经典的消费端代码:
public class KafakConsumer implements Runnable { private ConsumerConfig consumerConfig; private static String topic="blog"; Properties props; final int a_numThreads = 6; public KafakConsumer() { props = new Properties(); props.put("zookeeper.connect", "xxx:2181,yyy:2181,zzz:2181"); // props.put("zookeeper.connect", "localhost:2181"); // props.put("zookeeper.connectiontimeout.ms", "30000"); props.put("group.id", "blog"); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); consumerConfig = new ConsumerConfig(props); } @Override public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); ExecutorService executor = Executors.newFixedThreadPool(a_numThreads); for (final KafkaStream stream : streams) { executor.submit(new KafkaConsumerThread(stream)); } } public static void main(String[] args) { System.out.println(topic); Thread t = new Thread(new KafakConsumer()); t.start(); } }
从这段重构的代码可以看出,KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0); 这行代码已被废弃,得到List<KafkaStream<byte[], byte[]>>之后不再是得到他的头元素get(0)就ok了,而且topicCountMap.put(topic, new Integer(a_numThreads));的第二个参数也不再是1.
其中,具体消费线程KafkaConsumerThread代码为:
public class KafkaConsumerThread implements Runnable { private KafkaStream<byte[], byte[]> stream; public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) { this.stream = stream; } @Override public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { MessageAndMetadata<byte[], byte[]> mam = it.next(); System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "]," + "offset[" + mam.offset() + "], " + new String(mam.message())); } } }
编写生产端(Producer)的代码:
public class KafkaProducer implements Runnable { private Producer<String, String> producer = null; private ProducerConfig config = null; public KafkaProducer() { Properties props = new Properties(); props.put("zookeeper.connect", "*****:2181,****:2181,****:2181"); // props.put("zookeeper.connect", "localhost:2181"); // 指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[] props.put("serializer.class", "kafka.serializer.StringEncoder"); // 同步还是异步,默认2表同步,1表异步。异步可以提高发送吞吐量,但是也可能导致丢失未发送过去的消息 props.put("producer.type", "sync"); // 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。 props.put("compression.codec", "1"); // 指定kafka节点列表,用于获取metadata(元数据),不必全部指定 props.put("broker.list", "****:6667,***:6667,****:6667"); config = new ProducerConfig(props); } @Override public void run() { producer = new Producer<String, String>(config); // for(int i=0; i<10; i++) { // String sLine = "I'm number " + i; // KeyedMessage<String, String> msg = new KeyedMessage<String, String>("group1", sLine); // producer.send(msg); // } for(int i = 1; i <= 6; i++){ //往6个分区发数据 List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>(); for(int j = 0; j < 6; j++){ //每个分区6条讯息 messageList.add(new KeyedMessage<String, String> //String topic, String partition, String message ("blog", "partition[" + i + "]", "message[The " + i + " message]")); } producer.send(messageList); } } public static void main(String[] args) { Thread t = new Thread(new KafkaProducer()); t.start(); } }
上述生产端代码相对传统的发送端代码也做了改进,首先是用了批量发送(源码):
public void send(List<KeyedMessage<K, V>> messages) { underlying().send(JavaConversions..MODULE$.asScalaBuffer(messages).toSeq()); }
而不是:
public void send(KeyedMessage<K, V> message) { underlying().send(Predef..MODULE$.wrapRefArray((Object[])new KeyedMessage[] { message })); }
第二,KeyedMessage用的构造函数:
public KeyedMessage(String topic, K key, V message) { this(topic, key, key, message); }
第二个参数表示分区的key。
而非:
public KeyedMessage(String topic, V message) { this(topic, null, null, message); }
分别run一下生产和消费的代码,可以看到消费端打印结果:
pool-2-thread-5: partition[5],offset[0], message[The 5 message]
pool-2-thread-1: partition[2],offset[0], message[The 2 message]
pool-2-thread-2: partition[1],offset[0], message[The 1 message]
pool-2-thread-5: partition[4],offset[0], message[The 4 message]
pool-2-thread-1: partition[3],offset[0], message[The 3 message]
pool-2-thread-4: partition[6],offset[0], message[The 6 message]
可以看到,6个分区的数据全部被消费了,但是不妨看下消费线程:pool-2-thread-1线程同时消费了partition[2]和partition[3]的数据;pool-2-thread-2消费了partiton[1]的数据;pool-2-thread-4消费了partiton[6]的数据;而pool-2-thread-5则消费了partitoin[4]和partition[5]的数据。
从上述消费情况来看,验证了消费线程和分区的对应情况——即:一个分区只能被一个线程消费,但一个消费线程可以消费多个分区的数据!虽然我指定了线程池的线程数为6,但并不是所有的线程都去消费了,这当然跟线程池的调度有关系了。并不是一个消费线程对应地去消费一个分区的数据。
我们不妨仔细看下消费端启动日志部分,这对我们理解kafka的启动生成和消费的原理有益:
【限于篇幅,启动日志略,只分析关键部分】
消费端的启动日志表明:
1)Consumer happy_Connor-PC-1445916157267-b9cce79d rebalancing the following partitions: ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) for topic blog with consumers: List(happy_Connor-PC-1445916157267-b9cce79d-0, happy_Connor-PC-1445916157267-b9cce79d-1, happy_Connor-PC-1445916157267-b9cce79d-2, happy_Connor-PC-1445916157267-b9cce79d-3, happy_Connor-PC-1445916157267-b9cce79d-4, happy_Connor-PC-1445916157267-b9cce79d-5)
happy_Connor-PC-1445916157267-b9cce79d表示一个消费组,该topic可以使用10个分区:the following partitions: ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) for topic blog。然后定义了6个消费线程,List(happy_Connor-PC-1445916157267-b9cce79d-0, happy_Connor-PC-1445916157267-b9cce79d-1, happy_Connor-PC-1445916157267-b9cce79d-2, happy_Connor-PC-1445916157267-b9cce79d-3, happy_Connor-PC-1445916157267-b9cce79d-4, happy_Connor-PC-1445916157267-b9cce79d-5)。消费线程的个数由topicCountMap.put(String topic, Integer count)的第二个参数决定。但真正去消费的线程还是由线程池的调度机制来决定;
2)线程由zookeeper来声明它拥有1个或多个分区;
3)真正有数据存在的分区是由生产发送端来决定,即使你的kafka设置了10个分区,消费端在消费的时候,消费线程虽然会根据zookeeper的某种机制来声明它所消费的分区,但实际消费过程中,还是会消费真正存在数据的分区。(本例中,你只往6个分区push了数据,所以即使你声明了10个分区,你也只能消费6个分区的数据)。
如果把topicCountMap的第二个参数Integer值改成1,发送端改成往7个分区发数据再测试,可得到消费端的打印结果:
pool-2-thread-1: partition[6],offset[0], message[The 6 message] pool-2-thread-1: partition[3],offset[0], message[The 3 message] pool-2-thread-1: partition[2],offset[0], message[The 2 message] pool-2-thread-1: partition[5],offset[0], message[The 5 message] pool-2-thread-1: partition[4],offset[0], message[The 4 message] pool-2-thread-1: partition[7],offset[0], message[The 7 message] pool-2-thread-1: partition[1],offset[0], message[The 1 message]
可以看出,如果你topicCountMap的值改成1,而 List<KafkaStream<byte[], byte[]>>的size由Integer值决定,此时为1,可以看出,线程池中只能使用一个线程来发送,还是单线程的效果。若要用多线程消费,Integer的值必须大于1.
下面再来模拟一些状况:
状况一:往大于实际分区数的分区发数据,比如发送端的第一层循环设为11:
可看到消费端此时虽能正常的完全消费这10个分区的数据,但生产端会报异常:
No partition metadata for topic blog4 due to kafka.common.LeaderNotAvailableException}] for topic [blog4]: class kafka.common.LeaderNotAvailableException
这说明,你往partition11发送失败,因为卡夫卡已经设置了10个分区,你再往不存在的分区数发当然会报错了。
状况二:发送端用传统的发送方法,即KeyedMessage的构造函数只有topic和Message
//针对topic创建一个分区并发送数据 List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>(); for(int i = 1; i <= 10; i++){ messageList.add(new KeyedMessage<String, String>("blog6", "我是发送的内容message"+i)); } producer.send(messageList);
消费端打印结果:
pool-2-thread-1: partition[7],offset[0], 我是发送的内容message1
pool-2-thread-1: partition[7],offset[1], 我是发送的内容message2
pool-2-thread-1: partition[7],offset[2], 我是发送的内容message3
pool-2-thread-1: partition[7],offset[3], 我是发送的内容message4
pool-2-thread-1: partition[7],offset[4], 我是发送的内容message5
pool-2-thread-1: partition[7],offset[5], 我是发送的内容message6
pool-2-thread-1: partition[7],offset[6], 我是发送的内容message7
pool-2-thread-1: partition[7],offset[7], 我是发送的内容message8
pool-2-thread-1: partition[7],offset[8], 我是发送的内容message9
pool-2-thread-1: partition[7],offset[9], 我是发送的内容message10
这表明,只用了1个消费线程消费1个分区的数据。这说明,如果发送端发送数据没有指定分区,即用的是
public KeyedMessage(String topic,V message) { this(topic, key, key, message); } sendMessage(KeyedMessage(String topic,V message))
的话,同样达不到多线程消费的效果!
状况三:将线程池的大小设置成比topicCountMap的value值小?
topicCountMap.put(topic, new Integer(7)); //...................... ExecutorService executor = Executors.newFixedThreadPool(5);
发送端往9个分区发送数据,run一下,会发现消费端打印结果:
pool-2-thread-3: partition[7],offset[0], message[The 7 message]
pool-2-thread-5: partition[1],offset[0], message[The 1 message]
pool-2-thread-4: partition[4],offset[0], message[The 4 message]
pool-2-thread-2: partition[3],offset[0], message[The 3 message]
pool-2-thread-4: partition[5],offset[0], message[The 5 message]
pool-2-thread-1: partition[8],offset[0], message[The 8 message]
pool-2-thread-2: partition[2],offset[0], message[The 2 message]
你会发现:虽然我生产发送端往9个分区发送了数据,但实际上只消费掉了7个分区的数据。(如果你再跑一边,可能又是6个分区的数据)——这说明,有的分区的数据没有被消费,原因只可能是线程不够。so,当线程池中的大小小于分区数时,会出现有的分区没有被采集的情况。建议设置:实际发送分区数(一般就等于设置的分区数)= topicCountMap的value = 线程池大小 否则极易出现reblance的异常!!!
好了,折腾这么久。我们可以看出,卡夫卡如果想要多线程消费提高效率的话,就可以从分区数上下手,分区数就是用来做并行消费的而且生产端的发送代码也很有讲究。(这只是针对某一个topic而言,当然实际情况中,你可以一个topic一个线程,同样达到多线程效果,当然这是后话了)
========================The end=====================
相关推荐
在这个示例中,我们将关注如何使用Java API在Kafka中实现多线程消费,以及单个消费者组内的多线程消费。 首先,我们了解Kafka的基本概念。Kafka是一个发布/订阅模型的消息队列,它包含生产者(Producer)、消费者...
本文将深入探讨"Kafka分区消费策略",以及如何实现"发送到指定分区"。 首先,我们需要了解Kafka的基本架构。Kafka是一种高吞吐量、低延迟的分布式流处理平台,它将数据以主题(Topic)的形式存储,并将每个主题划分...
在本文中,我们将深入探讨 Kafka 的多线程消费机制以及如何确保数据的顺序性。 Kafka 是 Apache 开源项目,它提供了一个高吞吐量、低延迟的消息发布订阅系统。Kafka 的核心设计包括生产者、消费者、主题(Topic)和...
然后,将Partitions数量除以消费者线程总数,确定每个线程消费的Partitions数。如果不能整除,前面的线程将多消费一个分区。例如,若有10个Partition和3个线程,C1-0将消费前4个,C2-0和C2-1各消费3个。 4. **Range...
在本文中,我们将深入探讨Apache Kafka的Java编程实践,特别是关注单线程和多线程在Kafka生产者与消费者中的应用,以及多线程管理器的实现。Apache Kafka是一个分布式流处理平台,广泛用于实时数据管道和消息传递。...
总结,这个“kafka生产消费demo”涵盖了Kafka 0.10.2版本的基本使用,包括生产者和消费者的创建、消息的发布与消费,以及可能的多线程处理。了解这些核心概念和API对于理解和构建基于Kafka的数据处理系统至关重要。
在多线程或多进程环境下,消费者组内的多个实例可以并行处理消息,提高数据处理速度。 在 Kafka 中,主题被分割成多个分区(Partition),每个分区有唯一的顺序保证。生产者可以根据消息的键(Key)进行分区,确保...
在本文中,我们将深入探讨Kafka 2.10_0.10.1.0版本的代码示例,以帮助开发者更好地理解和应用这个强大的工具。 Kafka的核心组件包括生产者(Producer)、消费者(Consumer)、主题(Topic)和分区(Partition)。...
简单消费者适用于单线程场景,而消费者群组则允许多个消费者协同工作,共同消费一个主题的所有分区,实现负载均衡和高可用性。消费者通过offset(偏移量)跟踪其在每个分区中的位置,从而实现数据的可靠消费。 本书...
librdkafka的工作依赖于多线程模型。其中包括以下三种线程: 1. 应用线程:这是运行业务代码的线程,也是与librdkafka进行交互的主线程。 2. KafkaBroker线程:负责与Kafka代理(Broker)通信的线程,可以有多个,...
- **多线程消费**:每个线程维护一个 KafkaConsumer,速度较快,但可能导致 TCP 连接开销增大,扩展性和消息顺序维护困难。 - **单个消费者与多个 worker 线程**:更易扩展,但实现复杂,可能难以维护消息顺序和...
3. 多线程消费:可以配置多个消费者实例在同一组内并行消费,提高消费效率。 六、错误处理与容错 Spring Kafka提供了重试和死信队列机制,当消息处理失败时,可以将消息放入重试队列或死信队列,避免消息丢失。 ...
《Kafka源码解析与实战》是一本深入探讨Apache Kafka这一分布式流处理平台的专业书籍。通过对Kafka源码的解析,读者可以深入了解其内部工作原理,从而更好地掌握和运用Kafka进行实时数据处理和消息传递。 Kafka的...
- **消费者(Consumer)**:订阅主题并处理消息的进程,可以是基于消费者组的多线程消费。 - **集群(Cluster)**:由多个服务器组成的Kafka实例集合,提供容错能力。 2. **安装步骤:** - **解压文件**:首先,...
可以通过设置消费者组、调整批处理大小、使用多线程等方式优化性能。同时,合理设计分区和键策略,确保数据均衡分配到各个消费者。 10. **监控与日志** 使用 Spring Boot Actuator 可以监控 Kafka 生产者和消费者...
6. **网络配置**: 包括客户端连接超时、请求最大等待时间、I/O线程数等,这些参数影响Kafka的服务质量和性能。 7. **日志管理**: 如`log.retention.hours`或`log.retention.bytes`用于设置消息保留的期限或大小,`...
- **Consumer**:消费者从Kafka集群中读取消息,支持多线程并行消费。 - **Offset**:消息在分区内的唯一标识,用于跟踪消费者的消费位置。 3. **配置说明** Kafka 的配置涉及多个方面,包括Broker配置、...
在本项目"Kafka_study.zip"中,我们探讨了如何使用SpringBoot框架集成Apache Kafka,构建一个简单的生产者-消费者模型。Kafka是一款高吞吐量、分布式的消息中间件,常用于实时数据流处理和大数据分析。以下是关于...
本篇将深入探讨Kafka配置参数,帮助你理解和优化Kafka集群的运行。 1. **broker.id**: 这个参数是每个Kafka broker的唯一标识,它必须在整个集群中是唯一的。值可以是任意整数,通常从0开始。 2. **zookeeper....