producer 的工作逻辑:
启动 producer 的线程将待发送消息封装成 ProducerRecord. 然后将其序列化发送给 partitioner, 再由后者确定了目标分区后一同发送给位于 producer 程序中的一块内存缓冲区. 而 producer 的另一个线程负责实时从缓冲区中提取出来准备就绪的消息封装成一个批次,发送给对应的 broker.
public class SimpleKafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
//broker地址
props.put("bootstrap.servers", "localhost:9092");
//请求时候需要验证
props.put("acks", "all");
//请求失败时候需要重试
props.put("retries", 0);
//内存缓存区大小
props.put("buffer.memory", 33554432);
//指定消息key序列化方式
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//指定消息本身的序列化方式
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
System.out.println("Message sent successfully");
producer.close();
}
}
至少要指定 bootstrap.servers、key.serializer 及 value.serializer 这三个值.
bootstrap.servers
用于创建向 Kafka broker 服务器的链接,比如 k1:9092,k2:9092,k3:9092. 该参数指定多台机器只是为了故障转移使用. 这样即使某一台 broker 挂掉了,producer 重启后依然能通过该参数指定的其他 broker 连入 Kafka 集群(如果说我们这里只指定一台机器,那么如果这台 broker 挂了,你就算重启producer 也连不到kafka集群).
key.serializer
被发送到 broker 端的任何消息格式都必须是字节数组,因此消息的各个组件必须首先序列化.
value.serializer
对消息体进行序列化
2.构造 KafkaProducer 对象
Producer<String, String> producer = new KafkaProducer<>(props);
3.构造 ProducerRecord 对象.
new ProducerRecord<>(“my-topic”, Integer.valueOf(1));
发送消息
异步发送是通过 Callback(){} 回调来实现的.
同步发送是通过 Future.get() 一直等待结果来实现的.
异常分类
1.可重试异常:LeaderNotAvailableException、NotControllerException、NetworkException
2.不可重试异常:所有非继承 RetriableException 类的其他异常都属于不可重试异常.
RecordTooLargeException、SerializationException、KafkaException
关闭 producer
producer 程序结束时一定要关闭 producer!如果调用的是普通的无参 close 方法,producer 会被允许先处理完之前的发送请求后再关闭,即所谓的“优雅”退出。还有一种是待超时的关闭,时间一到,producer 会被强制结束,并立即丢弃所有未发送以及未应答的发送请求
producer 主要参数
acks 用于控制 producer 生产消息的持久性.
当 producer 发送一条消息给 Kafka 集群时,这条消息会被发送到指定 topic 分区 leader 所在的 broker 上,producer 等待从该 leader broker 返回写入的结果以确定消息是否被成功写入. 这一切完成后 producer 可以继续发送新的消息.
显然 leader 何时发送写入结果返给 producer 直接会影响消息的持久性甚至是 producer 端的吞吐量.acks 指定了再给 producer 发送响应前,leader broker 必须要确保已成功写入该消息的副本数. 当前 acks 有三个值:0、1、all.
acks=0——producer 不理睬 leader broker 端的处理结果,
acks=all或者-1,会等待 ISR 中所有副本都成功写入他们各自的本地日志后,才发送响应结果给 producer.
acks=1 leader broker 仅将该消写入本地日志后便发送响应结果 producer,而不需等待 ISR 中其他副本写入该消息.
buffer.memory 指定 producer 端用于缓存消息的缓冲区大小,单位是字节. 默认值是 32M.
compression.type 设置 producer 端是否压缩消息. 默认值是 none. 目前支持 GZIP、Snappy 和 LZ4.
retries 在发送失败时重试的次数,默认值是 0.
batch.size 默认值是 16KB 批次的大小.
linger.ms 控制消息发送时延行为的,默认值为 0. 表示该消息立即被发送,无须关心 batch 是否被填满.
max.request.size 用于控制 producer 发送请求的大小(控制 producer 端能发送的最大消息的大小).
request.timeout.ms 当 producer 发送请求给 broker 后,broker 需要在规定的时间范围内将处理结果返还给 producer.
消息分区机制
Java 版的 producer 会根据 murmur2 算法计算消息 key 的哈希值,然后对总分区数求模得到消息要发送到的目标分区号.
问题:Kafka 允许用户动态扩展分区不?
对于这个问题,首先需要明确一点的是 kafka 支持扩展分区,但是不支持减少分区. 新增分区后,各个分区上的消息不会重新分配. 比如说以前分区A上有30条消息,分区比上有30条消息,现在新增分区C,不会说消息均衡下,ABC三个分区各20条数据,而是说在后面的时候,消息通过模运算,可能落到A上,可能落到B上,也可能落到C上. 新增分区后,会多消息的顺序造成影响,也会影响消息落到那个分区. 现在说下为啥不能减少分区了,第一个问题是,减少分区上的消息怎么办了?如果保留的话直接插入到某个分区的尾部?还是说消息重平衡?这些都是需要考虑的问题.
参考:https://blog.csdn.net/u013256816/article/details/82804564
自定义分区:
1.创建一个类继承 Partitioner 接口,主要分区逻辑在 Partitioner.partition 中实现.
2.在用于构造 KafkaProducer 的 Properties 对象中设置 partitioner.class 参数.
消息序列化
目前内置的序列化类有:StringSerializer、DoubleSerializer 等
自定义序列化
1.定义数据对象格式
2.创建自定义序列化类,实现 Serializer 接口
3.在用于构造 KafkaProducer 的 Properties 对象中设置 key.serializer 或 value.serializer.
producer 拦截器
interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等. producer 允许用户指定多个 producer 按序作用于同一条消息从而形成拦截链. interceptor 的实现接口:ProducerInterceptor.
onSend:该方法封装进 KafkaProducer.send 方法中,即它允许在用户主线程中. producer 确保消息被序列化以计算分区前调用该方法. 用户可以在该方法中对消息做任何操作,但最好不要修改消息所属的 topic 和分区,否则会影响目标分区的计算.
onAcknowledgement:该方法会在消息被应答之前或消息发送失败时调用,并且通常是在 producer 回调逻辑触发之前. onAcknowledgement 运行在 producer 的 I/O 线程中,因此不要在该方法中放入很“重”的逻辑,否则会拖慢 producer 的消息发送效率.
close:关闭 interceptor,主要用于执行一些资源清理工作.
无消息丢失配置
block.on.buffer.full=true
acks=all
retries=Integer.MAX_VALUE
max.in.flight.requests.per.connection=1
使用 KafkaProducer.send(record,callback)
unclean.leader.election.enable=false
replication.factory=3
min.insync.replicas=2
replication.factory>min.insync.replicas
enable.auto.commit=false
producer 端配置
block.on.buffer.full=true 内存缓冲区填满时 producer 处于阻塞状态并停止接收新的消息而不是抛出异常.
acks=all 必须等到所有 follower 都响应了发送消息才能认为提交成功.
retries=Integer.MAX_VALUE producer 不会重试那些肯定无法恢复的错误.
max.in.flight.requests.per.connection=1 防止 topic 同分区下的消息乱序问题.
broker 端配置
unclean.leader.election.enable=false 不允许非 ISR 中的副本被选举为 leader,从而避免 broker 端因为日志水位被截断导致消息丢失
replication.factory>=3 强调使用多分区保存消息.
min.insync.replicas > 1 用于控制某条消息被写入到 ISR 中的多少个副本才算成功,设置成大于 1 是为了提升 producer 端发送语义的持久性. 只有在 producer 端 acks 被设置成 all 时,这个参数才有意义.
确保 replication.factory>min.insync.replicas. 推荐配置成 replication.factory = min.insync.replicas + 1
消息压缩
producer 端压缩——broker 端保持——consumer 端解压缩.
压缩算法
GZIP LZ4 SNAPPY
对于 Kafka 而言 LZ4 > Snappy > GZip
多线程处理
多线程单 KafkaProducer 实例
多线程多 KafkaProducer 实例
如果是分区数不是很多的 Kafka 集群而言,第一种比较合适,若是有超多分区的集群,第二种方式更合适.
分享到:
相关推荐
Kafka客户端producer/consumer样例
#### 一、Kafka Producer机制及问题背景 在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并确保消息可以被消费者消费。而Topic则是消息的分类...
5. **使用Kafka Producer插件**:在Kettle的工作流或转换中,你可以在“步骤”库中找到新的Kafka Producer步骤。添加并配置它,包括设置Kafka服务器地址(bootstrap servers)、生产的数据主题(topic)、以及要发送...
Kafka Producer拦截器是Kafka 0.10版本引入的主要用于实现clients端的定制化控制逻辑。 Producer拦截器使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,如修改消息等。同时,producer允许...
1. **源代码**:可能包含Java或Groovy代码,实现了Kettle插件的逻辑,包括连接Kafka集群、构建及发送消息等功能。 2. **配置文件**:如`kafka.properties`,可能包含Kafka服务器的地址、端口、认证信息等配置,以便...
实现双方消息通讯
《JMeter与Kafka连接器:构建高并发数据流测试》 在现代大数据处理系统中,Apache Kafka作为一款分布式消息中间件,广泛应用于实时数据流处理。为了验证和优化Kafka系统的性能,开发者通常需要进行大规模并发数据...
在Kafka中,Producer(生产者)负责将数据流发送到Kafka集群中。生产者的配置参数非常关键,因为它们会直接影响到生产者的行为和性能。以下是根据提供的文件内容整理的Kafka生产者配置参数的知识点: 1. bootstrap....
在这个项目中,`kafka_producer.zip`包含了实现这一功能的所有必要文件:一个配置文件`config.conf`和一个Python脚本`producer.py`。 首先,让我们了解`config.conf`文件的作用。这个配置文件通常包含Kafka生产者...
3. **Kafka主题与分区**:主题是逻辑上的分类或通道,而分区是物理上的存储单位。分区允许多个消费者并行消费,从而实现更高的吞吐量。每个分区都有一个主副本和多个备份副本,用于提供容错性。 4. **Kafka的高可用...
在这个"Kafka监听样例.rar"中,我们将探讨如何配置Kafka的生产者和消费者,以及如何在SpringBoot应用中有效地使用它们。 首先,我们需要在SpringBoot项目中添加Kafka的依赖。这通常通过在`pom.xml`或`build.gradle`...
在本教程中,我们将探讨如何搭建一个支持SASL(Simple Authentication and Security Layer)认证的Kafka集群,并使用Python API来创建Producer和Consumer。 ### 1. 安装Kafka 首先,我们需要在服务器上安装...
标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...
在项目“kafka_producer.sln”中,我们可以看到一个Visual Studio解决方案,包含了Kafka生产者的实现。通常,这个解决方案会包含生产者类的定义,以及与protobuf和avro相关的代码。例如,生产者类可能包含一个方法,...
在分布式消息系统中,Kafka是一个广泛使用的高吞吐量、低延迟的开源消息队列。为了优化性能和提高效率,开发人员常常会利用连接池技术来管理Kafka生产者的连接。本文将深入探讨"Kafka生产者连接池"的概念、实现原理...
通过这个样例,我们可以理解Kafka的基本工作流程。 1. **生产者(Producer)**:生产者是将数据发布到Kafka主题的应用。在KafkaDemo中,生产者会创建一个消息并将其发送到指定的主题。生产者通常需要配置一些参数,...
Kafka的消息模型是基于发布/订阅模式,Producer将消息发布到特定的Topic,Consumer订阅这些Topic并消费消息。由于Consumer Group的存在,Kafka实现了类似队列的消费行为,但同时允许多消费者并行消费,提高了处理...
Apache Kafka 是一个分布式流处理平台,常用于构建实时的数据管道和应用。Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效...
通过`kafka-console-consumer.sh`和`kafka-console-producer.sh`命令,可以测试消息的生产和消费。 此外,使用JMX工具(如JConsole或VisualVM)以及Kafka自带的`kafka-server-metrics.jmx`模板,可以监控Kafka集群...