
Kafka client side: the producer


Kafka producer client

 

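The send method of KafkaProducer:

1. Wait until metadata for the target topic is available, i.e. its partitions are known to be online.

2. Serialize the key and value;
key:org.apache.kafka.common.serialization.IntegerSerializer
value:org.apache.kafka.common.serialization.StringSerializer

3. Compute the partition of the topic that the record will be sent to.
Use the partition set on the record; if it is null, compute one with the partitioner class.
partitioner:org.apache.kafka.clients.producer.internals.DefaultPartitioner

4. Ensure the serialized size of the record does not exceed the thresholds.
thresholds:MAX_REQUEST_SIZE_CONFIG = "max.request.size"
      BUFFER_MEMORY_CONFIG = "buffer.memory"

5. Instantiate the TopicPartition and append the record to that partition's queue in the accumulator, which returns the result object.
The record is wrapped in writable records (with a compressor for compression), which are in turn wrapped in a batch.
compression parameter:COMPRESSION_TYPE_CONFIG = "compression.type";

6. Check whether the batch in result is full or newly created; if so, wake up the sender to send messages.

7. Return the future held by result.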
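The configuration keys named in the steps above can be collected in a plain java.util.Properties object. The following is only a sketch: the broker address and all values are assumptions chosen for illustration, and the class name ProducerConfigSketch is hypothetical. It needs no Kafka dependency to run; in a real application the same Properties would be passed to the KafkaProducer constructor.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Collects the settings mentioned in steps 2, 4 and 5.
    // All values are illustrative assumptions, not recommendations.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        // step 2: serializers for key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // step 4: size thresholds, in bytes
        props.put("max.request.size", "1048576");
        props.put("buffer.memory", "33554432");
        // step 5: compression codec, e.g. none, gzip, snappy
        props.put("compression.type", "gzip");
        return props;
    }

    public static void main(String[] args) {
        // Print every key=value pair so the configuration can be inspected
        producerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```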

 

 

 

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        try {
            // @1 first make sure the metadata for the topic is available
            waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
            byte[] serializedKey;
            try {
                // @2 serialize the key
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                // serialize the value
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            // @3 compute the partition
            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            ensureValidRecordSize(serializedSize); // @4 ensure the record does not exceed the size thresholds
            TopicPartition tp = new TopicPartition(record.topic(), partition);
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // @5 append the serialized record to the accumulator
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
            if (result.batchIsFull || result.newBatchCreated) {//@6
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;//@7
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            throw new InterruptException(e);
        } catch (KafkaException e) {
            this.errors.record();
            throw e;
        }
    }
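Steps 3 and 4 can be modeled without any Kafka classes. The sketch below is a simplified stand-in, not the library's code: PartitionSketch and its methods are hypothetical names, Arrays.hashCode replaces whatever hash the real DefaultPartitioner uses, a null key simply maps to partition 0, and IllegalArgumentException stands in for the client's own exception type.

```java
import java.util.Arrays;

public class PartitionSketch {
    // Step 3: prefer the partition set on the record; otherwise derive one from the key.
    // ASSUMPTION: Arrays.hashCode is a stand-in hash, and null keys map to partition 0.
    static int choosePartition(Integer explicitPartition, byte[] serializedKey, int numPartitions) {
        if (explicitPartition != null) {
            if (explicitPartition < 0 || explicitPartition >= numPartitions)
                throw new IllegalArgumentException("Invalid partition: " + explicitPartition);
            return explicitPartition;
        }
        int hash = serializedKey == null ? 0 : Arrays.hashCode(serializedKey);
        // Mask the sign bit so the modulo result is never negative
        return (hash & 0x7fffffff) % numPartitions;
    }

    // Step 4: reject a record larger than either threshold
    // (max.request.size and buffer.memory in the text above).
    static void ensureValidRecordSize(int size, int maxRequestSize, long totalMemorySize) {
        if (size > maxRequestSize)
            throw new IllegalArgumentException("Record of " + size + " bytes exceeds max.request.size");
        if (size > totalMemorySize)
            throw new IllegalArgumentException("Record of " + size + " bytes exceeds buffer.memory");
    }

    public static void main(String[] args) {
        byte[] key = {1, 2, 3};
        System.out.println("explicit: " + choosePartition(2, key, 4));
        System.out.println("from key: " + choosePartition(null, key, 4));
        ensureValidRecordSize(100, 1048576, 33554432L); // passes silently
    }
}
```

The same key always yields the same partition for a fixed partition count, which is the property that keeps records with one key in order.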

 

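The code for step 5, appending to the accumulator, is shown below.

The record is appended to the queue of its topic partition: if the queue already holds an in-progress batch with room, the record is appended to that batch.

Otherwise a new buffer is allocated and the queue is checked a second time under the lock:
if a usable batch is there by now, the record is appended to it;
if not, writable records (MemoryRecords, which include the compressor) and a RecordBatch are created to hold it.

Compression is performed by the compressor when records.append is called.

Either way the record ends up in the queue, compressed if compression is configured.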

 

   public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        try {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // check if we have an in-progress batch
            Deque<RecordBatch> dq = dequeFor(tp);
            synchronized (dq) { // try to append to the existing queue for this topic partition
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
                    if (future != null)
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }

            // we don't have an in-progress record batch try to allocate a new batch
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // re-check whether another thread has added a batch to dq in the meantime
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
                    if (future != null) {
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                        free.deallocate(buffer);
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                    }
                }
                // no usable batch for this topic partition:
                // create writable records, including the compressor
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); // wrap the records in a RecordBatch
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

   

   RecordBatch.tryAppend, which writes into its MemoryRecords:

    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
        if (!this.records.hasRoomFor(key, value)) { // not enough room left in this batch's buffer
            return null;
        } else {
            this.records.append(0L, key, value); // append to the records; compression happens here
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }
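The check, allocate, re-check pattern in append can be reduced to a small stand-alone model. This is a sketch under stated assumptions, not the real RecordAccumulator: AccumulatorSketch, Batch and AppendResult are hypothetical classes, a partition is identified by a plain String, a batch only counts bytes, and there is no buffer pool, compression or callback handling.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AccumulatorSketch {
    // Stand-in for RecordBatch plus MemoryRecords: tracks only how many bytes are used.
    static final class Batch {
        final int capacity;
        int used = 0;
        Batch(int capacity) { this.capacity = capacity; }
        boolean tryAppend(byte[] value) {
            if (used + value.length > capacity) return false; // no room left
            used += value.length;
            return true;
        }
        boolean isFull() { return used >= capacity; }
    }

    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;
        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchSize;

    AccumulatorSketch(int batchSize) { this.batchSize = batchSize; }

    AppendResult append(String topicPartition, byte[] value) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        synchronized (dq) { // first check: is there an in-progress batch with room?
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(value))
                return new AppendResult(dq.size() > 1 || last.isFull(), false);
        }
        // Allocate outside the lock so other appending threads are not held up
        Batch batch = new Batch(Math.max(batchSize, value.length));
        synchronized (dq) { // second check: another thread may have added a batch meanwhile
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(value))
                return new AppendResult(dq.size() > 1 || last.isFull(), false);
            batch.tryAppend(value); // always fits: capacity >= value.length
            dq.addLast(batch);
            return new AppendResult(dq.size() > 1 || batch.isFull(), true);
        }
    }

    public static void main(String[] args) {
        AccumulatorSketch acc = new AccumulatorSketch(10);
        for (int i = 0; i < 3; i++) {
            AppendResult r = acc.append("test-0", new byte[4]);
            System.out.println("full=" + r.batchIsFull + " new=" + r.newBatchCreated);
        }
    }
}
```

The two flags on the result are what step 6 of send inspects to decide whether to wake the sender.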

 

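In step 6, the run method of the Sender class sends the messages:

1. Check which records are ready to send. A partition is ready if any of the following holds:

   

The record set is full 
The record set has sat in the accumulator for at least lingerMs milliseconds 
The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are immediately considered ready). 
The accumulator has been closed 

 2. Drain the batches for the ready nodes from the accumulator

 3. Build the produce requests

 4. Hand the requests to the client, which queues them on its selector

 5. The client's selector sends the data over NIO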

 

public void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send, i.e. the leader nodes for that data
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        //only for debug test
//        if(batches.size()>=1){
//        	System.out.println(batches.size());
//        }
        
        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now); // build the requests
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now); // send the data over NIO
    }
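The four readiness conditions from step 1 of the sender, and the delay used when nothing is ready yet, can be written as two small functions. This is a minimal sketch with hypothetical names (ReadySketch, isReady, nextReadyCheckDelayMs); it deliberately ignores retry backoff and leader lookup, which the real accumulator also takes into account.

```java
public class ReadySketch {
    // A batch is sendable if ANY of the four conditions listed for step 1 holds.
    static boolean isReady(boolean full, long waitedMs, long lingerMs,
                           boolean exhausted, boolean closed) {
        boolean expired = waitedMs >= lingerMs; // sat in the accumulator for at least lingerMs
        return full || expired || exhausted || closed;
    }

    // For a batch that is not ready yet: how long until its linger time runs out.
    // The sender can use the smallest such delay as its poll timeout.
    static long nextReadyCheckDelayMs(long waitedMs, long lingerMs) {
        return Math.max(lingerMs - waitedMs, 0);
    }

    public static void main(String[] args) {
        long lingerMs = 5;
        System.out.println(isReady(false, 3, lingerMs, false, false)); // still lingering
        System.out.println(isReady(false, 5, lingerMs, false, false)); // linger time reached
        System.out.println(nextReadyCheckDelayMs(3, lingerMs));        // remaining wait in ms
    }
}
```

With a linger time of 0, every batch that holds data is ready at once, so a larger linger time trades latency for fuller batches.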

 

 

 
