`

kafka 获取metadata

阅读更多

问题:

<Failed to update metadata after 3000 ms.>

 

sender类的发送数据时候,会

List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);//nio 发送数据

 

NetworkClient类,方法poll,检查metadata是否需要更新

方法:

    /**
     * Add a metadata request to the list of sends if we can make one
     */
    private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        // 最新的可用node
        Node node = this.leastLoadedNode(now);
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
            // mark the timestamp for no node available to connect
            this.lastNoNodeAvailableMs = now;
            return;
        }

        log.debug("Trying to send metadata request to node {}", node.id());
        if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
            Set<String> topics = metadata.topics();
            this.metadataFetchInProgress = true;
            //生成metadata请求,加入到sends队列中
            ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);
            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
            sends.add(metadataRequest.request());
            this.inFlightRequests.add(metadataRequest);
        } else if (connectionStates.canConnect(node.id(), now)) {
            // we don't have a connection to this node right now, make one
            log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
            initiateConnect(node, now);
            // If initiateConnect failed immediately, this node will be put into blackout and we
            // should allow immediately retrying in case there is another candidate node. If it
            // is still connecting, the worst case is that we end up setting a longer timeout
            // on the next round and then wait for the response.
        } else { // connected, but can't send more OR connecting
            // In either case, we just need to wait for a network tevent to let us know the seleced
            // connection might be usable again.
            this.lastNoNodeAvailableMs = now;
        }
    }

 

    

选择一个请求最少,并且链接状态可用的host,作为获取metadata的host

这里

 

/**
     * Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
     * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
     * connection if all existing connections are in use. This method will never choose a node for which there is no
     * existing connection and from which we have disconnected within the reconnect backoff period.
     * @return The node with the fewest in-flight requests.
     */
    public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadata.fetch().nodes();
        int inflight = Integer.MAX_VALUE;
        Node found = null;
        for (int i = 0; i < nodes.size(); i++) {
            int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
            Node node = nodes.get(idx);
            int currInflight = this.inFlightRequests.inFlightRequestCount(node.id());
            if (currInflight == 0 && this.connectionStates.isConnected(node.id())) {
                // if we find an established connection with no in-flight requests we can stop right away
                return node;
            } else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) {
                // otherwise if this is the best we have found so far, record that
                inflight = currInflight;
                found = node;
            }
        }

        return found;
    }

 

 

 

 

分享到:
评论

相关推荐

    Flink无法获取Kafka Topic Metadata异常及解决.docx

    Flink 无法获取 Kafka Topic Metadata 异常及解决 一、问题现象 在使用 Kafka 0.11.0.1 和 Flink 1.4 进行实时计算时,Flink 无法获取 Kafka Topic Metadata,报以下异常:org.apache.kafka.common.errors....

    RdKafka::KafkaConsumer使用实例

    为了获取Kafka服务器的状态信息,我们可以使用`rd_kafka_metadata()`函数。这将返回一个元数据对象,其中包含了主题、分区和 brokers 的信息: ```cpp rd_kafka_metadata_t *metadata; rd_kafka.metadata(consumer,...

    kafka-clients源码.zip

    1. **元数据获取**:`Metadata`类维护了Kafka集群的元数据信息,包括 broker、topic、partition等。`MetadataRequest`用于向broker请求元数据更新。 2. **自动重试机制**:当元数据过期或失效时,Kafka-clients会...

    kafka 知识要点,基于0.9、 0.10版本,很全面

    ### Kafka核心概念与架构 #### 1. Kafka 架构概览 Kafka 是一款分布式流处理平台,广泛应用于实时数据处理场景。其核心组件包括Broker、Topic、Partition、Segment和Controller。 - **Broker**: 消息中间件处理...

    kafka大文件的代码

    在大数据处理领域,Apache Kafka是一种广泛使用的分布式流处理平台,尤其在实时数据传输和消息队列方面表现出色。本文将详细解析如何在处理大文件时,优化Kafka的...请参考Kafka官方文档和社区实践,获取更多深入知识。

    spark与kafka集成

    获取到DStream后,就可以利用Spark Streaming的各种操作,如`map`、`filter`、`reduceByKey`等,对数据进行处理和转换。处理后的结果可以写回到Kafka,或者保存到其他持久化存储中。 总结来说,Spark 1.3引入的...

    Kafka+Flume-ng搭建

    producer.sinks.k.metadata.broker.list=127.0.0.1:9092 producer.sinks.k.partition.key=0 producer.sinks.k.partitioner.class=org.apache.flume.plugins.SinglePartition ``` 3. **启动与验证**: - 启动...

    SparkStreaming和kafka的整合.pdf

    val zkTopicPath = s"${topicDirs.consumerOffsetDir}" // 获取ZooKeeper中的路径 println("zkTopicPath: " + zkTopicPath) val kafkaParams = Map( // Kafka参数配置 "metadata.broker.list" -&gt; brokerList, ...

    04、Kafka核心源码剖析.zip

    源码分析会讲解如何使用`poll`方法获取新消息,以及如何实现自动和手动的offset管理。 3. **主题与分区(Topic & Partition)** 主题是Kafka中的逻辑日志,而分区是这些日志的物理划分。每个分区都有一个唯一的ID...

    Kafka监控工具KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar

    在运行过程中,KafkaOffsetMonitor会定期轮询Kafka的Metadata API,获取消费者组的信息,并将其展示在Web界面中,供管理员查看和分析。 总的来说,KafkaOffsetMonitor是Kafka生态系统中不可或缺的监控工具,它提供...

    A Guide To The Kafka Protocol

    1. Metadata:描述当前可用的代理(Broker),包括它们的主机和端口信息,并提供哪些代理托管哪些分区的信息。 2. Send:向代理发送消息。 3. Fetch:从代理获取消息,包括获取数据、获取集群元数据和获取主题偏移量...

    librdkafka-1.9.0.zip kafka C++

    - **Metadata**:负责获取和处理Kafka集群的元数据信息。 - **Error handling**:错误处理机制,包括错误码、异常和回调。 - **Configurations**:配置选项,允许用户自定义客户端行为。 4. **关键特性** - **...

    kafka源码理解

    它通过定期与已知的 `metadata.broker.list` 中的 Broker 通信,获取集群的最新元数据,包括新添加的 Broker 信息。一旦发现新的 Broker,Producer 会创建新的 `SyncProducer` 实例,添加到 `ProducerPool` 中,以...

    asterix:用 Elixir 编写的正在进行中的 Kafka 客户端库

    阿斯特里克斯 用 Elixir 编写的正在进行中的 Kafka 客户端库。... 目前只显示来自 Kafka 的 MetadataResponse 的结构。 测试 mix test 执照 Asterix 源代码是在 MIT 许可证下发布的,有关更多信息,请参阅。

    Kafka 相关配置参数

    11. **metadata.fetch.timeout.ms**: 获取元数据(如分区 Leader 信息)的超时时间。 12. **max.block.ms**: 生产者在 `send()` 或 `partitionsFor()` 方法中允许阻塞的最大时间。 13. **max.request.size**: 允许...

    mykafka_C++

    1. **初始化配置**:配置生产者参数,如acks(确认策略)、metadata.broker.list(Kafka服务器列表)等。 2. **创建生产者实例**:基于配置创建生产者实例。 3. **发布消息**:调用produce方法将消息发送到指定主题...

    kafka_client:java kafka-client原始码精读,元分析,加阅读注释

    1. `Metadata`: 存储Broker元数据和主题分区信息,通过`Metadata.fetch()`获取最新的元数据。 2. `PartitionInfo`: 描述了主题的分区信息,包括分区ID、首领Broker和副本列表。 五、网络通信 1. `Selector`: 负责...

    kafka 生产者

    12. **request.timeout.ms** 和 **metadata.fetch.timeout.ms**:控制元数据请求的超时时间。 13. **max.block.ms**:在调用 send() 或获取元数据时,生产者阻塞的最长时间。 14. **max.request.size**:限制生产者...

    KafkaInterceptorDemo.zip

    - `onAcknowledgement(RecordMetadata metadata, Exception exception)`: 当消息发送成功或失败时调用,可以用来记录消息确认状态。 - `close()`: 在生产者关闭时调用,用于清理资源。 2. **ConsumerInterceptor*...

Global site tag (gtag.js) - Google Analytics