`
fish_no7
  • 浏览: 28024 次
文章分类
社区版块
存档分类
最新评论

kafka-producer端-系统设计关注点的源码探究

阅读更多

    目前我对kafka producer的关注点大概有这三个:1.producer如何去支持分区(分布式);2. 如何保证消息的可靠发送(ACK);3. 如何保证可用性(异步之后,如何对内存进行管理(缓存消息在内存中的存储结构),以及OOM后发送线程的状态,以及IO线程的实现)。所以今天着重这三个点去了解源码的实现,其余的点如producer端对有序的保证,协议层,网络层的实现等后期有时间再写。

 

基本概念
先从一张图上看下什么是分区


 
分区是什么,结构?
每一个分区都是一组有顺序的,不可改变的,但是能被持续添加的消息序列。分区上的每条消息都有一个唯一的序列ID(offset)来唯一的标识在这个partition中定位这条消息。

为什么要有分区?
1. 分布式,允许存储超过单台更多的数据量
2. 并行处理,加快发送和消费的速度

注意:分区是broker去负责实现的概念,但是本身并不支持对提交过的消息做分区。也就是说broker支持分区的功能,但是怎么保证消息去哪个分区broker是不关心的。这就需要producer端在消息发送的时候需要指定发送到哪个分区。

Producer端
1. Kafka producer如何(分区)

/**
     * Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
     * See {@link #send(ProducerRecord, Callback)} for details.
     */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
…
            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
…
}
private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
if (partition != null) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
            int lastPartition = partitions.size() - 1;
            // they have given us a partition, use it
            if (partition < 0 || partition > lastPartition) {
                throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
            }
            return partition;
        }
        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
            cluster);
    }

 
每次在dosend的时候都去为当前的日志选择partition,如果在发送的时候已指定好partition,并且partition在Custer中存储的partition列表中,则以指定的partition为主。否则,则使用自定的partition class的分区函数进行分区。
2. 每条日志在内存中存储的大小

int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);

/**
* A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
* for the in-memory representation.
*/
public interface Records extends Iterable<LogEntry> {

    int SIZE_LENGTH = 4;
    int OFFSET_LENGTH = 8;
    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;

    /**
     * The size of these records in bytes
     */
    int sizeInBytes();

}

 
每条日志都是4个byte的size和8个byte的offset,然后再加上具体日志的内容
3. Kafka producer端的内存队列
producer端产生的日志都会存储到一个叫做RecordAccumulator的存储器中,其中   
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
可以看到数据结构是一个线程安全的map,其中key为分区信息,其中
TopicPartition的结构如下:

public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
}

 
可以看到,RecordAccumulator为每一个topic的的每一个分区都维护了一个双端队列。
看下消息是如何存放到队列中的:

         

   // check if we have an in-progress batch
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return appendResult;
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        

 
获取队列,加锁,然后进行tryAppend,再看tryAppend的代码:

/**
     * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
     * resources (like compression streams buffers).
     */
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
        RecordBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
            if (future == null)
                last.records.close();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
        }
        return null;
}

 
正常情况下我们直接把新的日志直接放到队列中退出就好了,但是此处producer并不是直接这么简单的处理的,我们可以继续往下分析看下,producer在此处的设计巧妙之处。
先从队列取队尾的一个元素,数据结构为RecordBatch,看下构造方法

public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
        this.createdMs = now;
        this.lastAttemptMs = now;
        this.records = records;
        this.topicPartition = tp;
        this.produceFuture = new ProduceRequestResult();
        this.thunks = new ArrayList<Thunk>();
        this.lastAppendTime = createdMs;
        this.retry = false;
    }

 
其中records就是对日志消息的封装,也就是说deque中并不是直接存放的消息,而是对消息的封装,进入到下一行,如果队尾不为空,则FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
貌似是直接塞到队列peekLast获取到的这个对象中的,而不是新产生一个RecordBatch,看方法tryAppend,

/**
     * Append the record to the current record set and return the relative offset within that record set
     * 
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
}

 
看方法说明,直接把新的消息追加到当前的这个消息中,果不其然。等会再看看这样设计的出发点,这样设计最起码可以减少对象的产生,但更重要的原因目测应该是和网络IO的考量有关。继续往下看
this.records.hasRoomFor(key, value)
判断当前的memoryRecords是否有足够的空间,如果没有则直接返回空;
如果有,则调用memoryRecords的append方法,
public long append(long offset, long timestamp, byte[] key, byte[] value) {
        if (!writable)
            throw new IllegalStateException("Memory records is not writable");

        int size = Record.recordSize(key, value);
        compressor.putLong(offset);
        compressor.putInt(size);
        long crc = compressor.putRecord(timestamp, key, value);
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
        return crc;
    }
Append方法中主要调用的是compressor的api来写入数据,根据命名来看应该是叫做压缩器,以前听说kafka的消息占用的字节数小,和数据压缩有很大的关系,现在我们就进这个compressor中看下如何实现的,先看构造函数

public Compressor(ByteBuffer buffer, CompressionType type) {
        this.type = type;
        this.initPos = buffer.position();

        this.numRecords = 0;
        this.writtenUncompressed = 0;
        this.compressionRate = 1;
        this.maxTimestamp = Record.NO_TIMESTAMP;

        if (type != CompressionType.NONE) {
            // for compressed records, leave space for the header and the shallow message metadata
            // and move the starting position to the value payload offset
            buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
        }

        // create the stream
        bufferStream = new ByteBufferOutputStream(buffer);
        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}

 
关注bufferStream,自定义的字节缓冲区输出流,

public ByteBufferOutputStream(ByteBuffer buffer) {
        this.buffer = buffer;
    }

 
包装了一个自己缓冲区,

public void write(int b) {
        if (buffer.remaining() < 1)
            expandBuffer(buffer.capacity() + 1);
        buffer.put((byte) b);
    }

 

并且在write的时候能够自动对缓冲区进行扩容。
再关注appendStream,是一个DataOutputStream,其中

public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
        try {
            switch (type) {
                case NONE:
                    return new DataOutputStream(buffer);
                case GZIP:
                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                case SNAPPY:
                    try {
                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                        return new DataOutputStream(stream);
                    } catch (Exception e) {
                        throw new KafkaException(e);
                    }
                case LZ4:
                    try {
                        OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                        return new DataOutputStream(stream);
                    } catch (Exception e) {
                        throw new KafkaException(e);
                    }
                default:
                    throw new IllegalArgumentException("Unknown compression type: " + type);
            }
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }

 
提供了对gzip的支持。
现在我们在回到memoryRecord的append方法中来看下,其实这里也就是往compressor中的dataoutputstream中按照固定的格式写入日志的header信息,metadata信息,日志的内容以及日志摘要信息。
再来到append的方法上层,是recordBatch的tryAppend方法

long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;

 
在执行玩append方法后,构造了一个FutureRecordMetadata对象,FutureRecordMetadata实现了future接口,可以获取异步计算的结果。
再回到RecordAccumulator的append方法中,
如果              
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
如果在tryAppend失败的情况下

// we don't have an in-progress record batch try to allocate a new batch
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    free.deallocate(buffer);
                    return appendResult;
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);

 
接下来进行的工作就是申请内存,创建日志的各种包装数据结构。
至此,日志已经存放在内存成功。
4. 日志发送
接下来我们再看看backup的iothread如何进行日志的发送。
Kafka主要的发送逻辑由一个叫做sender的线程去实现的,看代码

       

 Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);

 
    感觉代码注释的很清楚了。
首先获取Cluster中和当前producer相关的每个topic的每个partition的leader节点,如果集群中存在有leader节点未知的情况,则请求更新medadata信息,然后遍历这些leader节点,如果物理连接没有建立,则从当前leader列表中去除这些实际上还未准备好的leader,下次再发送;
然后对所有的消息根绝nodeId进行分组(之前在内存中是根据TopicPartition分组进行存储的,这次是根据物理节点的情况进行分组,做最后发送前的准备)。如果配置了guaranteeMessageOrder属性,则需要保证发送顺序。然后取消一些已经超时了的消息。然后开始生成ClientRequest的列表,调用kafkaClient进行发送。然后循环此逻辑,进行下一次发送。
5. 对ACK的处理
 

 /**
     * Transfer the record batches into a list of produce requests on a per-node basis
     */
    private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
        List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
        for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
        return requests;
}

 
我们分析sender在doSend的时候通过调用createProduceRequests方法的细节。进入到produceRequest看具体是如何生成ClientRequest对象的。首先分析ClientRequest的数据结构

/**
     * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
     * @param expectResponse Should we expect a response message or is this request complete once it is sent?
     * @param request The request
     * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
     * @param isInitiatedByNetworkClient Is request initiated by network client, if yes, its
     *                                   response will be consumed by network client
     */
    public ClientRequest(long createdTimeMs, boolean expectResponse, RequestSend request,
                         RequestCompletionHandler callback, boolean isInitiatedByNetworkClient) {
        this.createdTimeMs = createdTimeMs;
        this.callback = callback;
        this.request = request;
        this.expectResponse = expectResponse;
        this.isInitiatedByNetworkClient = isInitiatedByNetworkClient;
    }

 
其中需要传入具体的RequestSend对象,和RequestCompletionHandler对象,RequestCompletionHandler是一个接口,其中只有一个方法

    public void onComplete(ClientResponse response);

 此接口应该充当的是回调函数,其中在Request发送成功后onComplete方法被调用。
再回到createProduceRequests方法中,我们看下produceRequest的方法

/**
     * Create a produce request from the given record batches
     */
    private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
        Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
        for (RecordBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            produceRecordsByPartition.put(tp, batch.records.buffer());
            recordsByPartition.put(tp, batch);
        }
        ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
        RequestSend send = new RequestSend(Integer.toString(destination),
                                           this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                           request.toStruct());
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        return new ClientRequest(now, acks != 0, send, callback);
}

 
其中重点关注callback的实现逻辑handleProduceResponse,

/**
     * Handle a produce response
     */
    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
        int correlationId = response.request().request().header().correlationId();
        if (response.wasDisconnected()) {
            log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
                                                                                                  .request()
                                                                                                  .destination());
            for (RecordBatch batch : batches.values())
                completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now);
        } else {
            log.trace("Received produce response from node {} with correlation id {}",
                      response.request().request().destination(),
                      correlationId);
            // if we have a response, parse it
            if (response.hasResponse()) {
                ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                    TopicPartition tp = entry.getKey();
                    ProduceResponse.PartitionResponse partResp = entry.getValue();
                    Errors error = Errors.forCode(partResp.errorCode);
                    RecordBatch batch = batches.get(tp);
                    completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
                }
                this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
                this.sensors.recordThrottleTime(response.request().request().destination(),
                                                produceResponse.getThrottleTime());
            } else {
                // this is the acks = 0 case, just complete all requests
                for (RecordBatch batch : batches.values())
                    completeBatch(batch, Errors.NONE, -1L, Record.NO_TIMESTAMP, correlationId, now);
            }
        }
    }

 
其中主要是根据response的情况进行一些简单的逻辑处理,然后主要的完成逻辑都是在completeBatch方法中。其中如果acks=0的话,直接
for (RecordBatch batch : batches.values())
                    completeBatch(batch, Errors.NONE, -1L, Record.NO_TIMESTAMP, correlationId, now);

如果ack不为0 的情况下,则                    completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
进入到completeBatch方法中,

/**
     * Complete or retry the given batch of records.
     * 
     * @param batch The record batch
     * @param error The error (or null if none)
     * @param baseOffset The base offset assigned to the records if successful
     * @param timestamp The timestamp returned by the broker for this batch
     * @param correlationId The correlation id for the request
     * @param now The current POSIX time stamp in milliseconds
     */
    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long timestamp, long correlationId, long now) {
        if (error != Errors.NONE && canRetry(batch, error)) {
            // retry
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                     correlationId,
                     batch.topicPartition,
                     this.retries - batch.attempts - 1,
                     error);
            this.accumulator.reenqueue(batch, now);
            this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
        } else {
            RuntimeException exception;
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                exception = new TopicAuthorizationException(batch.topicPartition.topic());
            else
                exception = error.exception();
            // tell the user the result of their request
            batch.done(baseOffset, timestamp, exception);
            this.accumulator.deallocate(batch);
            if (error != Errors.NONE)
                this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
        }
        if (error.exception() instanceof InvalidMetadataException)
            metadata.requestUpdate();
        // Unmute the completed partition.
        if (guaranteeMessageOrder)
            this.accumulator.unmutePartition(batch.topicPartition);
    }

 
其中,如果response中有error信息,配且配置了重试,则直接进行reenqueue操作,            this.accumulator.reenqueue(batch, now);
否则,就进行异常的包装,当前request的后续收尾工作,以及消息占用的内存的释放

 

  • 大小: 18 KB
分享到:
评论

相关推荐

    java开发kafka-clients所需要的所有jar包以及源码

    Java开发Kafka客户端是构建基于Apache Kafka的应用程序的关键步骤,Kafka-clients库提供了与Kafka服务器进行交互的API,支持生产者和消费者的实现。在Java中使用Kafka-clients,你需要包含相应的jar包,这些包包含了...

    kafka 2.10-0.9.0.1 源码

    总结,Kafka 2.10-0.9.0.1 的源码分析涵盖了 Mirror Maker 工具的实现,多机房消息异步处理的架构设计,以及 Kafka 内部组件的工作原理。深入研究这些源码将帮助我们更好地理解和优化 Kafka 系统,为大数据实时处理...

    免费kafka和zookeeper安装包.zip

    **Kafka与Zookeeper简介** Kafka是一款高吞吐量的分布式消息系统,由...理解这些知识点对于有效使用Kafka构建实时数据处理系统至关重要。在实际应用中,还需要关注性能调优、安全性配置以及与其他系统的集成等问题。

    kafka_上归古_源码笔记资料.zip

    Kafka的源码结构清晰,设计精巧,是学习分布式系统设计的绝佳实例。笔记中涵盖了Producer、Consumer、Broker、Partition等核心概念,以及他们之间的交互过程。Producer是数据的生产者,负责将消息发送到Kafka集群;...

    Kafka技术内幕:图文详解Kafka源码设计与实现.郑奇煌

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是郑奇煌撰写的一本深入解析Apache Kafka的书籍,特别关注的是0.10版本。这本书提供了丰富的知识,涵盖了Kafka的核心概念、架构设计以及源码分析。下面将详细讨论书...

    kafka使用手册

    主要关注点包括: 1. **消息格式**: 确定消息的格式和编码方式。 2. **异步发送**: 利用异步发送功能提高性能。 3. **错误处理**: 处理发送过程中可能出现的错误情况。 #### 九、Kafka Consumer 的使用 **...

    kafka linux C++ 动态库

    2. **获取库文件**:从源码编译Kafka的C++客户端库,或者直接下载预编译的库文件。在这个例子中,已经提供了编译后的动态库,可以直接使用。 3. **配置编译**:在C++项目中,需要指定链接到Kafka动态库的路径。可以...

    很全面的kafka技术文档

    Producer 端的主要任务是生成并发送消息到 Kafka。自定义消息通常包含以下几个部分: - **消息键(Key)**:用于分区的依据。 - **消息值(Value)**:消息的实际内容。 - **消息头(Headers)**:包含元数据。 ##...

    Storm流计算项目:1号店电商实时数据分析系统-07.Kafka Java API 简单开发测试.pptx

    1. **Kafka安装与配置**:包括下载Kafka源码,编译构建,以及配置Kafka服务器,理解broker、topic、partition等核心概念。 2. **创建Producer**:使用Java API创建Kafka生产者,设置配置参数如bootstrap servers,...

    kafka分布式发布订阅消息系统 v3.3.1.tgz

    《Kafka分布式发布订阅消息系统 v3.3.1 深度解析》 Apache Kafka是一款高吞吐量、分布式、基于发布/订阅的消息系统,它最初由LinkedIn开发,并最终捐赠给了Apache软件基金会。Kafka v3.3.1作为其最新的版本,带来了...

    Analyticshut-Kafka:analyticshut文章中使用的Kafka代码

    在这个名为"Analyticshut-Kafka"的项目中,我们很显然关注的是如何在Java环境中使用Kafka进行数据分析。以下是对这个项目中可能涉及的Kafka与Java相关知识点的详细解释。 1. **Kafka基本概念**: - **主题(Topic...

    典型的多线程--生产和消费

    在IT领域,多线程是并发编程中的一个关键概念,特别是在服务器端应用和高并发系统设计中。"典型的多线程--生产和消费"这个主题,通常指的是生产者-消费者问题,这是一种经典的设计模式,用于解决如何在一个数据结构...

    用JMS手工实现私聊和公聊

    标题“用JMS手工实现私聊和公聊”揭示了这个话题主要关注的是如何使用Java消息服务(Java Message Service,简称JMS)来构建一个聊天系统,包括私聊和公聊的功能。JMS是Java平台中用于在分布式环境中传递消息的标准...

Global site tag (gtag.js) - Google Analytics