`

Kafka 编写自己的producer、partitioner和consumer

 
阅读更多

 

 1. 简单的 Producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

public class MyProducer {
	
	@Test
	public void testProducer(){
		 Properties props = new Properties();
		 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos1:9092");
		 props.put(ProducerConfig.ACKS_CONFIG, "all");
		 props.put(ProducerConfig.RETRIES_CONFIG, 0);
		 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
		 props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
		 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
		 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.kafka.practice.MyPartitioner");

		 Producer<String, String> producer = new KafkaProducer<>(props);
		 for(int i = 0; i < 100; i++)
		     producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), "7777-"+i));
		 producer.close();
	}
}

 

简单的partitioner

package org.kafka.practice;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner{

	@Override
	public void configure(Map<String, ?> configs) {
	}

	@Override
	public int partition(String topic, Object key, byte[] keyBytes,
			Object value, byte[] valueBytes, Cluster cluster) {
		return 1;
	}

	@Override
	public void close() {
	}
}

 

结果:

所发送的消息全部写道编号为1的分区上,查看log文件 /tmp/kafka-logs/mytopic-1/0000000000.log

 

 

2. 实现了callback函数的producer

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;

public class MyProducer {
	
	@Test
	public void testProducer(){
		 Properties props = new Properties();
		 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos1:9092");
		 props.put(ProducerConfig.ACKS_CONFIG, "all");
		 props.put(ProducerConfig.RETRIES_CONFIG, 0);
		 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
		 props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
		 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
		 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.kafka.practice.MyPartitioner");

		 Producer<String, String> producer = new KafkaProducer<>(props);
		 for(int i = 0; i < 5; i++){
			 ProducerRecord<String, String> record = new ProducerRecord<String, String>("mytopic", Integer.toString(i), "222-"+i);
		     producer.send(record, new Callback(){
				@Override
				public void onCompletion(RecordMetadata metadata, Exception exception) {
					System.out.println("received ack!!!");
				}
		     });
		     System.out.println("send message!!!");
		 }
		 producer.close();
	}
}

运行结果: 

send message!!!

send message!!!

send message!!!

send message!!!

send message!!!

17/05/18 15:23:40 INFO producer.KafkaProducer: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

received ack!!!

received ack!!!

received ack!!!

received ack!!!

received ack!!!

 

 

3. 简单的consumer- 自动提交

	@Test
	public void testConsumer() throws Exception{
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos1:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); //自动提交
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("mytopic"));
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records){
				Date now = new Date();
				System.out.printf(now + " offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
				Thread.sleep(3000);
			}
		}
	}

 

 3. 简单的consumer- 手动提交

	public void testConsumer2() {
	     Properties props = new Properties();
	     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
	     props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
	     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
	     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
	     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
	     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
	     consumer.subscribe(Arrays.asList("mytopic"));
	     final int minBatchSize = 200;
	     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
	     while (true) {
	         ConsumerRecords<String, String> records = consumer.poll(100);
	         for (ConsumerRecord<String, String> record : records) {
	             buffer.add(record);
	         }
	         if (buffer.size() >= minBatchSize) {
	             //insertIntoDb(buffer);
	             consumer.commitSync();
	             buffer.clear();
	         }
	     }
	}

 

 

 

参考:

Kafka参数说明: http://www.cnblogs.com/rilley/p/5391268.html

分享到:
评论

相关推荐

    SpringBoot使用Kafka详解含完整代码

    可以通过实现`org.apache.kafka.clients.producer.Partitioner`接口来自定义分区策略,并在配置中指定。 ```java public class CustomPartitioner implements Partitioner { // 实现分区逻辑 } ``` 在配置文件中...

    Kafka+Flume-ng搭建

    - Kafka中的Producer、Consumer与Broker之间均采用TCP连接。 - 通信基于NIO实现,保证了高吞吐量和低延迟。 #### 四、Kafka+Flume-ng搭建步骤 1. **安装与配置依赖**: - 在系统运行环境或Flume-ng的lib目录...

    kafka-2.5.0-src.zip

    5. **core**:核心模块,包含了Kafka的主要功能实现,如Producer、Consumer、Broker、Partitioner等组件的源码,是理解Kafka工作原理的重点。 6. **log4j-appender**:基于Log4j的日志处理器,用于记录Kafka运行时...

    kafka简单指导

    3. **分布式系统架构**:Kafka采用了分布式的架构设计,允许Producer、Broker和Consumer等多个组件分布在不同的机器上,从而提高了系统的扩展性和容错能力。 4. **可扩展性**:Kafka通过ZooKeeper实现了动态的集群...

    Learning Apache Kafka

    当我们转向编写Kafka的生产者(Producer)和消费者(Consumer)时,Kafka的Java API是开发者经常使用的方式。Java生产者API允许开发者发送消息到Kafka集群。简单Java生产者通过定义属性、导入类、构建消息并发送。...

    kafka_2.12-2.1.1.tgz

    4. **消费者(Consumer)**:消费者从 Kafka 集群读取数据,可以订阅一个或多个主题,并通过消费组(Consumer Group)实现负载均衡和容错。 5. **消费组(Consumer Group)**:消费组是一组消费者,每个主题的每个...

    04、Kafka核心源码剖析.zip

    `KafkaConsumer`类是消费者的实现,它提供了订阅主题、拉取消息和提交消费偏移等功能。源码分析会讲解如何使用`poll`方法获取新消息,以及如何实现自动和手动的offset管理。 3. **主题与分区(Topic & Partition)*...

    kafka开发运维实战分享.pptx

    Kafka 的架构主要包括 **生产者(Producer)**、**Broker** 和 **消费者(Consumer)**: - **生产者**:使用用户主线程封装消息到 ProducerRecord,经过序列化后,由 Partitioner 决定目标分区并放入内存缓冲区。...

    第二课+Kafka架构.pdf

    Kafka架构的核心概念包括Topic、Partition、Broker、Producer、Consumer、Partitioner以及生产者与消费者的同步与异步模式。 首先,我们来介绍Topic和Partition的概念。在Kafka中,Topic是指消息的逻辑分类,可以被...

    消息队列kafka源码详细讲解分析

    通过研究`Producer`和`Consumer`的交互、消息的存储和检索、分区策略等关键部分,我们可以优化性能,提高系统的稳定性和可靠性。同时,对Kafka源码的深入理解也是提升大数据处理能力、解决复杂问题的关键步骤。

    Kafka视频教程-从入门到实战轻松学Kafka系统教程(13讲)

    Kafka支持自定义分区策略,通过实现`org.apache.kafka.clients.producer.Partitioner`接口,可以控制消息被分配到哪个分区。 **4.2 事务管理** Kafka支持事务性的消息发送,确保消息的一致性。通过配置`enable....

    Kafka完整教程学习

    生产者(Producer)和消费者(Consumer)只与Leader进行交互,Leader负责将数据同步给Follower。当Leader出现故障时,其中一个Follower会被选举为新的Leader。 - **高并发**:通过分区设计,消费者可以通过Consumer ...

    看完这篇Kafka,你也许就会了Kafka

    - **生产者(Producer)**: 生产者负责将数据发布到Kafka的主题中,可以通过配置决定消息如何被分配到不同的分区。 - **消费者(Consumer)**: 消费者从Kafka的主题中读取数据,可以是单个进程或一组进程形成的消费...

    kafka基础.zip

    - ** broker**:Kafka集群中的服务器节点,负责存储主题的分区数据和处理来自生产者和消费者的请求。 2. **Kafka的工作流程** - 生产者将消息发送到指定主题的分区。 - 消息按顺序写入分区日志,并持久化到磁盘...

    kafka最初版0.10.1源码阅读

    在源码中,我们可以看到这些组件的实现细节,例如`Producer`类如何发送消息,`Consumer`类如何进行数据拉取,以及`Partitioner`类如何决定消息的分区分配。 在分布式特性上,Kafka的0.10.1版本已经具备了高可用性和...

    Kafka高频面试题系列之四(30道).docx

    Kafka 是一个分布式流处理平台,广泛用于大数据实时处理、日志聚合、消息传递等场景。以下是一些关于Kafka的重要知识点,结合提供的面试题...Kafka 的高效性能和灵活性使其成为现代大数据和实时处理系统的首选组件。

    kafka数据可靠性深度解读

    3. **分区策略**:Producer可以根据消息的key和自定义的partitioner策略决定消息应发送到哪个partition,从而实现负载均衡和水平扩展。 4. **副本机制**:Kafka的高可靠性依赖于其partition级别的复制。每个...

    kafka_ex:Elixir的Kafka客户端库

    KafkaEx.producer_start(brokers: [{"localhost", 9092}], client_id: "custom_partitioner", partitioner: MyPartitioner) ``` 此外,`kafka_ex`还支持Kafka的事务特性,可以在多个操作之间提供原子性和一致性。 #...

    Spark:Apache Spark是一个快速的内存数据处理引擎,具有优雅且富有表现力的开发API,可让数据工作者高效执行需要快速迭代访问数据集的流,机器学习或SQL工作负载。该项目将在Scala中提供Spark的示例程序语

    使用Spark-2.1实现自定义UDF,UDAF,Partitioner 使用数据框(ComplexSchema,DropDuplicates,DatasetConversion,GroupingAndAggregation) 使用数据集处理Parquet文件按特定列对数据进行分区并按分区进行存储使用...

Global site tag (gtag.js) - Google Analytics