最近项目遇到一个特殊场景,需要kafka传递100万条数据过去,1个G左右,由于其他环节限制,不能进行拆包。
一开始生产者一直报网络问题,经过需要修改如下参数,为了探寻之前说的不能超过1G的说法,把所有参数上限都设置成了接近2G
config/server.properties
socket.request.max.bytes=2048576000
log.segment.bytes=2073741824
message.max.bytes=2048576000
replica.fetch.max.bytes=2048576000
fetch.message.max.bytes=2048576000 ---这个好像不应该设置在server.propeties里面,这个是consumer的参数,后续验证吧。
replica.socket.timeout.ms=300000 --这个参数好像也不需要设置,但是需要在生产者里设置request.timeout.ms。否则,发送时间过长导致发送失败。
参数说明:
socket.request.max.bytes =100*1024*1024 |
socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖 |
log.segment.bytes =1024*1024*1024 |
topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖 |
message.max.bytes =6525000 |
表示消息体的最大大小,单位是字节 |
replica.fetch.max.bytes =1024*1024 |
replicas每次获取数据的最大大小 |
fetch.message.max.bytes=1024*1024 |
每个拉取请求的每个topic分区尝试获取的消息的字节大小。这些字节将被读入每个分区的内存,因此这有助于控制消费者使用的内存。 拉取请求的大小至少与服务器允许的最大消息的大小一样大,否则生产者可能发送大于消费者可以拉取的消息。 |
replica.socket.timeout.ms |
网络请求的socket超时,该值最少是replica.fetch.wait.max.ms |
生产者设定
props.put("max.request.size", 2073741824);
props.put("buffer.memory", 2073741824);
props.put("timeout.ms", 30000000);
props.put("request.timeout.ms", 30000000);
相关推荐
本文将深入探讨“Kafka 读取写入数据”的核心知识点,包括 Kafka 的基本架构、数据模型、生产者与消费者原理以及最佳实践。 ### 1. Kafka 基本架构 Kafka 是一个高吞吐量、分布式的发布订阅消息系统。它由以下几个...
3. **Kafka生产者**:将Netty接收到的数据写入Kafka,我们需要创建一个Kafka生产者。使用SpringBoot整合Kafka,可以利用`@KafkaProducer`注解配置生产者属性,如topic、key序列化器和value序列化器。生产者API允许...
3. **acks和linger.ms**:生产者配置项,acks 控制确认消息已写入的级别,linger.ms 设置批量发送的等待时间,平衡延迟和吞吐量。 4. **offset管理**:消费者负责维护自己的消费位置,Kafka 提供自动提交和手动提交...
【标签】:kafka 消费者插件与Pentaho的结合使用,使得企业能够更加灵活地管理和利用实时数据流,适用于大数据分析、实时报告和业务智能应用场景。Pentaho-kafka-consumer 文件可能包含了该插件的安装包、配置示例、...
Kafka的生产者是负责将消息写入Kafka集群的组件,而这款插件则使得Kettle可以扮演生产者的角色。 在文件“pentaho-kafka-producer”中,我们可以预期找到与这个插件相关的源代码、配置文件、文档或其他支持资源。...
**主题(Topic)**是Kafka中的逻辑存储分区,它可以被多个生产者写入,被多个消费者读取。每个主题可以被分成多个分区,分区内的消息按照顺序存储,而不同分区的消息可以并行消费,提高了处理效率。 在Java实现中,...
- **Producer示例**:展示如何创建一个Kafka生产者,设置必要的配置(如服务器地址、主题名等),以及如何发布消息到指定主题。 - **Consumer示例**:演示如何创建一个Kafka消费者,订阅特定主题,接收并处理来自...
Kafka生产者发送消息的过程主要包括以下几个步骤: 1. **消息封装**:首先,生产者将待发送的消息封装成`ProducerRecord`对象,该对象中包含了目标主题(topic)、消息的具体内容等关键信息。此外,用户还可以指定键...
Kafka生产者的代码实现一般包括创建Kafka生产者实例、构造消息并发送到指定主题等步骤。根据给定的代码片段,以下为生产者的关键知识点: 1. 创建KafkaProducer实例前需要配置生产者属性,包括Kafka集群的地址...
生产者负责将数据写入Kafka的主题(Topic),而消费者则从主题中读取并处理这些数据。对于处理大文件,我们需要关注的配置主要集中在以下几个方面: 1. **生产者配置**: - `batch.size`:此参数定义了生产者批量...
03-Kafka生产者--向Kafka写入数据(Java)-附件资源
1. **Kafka生产者**:生产者是向Kafka主题发布消息的应用程序。在"Kafka消息队列样例"中,生产者负责生成数据并将其发送到指定的主题。生产者可以决定如何分配消息到不同的分区,这可以通过自定义分区器实现。此外,...
Kafka作为数据生产者和消费者之间的桥梁,可以存储和转发大量的实时数据流。在本案例中,它负责收集和分发IP地址相关的数据。Flink则作为一个流处理引擎,可以从Kafka的 topic 中消费这些数据,对其进行实时分析,...
而Kafka是一款强大的分布式消息中间件,它允许在生产者、消费者和存储系统之间高效地传输大量数据。 首先,我们要理解Netty4如何实现推送。在Netty中,我们通常使用ChannelHandlerContext对象来处理网络事件,比如...
它充当消息中间件,允许生产者发布消息到主题,而消费者则可以从这些主题订阅并消费消息。在这个案例中,Kafka作为数据源,不断提供实时数据供Flink处理。 **实时读取与批量聚合** Flink可以通过`...
生产者demo 向test_lyl2主题中循环写入10条json数据 注意事项:要写入json数据需加上value_serializer参数,如下代码 ''' producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8')...
Kafka 的生产者可以选择自己喜欢的序列化方法对消息内容编码。为了提高效率,生产者可以在一个发布请求中发送一组消息。消费者可以订阅话题,并从 Broker 拉数据,消费这些已发布的消息。 Kafka 同时支持点到点分发...
Kafka生产者是负责将数据发布到Kafka主题的组件。生产者可以是任何产生数据的应用程序,如日志记录系统、传感器数据收集器或用户行为追踪系统。生产者API允许开发者将数据以消息的形式发送到Kafka集群,这些消息随后...
Kafka 的生产者是负责将数据(消息)写入 Kafka 主题的组件。在高并发环境下,单独的生产者可能无法充分利用网络资源,而生产者池则可以通过负载均衡策略,将任务分散到多个生产者上,从而提高整体吞吐量和系统响应...