`
gaozzsoft
  • 浏览: 432429 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类

Kafka数据结构与分段加锁-Kafka源码精华解读

 
阅读更多

 

1.分段加锁

 

生产者客户端每发送一条消息,都会调用org.apache.kafka.clients.producer.internals.RecordAccumulator#append,因此它是高并发方法,需要保证线程安全。在高并发海量吞吐的场景下,如何才能保证消息有序、高吞吐地发送是值得思考的问题。

 

public  RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
appendsInProgress.incrementAndGet();
    try {
        // check if we have an in-progress batch
        /**
         * 步骤一:先根据分区找到应该插入到哪个队列里面。
         * 如果有已经存在的队列,那么我们就使用存在队列
         * 如果队列不存在,那么我们新创建一个队列
         *
         * 我们肯定是有了存储批次的队列,但是大家一定要知道一个事
         * 我们代码第一次执行到这儿,获取其实就是一个空的队列。
         *
         * 现在代码第二次执行进来。
         * 假设 分区还是之前的那个分区。
         *
         * 这个方法里面我们之前分析,里面就是针对batchs进行的操作
         * 里面kafka自己封装了一个数据结构:CopyOnWriteMap (这个数据结构本来就是线程安全的)
         *
         *
         * 根据当前消息的信息,获取一个队列
         *
         *
         * 线程一,线程二,线程三
         */
Deque<RecordBatch> dq = getOrCreateDeque(tp);
        /**
         * 假设我们现在有线程一,线程二,线程三
         *
         */
synchronized (dq) {
            //线程一进来了
            //线程二进来了
if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            /**
             * 步骤二:
             *      尝试往队列里面的批次里添加数据
             *
             *      一开始添加数据肯定是失败的,我们目前只是以后了队列
             *      数据是需要存储在批次对象里面(这个批次对象是需要分配内存的)
             *      我们目前还没有分配内存,所以如果按场景驱动的方式,
             *      代码第一次运行到这儿其实是不成功的。
             */
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            //线程一 进来的时候,
            //第一次进来的时候appendResult的值就为null
if (appendResult != null)
                return appendResult;
        }//释放锁
        // we don't have an in-progress record batch try to allocate a new batch
        /**
         * 步骤三:计算一个批次的大小
         * 在消息的大小和批次的大小之间取一个最大值,用这个值作为当前这个批次的大小。
         * 有可能我们的一个消息的大小比一个设定好的批次的大小还要大。
         * 默认一个批次的大小是16K。
         * 所以我们看到这段代码以后,应该给我们一个启示。
         * 如果我们生产者发送数的时候,如果我们的消息的大小都是超过16K,
         * 说明其实就是一条消息就是一个批次,那也就是说消息是一条一条被发送出去的。
         * 那如果是这样的话,批次这个概念的设计就没有意义了
         * 所以大家一定要根据自定公司的数据大小的情况去设置批次的大小。
         *
         *
         *
         */
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        /**
         * 步骤四:
         *  根据批次的大小去分配内存
         *
         *
         *  线程一,线程二,线程三,执行到这儿都会申请内存
         *  假设每个线程 都申请了 16k的内存。
         *
         *  线程1 16k
         *  线程2 16k
         *  线程3 16k
         *
         */
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

        synchronized (dq) {
            //假设线程一 进来了。
            //线程二进来了
            // Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            /**
             * 步骤五:
             *      尝试把数据写入到批次里面。
             *      代码第一次执行到这儿的时候 依然还是失败的(appendResult==null)
             *      目前虽然已经分配了内存
             *      但是还没有创建批次,那我们向往批次里面写数据
             *      还是不能写的。
             *
             *   线程二进来执行这段代码的时候,是成功的。
             */
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            //失败的意思就是appendResult 还是会等于null
if (appendResult != null) {
                //释放内存
                //线程二到这儿,其实他自己已经把数据写到批次了。所以
                //他的内存就没有什么用了,就把内存个释放了(还给内存池了。)
free.deallocate(buffer);
                return appendResult;
            }
            /**
             * 步骤六:
             *  根据内存大小封装批次
             *
             *
             *  线程一到这儿 会根据内存封装出来一个批次。
             */
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            //尝试往这个批次里面写数据,到这个时候 我们的代码会执行成功。
            //线程一,就往批次里面写数据,这个时候就写成功了。
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
            /**
             * 步骤七:
             *  把这个批次放入到这个队列的队尾
             *
             *
             *  线程一把批次添加到队尾
             */
dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }//释放锁
} finally {
        appendsInProgress.decrementAndGet();
    }
}

使用到java.util.concurrent.atomic.AtomicInteger

private final AtomicInteger appendsInProgress;
appendsInProgress.incrementAndGet();
appendsInProgress.decrementAndGet();

 代码看起来可能有点奇怪,写了一堆synchronize,为啥不直接在完整的synchronize块中完成?这恰恰正是设计者的高明之处,其意义在于尽可能将锁的粒度更加精细化进一步提高并发,从上一节 2.5.1 得知,向BufferPool申请内存时可能会导致阻塞,假设一种场景:线程1发送的消息比较大,需要向BufferPool申请新的内存块,而此时因为BufferPool空间不足,随后进入阻塞,但此时它仍然持有Deque的锁;线程2发送的消息很小,Deque最后一个ProducerBatch的剩余空间足够,但由于线程1持有了Deque的锁导致阻塞,若类似线程2情况的线程较多时,势必会造成大量不必要的线程阻塞,降低吞吐量和并发。

 

2.CopyOnWriteMap  读多写少

 

org.apache.kafka.clients.producer.internals.RecordAccumulator类里边构造方法初始化

 

//CopyOnWriteMap的这样的一个数据类型。

        //这个数据结构在jdk里面是没有的,是kafka自己设计的。

        //这个数据结构设计得很好,因为有了这个数据结构

        //整体的提升了封装批次的这个流程的性能!! 

        //JDK  juc包下面:CopyOnWriteArrayList

 

//把这个空的队列存入batches 这个数据结构里面

 

 

//TopicPartition 分区 -》  Deque<RecordBatch> 队列
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

 

//CopyOnWriteMap的这样的一个数据类型。
//这个数据结构在jdk里面是没有的,是kafka自己设计的。
//这个数据结构设计得很好,因为有了这个数据结构
//整体的提升了封装批次的这个流程的性能!!
//JDK  juc包下面:CopyOnWriteArrayList
this.batches = new CopyOnWriteMap<>();

 

public RecordAccumulator(int batchSize,
                         long totalSize,
                         CompressionType compression,
                         long lingerMs,
                         long retryBackoffMs,
                         Metrics metrics,
                         Time time) {
    this.drainIndex = 0;
    this.closed = false;
    this.flushesInProgress = new AtomicInteger(0);
    this.appendsInProgress = new AtomicInteger(0);
    this.batchSize = batchSize;
    this.compression = compression;
    this.lingerMs = lingerMs;
    this.retryBackoffMs = retryBackoffMs;
    //CopyOnWriteMap的这样的一个数据类型。
    //这个数据结构在jdk里面是没有的,是kafka自己设计的。
    //这个数据结构设计得很好,因为有了这个数据结构
    //整体的提升了封装批次的这个流程的性能!!
    //JDK  juc包下面:CopyOnWriteArrayList
this.batches = new CopyOnWriteMap<>();
    String metricGrpName = "producer-metrics";
    this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
    this.incomplete = new IncompleteRecordBatches();
    this.muted = new HashSet<>();
    this.time = time;
    registerMetrics(metrics, metricGrpName);
}

 

/**
 * Get the deque for the given topic-partition, creating it if necessary.
 */
private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {

    /**
     * CopyonWriteMap:
     *      get
     *      put
     *
     */
    //直接从batches里面获取当前分区对应的存储队列
Deque<RecordBatch> d = this.batches.get(tp);
    //我们现在用到是场景驱动的方式,代码第一次执行到这儿的死活
    //是获取不到队列的,也就是说d 这个变量的值为null
if (d != null)
        return d;
    //代码继续执行,创建出来一个新的空队列,
d = new ArrayDeque<>();
    //把这个空的队列存入batches 这个数据结构里面
Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
//直接返回新的结果
return previous;
}

 

这段代码作用是取出分区对应的 ProducerBatch 队列

其中this.batches采用了CopyOnWriteMap数据结构来存放

 

客户端每发送一条消息都会调用一次append方法,假设Topic有3个分区,总共发送1000万条消息就需要调用1000万次getOrCreateDeque(tp),其中get调用了1000万次,putIfAbsent仅调用了3次,可见这是一个高并发读多写少的场景。针对此场景,KAFKA精心设计了CopyOnWriteMap,CopyOnWriteMap允许线程并发访问,读操作没有加锁限制,性能较高,而写操作需要先在堆内存创建新对象,再将原对象的内容拷贝至新对象,写操作需要上锁。这种数据结构的优点和缺点都非常明显,

优点是:

1.采用读写分离的思想,读操作性能很高,几乎无需额外开销,十分适用于读多写少的场景;

2.map采用volatile关键字修饰,保证了写操作对map的修改对其它线程可见;

 

缺点是:

每次写操作都要内存复制一份,数据量大时对内存开销较大,容易导致频繁GC;

无法保证数据的强一致性,毕竟读写是作用于新老对象;

 

@Override
public synchronized V putIfAbsent(K k, V v) {
    //如果我们传进来的这个key不存在
if (!containsKey(k))
        //那么就调用里面内部的put方法
return put(k, v);
    else
//返回结果
return get(k);
}

 

@Override
public V get(Object k) {
    return map.get(k);
}

 

3.ConcurrentSkipListMap 跳表数据结构

 

Log.scala

  //java juc下面的一个数据结构

  //首先这个是跳表实现的一个并发安全的Map集合。

  //文件名作为key(base offset)

  //value就是一个segment

  //目的就是为了能我们根据offset的大小快速的定位到segment

 

/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] =
//java juc下面的一个数据结构
//首先这个是跳表实现的一个并发安全的Map集合。
//文件名作为key(base offset)
//value就是一个segment
//目的就是为了能我们根据offset的大小快速的定位到segment
new ConcurrentSkipListMap[java.lang.Long, LogSegment]

 

/* Load the log segments from the log files on disk */
private def loadSegments() {
  // create the log directory if it doesn't exist
dir.mkdirs()
  var swapFiles = Set[File]()

  // first do a pass through the files in the log directory and remove any temporary files
  // and find any interrupted swap operations
for(file <- dir.listFiles if file.isFile) {
    if(!file.canRead)
      throw new IOException("Could not read file " + file)
    val filename = file.getName
    if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
      // if the file ends in .deleted or .cleaned, delete it
file.delete()
    } else if(filename.endsWith(SwapFileSuffix)) {
      // we crashed in the middle of a swap operation, to recover:
      // if a log, delete the .index file, complete the swap operation later
      // if an index just delete it, it will be rebuilt
val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
      if(baseName.getPath.endsWith(IndexFileSuffix)) {
        file.delete()
      } else if(baseName.getPath.endsWith(LogFileSuffix)){
        // delete the index
val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
        index.delete()
        swapFiles += file
      }
    }
  }

  // now do a second pass and load all the .log and all index files
for(file <- dir.listFiles if file.isFile) {
    val filename = file.getName
    if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) {
      // if it is an index file, make sure it has a corresponding .log file
val logFile =
        if (filename.endsWith(TimeIndexFileSuffix))
          new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix))
        else
          new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))

      if(!logFile.exists) {
        warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
        file.delete()
      }
    } else if(filename.endsWith(LogFileSuffix)) {
      // if its a log file, load the corresponding log segment
val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
      val indexFile = Log.indexFilename(dir, start)
      val timeIndexFile = Log.timeIndexFilename(dir, start)

      val indexFileExists = indexFile.exists()
      val segment = new LogSegment(dir = dir,
                                   startOffset = start,
                                   indexIntervalBytes = config.indexInterval,
                                   maxIndexSize = config.maxIndexSize,
                                   rollJitterMs = config.randomSegmentJitter,
                                   time = time,
                                   fileAlreadyExists = true)

      if (indexFileExists) {
        try {
          segment.index.sanityCheck()
          segment.timeIndex.sanityCheck()
        } catch {
          case e: java.lang.IllegalArgumentException =>
            warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " +
              s"${indexFile.getAbsolutePath} and rebuilding index...")
            indexFile.delete()
            timeIndexFile.delete()
            segment.recover(config.maxMessageSize)
        }
      } else {
        error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
        segment.recover(config.maxMessageSize)
      }

      segments.put(start, segment)
    }
  }

 

 

if(logSegments.isEmpty) {
  // no existing segments, create a new mutable segment beginning at offset 0
segments.put(0L, new LogSegment(dir = dir,
                                 startOffset = 0,
                                 indexIntervalBytes = config.indexInterval,
                                 maxIndexSize = config.maxIndexSize,
                                 rollJitterMs = config.randomSegmentJitter,
                                 time = time,
                                 fileAlreadyExists = false,
                                 initFileSize = this.initFileSize(),
                                 preallocate = config.preallocate))
} else {
  recoverLog()
  // reset the index size of the currently active log segment to allow more entries
activeSegment.index.resize(config.maxIndexSize)
  activeSegment.timeIndex.resize(config.maxIndexSize)
}


 

java.util.concurrent.ConcurrentSkipListMap 代码:

 

public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}

 

private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            if (n != null) {
                Object v; int c;
                Node<K,V> f = n.next;
                if (n != b.next)               // inconsistent read
break;
                if ((v = n.value) == null) {   // n is deleted
n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n) // b is deleted
break;
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) {
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; // restart if lost race to replace value
}
                // else c < 0; fall through
}

            z = new Node<K,V>(key, value, n);
            if (!b.casNext(n, z))
                break;         // restart if lost race to append to b
break outer;
        }
    }

    int rnd = ThreadLocalRandom.nextSecondarySeed();
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        Index<K,V> idx = null;
        HeadIndex<K,V> h = head;
        if (level <= (max = h.level)) {
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        else { // try to grow by one level
level = max + 1; // hold in array and later pick the one to use
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                h = head;
                int oldLevel = h.level;
                if (level <= oldLevel) // lost race to add level
break;
                HeadIndex<K,V> newh = h;
                Node<K,V> oldbase = h.node;
                for (int j = oldLevel+1; j <= level; ++j)
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // find insertion points and splice in
splice: for (int insertionLevel = level;;) {
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                if (q == null || t == null)
                    break splice;
                if (r != null) {
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                if (j == insertionLevel) {
                    if (!q.link(r, t))
                        break; // restart
if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}

 

==================================================================

ConcurrentSkipListMap是在JDK 1.6中新增的,为了对高并发场景下的有序Map提供更好的支持,它有几个特点:

 

高并发场景

key是有序的

添加、删除、查找操作都是基于跳表结构(Skip List)实现的

key和value都不能为null

 

跳表(Skip List)是一种类似于链表的数据结构,其查询、插入、删除的时间复杂度都是 O(logn)。

在传统的单链表结构中,查找某个元素需要从链表的头部按顺序遍历,直到找到目标元素为止,查找的时间复杂度为O(n)。

 

而跳表结合了树和链表的特点,其特性如下:

 

跳表由很多层组成;

每一层都是一个有序的链表;

最底层的链表包含所有元素;

对于每一层的任意一个节点,不仅有指向其下一个节点的指针,也有指向其下一层的指针;

如果一个元素出现在Level n层的链表中,则它在Level n层以下的链表也都会出现。

Skip List例子:

 

下图是一种可能的跳表结构:

如图,[1]和[40]节点有3层,[8]和[18]节点有2层。每一层都是有序的链表。

 

如果要查找目标节点[15],大致过程如下:

 

首先查看[1]节点的第1层,发现[1]节点的下一个节点为[40],大于15,那么查找[1]节点的下一层;

查找[1]节点的第2层,发现[1]节点的下一个节点为[8],小于15,接着查看下一个节点,发现下一个节点是[18],大于15,因此查找[8]节点的下一层;

查找[8]节点的第2层,发现[8]节点的下一个节点是[10],小于15,接着查看下一个节点[13],小于15,接着查看下一个节点[15],发现其值等于15,因此找到了目标节点,结束查询。

 

跳表实际上是一种 空间换时间 的数据结构。

ConcurrentSkipListMap用到了两种结构的节点。

 

Node节点代表了真正存储数据的节点,包含了key、value、指向下一个节点的指针next:

 

    static final class Node<K,V> {

        final K key;     // 键

        V val;           // 值

        Node<K,V> next;  // 指向下一个节点的指针

        Node(K key, V value, Node<K,V> next) {

            this.key = key;

            this.val = value;

            this.next = next;

        }

    }

 

Index节点代表了跳表的层级,包含了当前节点node、下一层down、当前层的下一个节点right:

 

    static final class Index<K,V> {

        final Node<K,V> node;   // 当前节点

        final Index<K,V> down;  // 下一层

        Index<K,V> right;       // 当前层的下一个节点

        Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {

            this.node = node;

            this.down = down;

            this.right = right;

        }

    }

 

 

下面是对ConcurrentSkipListMap的简单使用的一个例子:

key有顺序的跳表数据结构 

 

import org.apache.kafka.clients.producer.internals.RecordBatch;
import org.apache.kafka.common.TopicPartition;
//import org.apache.kafka.common.utils.CopyOnWriteMap;
import com.ehl.utils.skiplist.CopyOnWriteMap;

import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

 

//跳表Map代码实现
ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<Integer, String>();
//        ConcurrentNavigableMap<Integer, String> map2 = new ConcurrentSkipListMap<Integer, String>();
map.put(4, "4");
        map.put(5, "5");
        map.put(1, "1");
        map.put(6, "6");
        map.put(2, "2");

        System.out.println(map.keySet());  // [1, 2, 4, 5, 6]
System.out.println(map); //{1=1, 2=2, 4=4, 5=5, 6=6}
System.out.println(map.descendingKeySet()); // [6, 5, 4, 2, 1]
System.out.println(map.descendingMap()); //{6=6, 5=5, 4=4, 2=2, 1=1}
//        [1, 2, 4, 5, 6]
//        {1=1, 2=2, 4=4, 5=5, 6=6}
//        [6, 5, 4, 2, 1]
//        {6=6, 5=5, 4=4, 2=2, 1=1}

 

 

CopyOnWriteMap

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.kafka.common.utils;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
 * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification
 *
 * 1) 这个数据结构是在高并发的情况下是线程安全的。
 * 2)  采用的读写分离的思想设计的数据结构
 *      每次插入(写数据)数据的时候都开辟新的内存空间
 *      所以会有个小缺点,就是插入数据的时候,会比较耗费内存空间。
 * 3)这样的一个数据结构,适合写少读多的场景。
 *      读数据的时候性能很高。
 *
 * batchs这个对象存储数据的时候,就是使用的这个数据结构。
 * 对于batches来说,它面对的场景就是读多写少的场景。
 *
 *batches:
 *   读数据:
 *      每生产一条消息,都会从batches里面读取数据。
 *      假如每秒中生产10万条消息,是不是就意味着每秒要读取10万次。
 *      所以绝对是一个高并发的场景。
 *   写数据:
 *     假设有100个分区,那么就是会插入100次数据。
 *     并且队列只需要插入一次就可以了。
 *     所以这是一个低频的操作。
 *
 *     高性能:
 *
 *      读写分离读设计方案:适合的场景就是读多写少。
 *          读多:
 *
 *          写少:
 *
 *
 *
 */
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
    /**
     * 核心的变量就是一个map
     * 这个map有个特点,它的修饰符是volatile关键字。
     * 在多线程的情况下,如果这个map的值发生变化,其他线程也是可见的。
     *
     * get
     * put
     */
private volatile Map<K, V> map;

    public CopyOnWriteMap() {
        this.map = Collections.emptyMap();
    }

    public CopyOnWriteMap(Map<K, V> map) {
        this.map = Collections.unmodifiableMap(map);
    }

    @Override
public boolean containsKey(Object k) {
        return map.containsKey(k);
    }

    @Override
public boolean containsValue(Object v) {
        return map.containsValue(v);
    }

    @Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
        return map.entrySet();
    }

    /**
     * 没有加锁,读取数据的时候性能很高(高并发的场景下,肯定性能很高)
     * 并且是线程安全的。
     * 因为人家采用的读写分离的思想。
     * @param k
* @return
*/
@Override
public V get(Object k) {
        return map.get(k);
    }

    @Override
public boolean isEmpty() {
        return map.isEmpty();
    }

    @Override
public Set<K> keySet() {
        return map.keySet();
    }

    @Override
public int size() {
        return map.size();
    }

    @Override
public Collection<V> values() {
        return map.values();
    }

    @Override
public synchronized void clear() {
        this.map = Collections.emptyMap();
    }

    /**
     * 1):
     *      整个方法使用的是synchronized关键字去修饰的,说明这个方法是线程安全。
     *      即使加了锁,这段代码的性能依然很好,因为里面都是纯内存的操作。
     * 2)
     *        这种设计方式,采用的是读写分离的设计思想。
     *        读操作和写操作 是相互不影响的。
     *        所以我们读数据的操作就是线程安全的。
     *3)
     *      最后把值赋给了map,map是用volatile关键字修饰的。
     *      说明这个map是具有可见性的,这样的话,如果get数据的时候,这儿的值发生了变化,也是能感知到的。
     * @param k
* @param v
* @return
*/
@Override
public synchronized V put(K k, V v) {
        //新的内存空间
        //读写分离
        //往新的内存空间里面插入
        //读,读数据就老读空间里面去
Map<K, V> copy = new HashMap<K, V>(this.map);
        //插入数据
V prev = copy.put(k, v);
        //赋值给map
this.map = Collections.unmodifiableMap(copy);
        return prev;
    }

    @Override
public synchronized void putAll(Map<? extends K, ? extends V> entries) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        copy.putAll(entries);
        this.map = Collections.unmodifiableMap(copy);
    }

    @Override
public synchronized V remove(Object key) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        V prev = copy.remove(key);
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }

    @Override
public synchronized V putIfAbsent(K k, V v) {
        //如果我们传进来的这个key不存在
if (!containsKey(k))
            //那么就调用里面内部的put方法
return put(k, v);
        else
//返回结果
return get(k);
    }

    @Override
public synchronized boolean remove(Object k, Object v) {
        if (containsKey(k) && get(k).equals(v)) {
            remove(k);
            return true;
        } else {
            return false;
        }
    }

    @Override
public synchronized boolean replace(K k, V original, V replacement) {
        if (containsKey(k) && get(k).equals(original)) {
            put(k, replacement);
            return true;
        } else {
            return false;
        }
    }

    @Override
public synchronized V replace(K k, V v) {
        if (containsKey(k)) {
            return put(k, v);
        } else {
            return null;
        }
    }

}

 

 COW MAP代码实践

import org.apache.kafka.clients.producer.internals.RecordBatch;
import org.apache.kafka.common.TopicPartition;
//import org.apache.kafka.common.utils.CopyOnWriteMap;
import com.ehl.utils.skiplist.CopyOnWriteMap;

import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

 

//COW写时复制MAP代码实现
        //TopicPartition 分区 -》  Deque<RecordBatch> 队列
//        private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
//        dq.addLast(batch);
//        deque.addFirst(batch);
//        deque.peekLast();
//        dq.remove(batch);
ConcurrentMap<Integer, Deque<String>> cowMap = new CopyOnWriteMap<Integer, Deque<String>>();
        // Integer 代表4个分区 1-4    Deque<String>  代表4个队列
Deque<String> d1 =  new ArrayDeque<String>();
        Deque<String> d2 =  new ArrayDeque<String>();
        Deque<String> d3 =  new ArrayDeque<String>();
        Deque<String> d4 =  new ArrayDeque<String>();

        d1.add("aaa");
        d2.add("bbb");
        d3.add("ccc");
        d4.add("ddd");

        d1 = cowMap.putIfAbsent(1, d1); //key-四个分区  写少 写入新Map 加锁synchronized 同时赋值给旧MAP
cowMap.putIfAbsent(2, d2); //key-四个分区 写少 写入新Map 加锁synchronized 同时赋值给旧MAP  写时复制为了读写分离
cowMap.putIfAbsent(3, d3); //key-四个分区 写少 写入新Map 加锁synchronized 同时赋值给旧MAP  写时复制为了读写分离
cowMap.putIfAbsent(4, d4); //key-四个分区 写少 写入新Map 加锁synchronized 同时赋值给旧MAP  写时复制为了读写分离
d1 = cowMap.putIfAbsent(1, d1);//第二次再写入 如key存在 则直接返回value队列 然后写入addLast  写时复制
d1.addLast("aaa2");
        System.out.println(cowMap.get(1));//从旧MAP中获取数据 此MAP是volatile的  读多
System.out.println(cowMap.get(2));//从旧MAP中获取数据 此MAP是volatile的  读多
System.out.println(cowMap.get(4));//从旧MAP中获取数据 此MAP是volatile的  读多
//        打印
//        [aaa, aaa2]
//        [bbb]
//        [ddd]

 

 

 

 

分享到:
评论

相关推荐

    Matlab环境下决策分类树的构建、优化与应用

    内容概要:本文详细介绍了如何利用Matlab构建、优化和应用决策分类树。首先,讲解了数据准备阶段,将数据与程序分离,确保灵活性。接着,通过具体实例展示了如何使用Matlab内置函数如fitctree快速构建决策树模型,并通过可视化工具直观呈现决策树结构。针对可能出现的过拟合问题,提出了基于成本复杂度的剪枝方法,以提高模型的泛化能力。此外,还分享了一些实用技巧,如处理连续特征、保存模型、并行计算等,帮助用户更好地理解和应用决策树。 适合人群:具有一定编程基础的数据分析师、机器学习爱好者及科研工作者。 使用场景及目标:适用于需要进行数据分类任务的场景,特别是当需要解释性强的模型时。主要目标是教会读者如何在Matlab环境中高效地构建和优化决策分类树,从而应用于实际项目中。 其他说明:文中不仅提供了完整的代码示例,还强调了代码模块化的重要性,便于后续维护和扩展。同时,对于初学者来说,建议从简单的鸢尾花数据集开始练习,逐步掌握决策树的各项技能。

    《营销调研》第7章-探索性调研数据采集.pptx

    《营销调研》第7章-探索性调研数据采集.pptx

    Assignment1_search_final(1).ipynb

    Assignment1_search_final(1).ipynb

    美团外卖优惠券小程序 美团优惠券微信小程序 自带流量主模式 带教程.zip

    美团优惠券小程序带举牌小人带菜谱+流量主模式,挺多外卖小程序的,但是都没有搭建教程 搭建: 1、下载源码,去微信公众平台注册自己的账号 2、解压到桌面 3、打开微信开发者工具添加小程序-把解压的源码添加进去-appid改成自己小程序的 4、在pages/index/index.js文件搜流量主广告改成自己的广告ID 5、到微信公众平台登陆自己的小程序-开发管理-开发设置-服务器域名修改成

    《计算机录入技术》第十八章-常用外文输入法.pptx

    《计算机录入技术》第十八章-常用外文输入法.pptx

    基于Andorid的跨屏拖动应用设计.zip

    基于Andorid的跨屏拖动应用设计实现源码,主要针对计算机相关专业的正在做毕设的学生和需要项目实战练习的学习者,也可作为课程设计、期末大作业。

    《网站建设与维护》项目4-在线购物商城用户管理功能.pptx

    《网站建设与维护》项目4-在线购物商城用户管理功能.pptx

    区块链_房屋转租系统_去中心化存储_数据防篡改_智能合约_S_1744435730.zip

    区块链_房屋转租系统_去中心化存储_数据防篡改_智能合约_S_1744435730

    《计算机应用基础实训指导》实训五-Word-2010的文字编辑操作.pptx

    《计算机应用基础实训指导》实训五-Word-2010的文字编辑操作.pptx

    《移动通信(第4版)》第5章-组网技术.ppt

    《移动通信(第4版)》第5章-组网技术.ppt

    ABB机器人基础.pdf

    ABB机器人基础.pdf

    《综合布线施工技术》第9章-综合布线实训指导.ppt

    《综合布线施工技术》第9章-综合布线实训指导.ppt

    最新修复版万能镜像系统源码-最终版站群利器持续更新升级

    很不错的一套站群系统源码,后台配置采集节点,输入目标站地址即可全自动智能转换自动全站采集!支持 https、支持 POST 获取、支持搜索、支持 cookie、支持代理、支持破解防盗链、支持破解防采集 全自动分析,内外链接自动转换、图片地址、css、js,自动分析 CSS 内的图片使得页面风格不丢失: 广告标签,方便在规则里直接替换广告代码 支持自定义标签,标签可自定义内容、自由截取、内容正则截取。可以放在模板里,也可以在规则里替换 支持自定义模板,可使用标签 diy 个性模板,真正做到内容上移花接木 调试模式,可观察采集性能,便于发现和解决各种错误 多条采集规则一键切换,支持导入导出 内置强大替换和过滤功能,标签过滤、站内外过滤、字符串替换、等等 IP 屏蔽功能,屏蔽想要屏蔽 IP 地址让它无法访问 ****高级功能*****· url 过滤功能,可过滤屏蔽不采集指定链接· 伪原创,近义词替换有利于 seo· 伪静态,url 伪静态化,有利于 seo· 自动缓存自动更新,可设置缓存时间达到自动更新,css 缓存· 支持演示有阿三源码简繁体互转· 代理 IP、伪造 IP、随机 IP、伪造 user-agent、伪造 referer 来路、自定义 cookie,以便应对防采集措施· url 地址加密转换,个性化 url,让你的 url 地址与众不同· 关键词内链功能· 还有更多功能等你发现…… 程序使用非常简单,仅需在后台输入一个域名即可建站,不限子域名,站群利器,无授权,无绑定限制,使用后台功能可对页面进行自定义修改,在程序后台开启生 成功能,只要访问页面就会生成一个本地文件。当用户再次访问的时候就直接访问网站本地的页面,所以目标站点无法访问了也没关系,我们的站点依然可以访问, 支持伪静态、伪原创、生成静态文件、自定义替换、广告管理、友情链接管理、自动下载 CSS 内的图。

    《Approaching(Almost)any machine learning problem》中文版第11章

    【自然语言处理】文本分类方法综述:从基础模型到深度学习的情感分析系统设计

    基于Andorid的下拉浏览应用设计.zip

    基于Andorid的下拉浏览应用设计实现源码,主要针对计算机相关专业的正在做毕设的学生和需要项目实战练习的学习者,也可作为课程设计、期末大作业。

    P2插电式混合动力系统Simulink模型:基于逻辑门限值控制策略的混动汽车仿真

    内容概要:本文详细介绍了一个原创的P2插电式混合动力系统Simulink模型,该模型基于逻辑门限值控制策略,涵盖了多个关键模块如工况输入、驾驶员模型、发动机模型、电机模型、制动能量回收模型、转矩分配模型、运行模式切换模型、档位切换模型以及纵向动力学模型。模型支持多种标准工况(WLTC、UDDS、EUDC、NEDC)和自定义工况,并展示了丰富的仿真结果,包括发动机和电机转矩变化、工作模式切换、档位变化、电池SOC变化、燃油消耗量、速度跟随和最大爬坡度等。此外,文章还深入探讨了逻辑门限值控制策略的具体实现及其效果,提供了详细的代码示例和技术细节。 适合人群:汽车工程专业学生、研究人员、混动汽车开发者及爱好者。 使用场景及目标:①用于教学和科研,帮助理解和掌握P2混动系统的原理和控制策略;②作为开发工具,辅助设计和优化混动汽车控制系统;③提供仿真平台,评估不同工况下的混动系统性能。 其他说明:文中不仅介绍了模型的整体架构和各模块的功能,还分享了许多实用的调试技巧和优化方法,使读者能够更好地理解和应用该模型。

    电力系统分布式调度中ADMM算法的MATLAB实现及其应用

    内容概要:本文详细介绍了基于ADMM(交替方向乘子法)算法在电力系统分布式调度中的应用,特别是并行(Jacobi)和串行(Gauss-Seidel)两种不同更新模式的实现。文中通过MATLAB代码展示了这两种模式的具体实现方法,并比较了它们的优劣。并行模式适用于多核计算环境,能够充分利用硬件资源,尽管迭代次数较多,但总体计算时间较短;串行模式则由于“接力式”更新机制,通常收敛更快,但在计算资源有限的情况下可能会形成瓶颈。此外,文章还讨论了惩罚系数rho的自适应调整策略以及在电-气耦合系统优化中的应用实例。 适合人群:从事电力系统优化、分布式计算研究的专业人士,尤其是有一定MATLAB编程基础的研究人员和技术人员。 使用场景及目标:①理解和实现ADMM算法在电力系统分布式调度中的应用;②评估并行和串行模式在不同应用场景下的性能表现;③掌握惩罚系数rho的自适应调整技巧,提高算法收敛速度和稳定性。 其他说明:文章提供了详细的MATLAB代码示例,帮助读者更好地理解和实践ADMM算法。同时,强调了在实际工程应用中需要注意的关键技术和优化策略。

    这篇文章详细探讨了交错并联Buck变换器的设计、仿真及其实现,涵盖了从理论分析到实际应用的多个方面(含详细代码及解释)

    内容概要:本文深入研究了交错并联Buck变换器的工作原理、性能优势及其具体实现。文章首先介绍了交错并联Buck变换器相较于传统Buck变换器的优势,包括减小输出电流和电压纹波、降低开关管和二极管的电流应力、减小输出滤波电容容量等。接着,文章详细展示了如何通过MATLAB/Simulink建立该变换器的仿真模型,包括参数设置、电路元件添加、PWM信号生成及连接、电压电流测量模块的添加等。此外,还探讨了PID控制器的设计与实现,通过理论分析和仿真验证了其有效性。最后,文章通过多个仿真实验验证了交错并联Buck变换器在纹波性能、器件应力等方面的优势,并分析了不同控制策略的效果,如P、PI、PID控制等。 适合人群:具备一定电力电子基础,对DC-DC变换器特别是交错并联Buck变换器感兴趣的工程师和技术人员。 使用场景及目标:①理解交错并联Buck变换器的工作原理及其相对于传统Buck变换器的优势;②掌握使用MATLAB/Simulink搭建交错并联Buck变换器仿真模型的方法;③学习PID控制器的设计与实现,了解其在电源系统中的应用;④通过仿真实验验证交错并联Buck变换器的性能,评估不同控制策略的效果。 其他说明:本文不仅提供了详细的理论分析,还给出了大量可运行的MATLAB代码,帮助读者更好地理解和实践交错并联Buck变换器的设计与实现。同时,通过对不同控制策略的对比分析,为实际工程应用提供了有价值的参考。

    《综合布线施工技术》第8章-综合布线工程案例.ppt

    《综合布线施工技术》第8章-综合布线工程案例.ppt

    基于STM32F103C8T6的K型热电偶温度控制仪设计与实现

    内容概要:本文详细介绍了基于STM32F103C8T6的K型热电偶温度控制仪的设计与实现。硬件部分涵盖了热电偶采集电路、OLED显示模块、蜂鸣器电路、风扇控制电路以及EEPROM存储模块。软件部分则涉及ADC配置、OLED刷新、PID控温算法、EEPROM参数存储、风扇PWM控制等多个方面的具体实现。文中不仅提供了详细的代码示例,还分享了许多调试经验和注意事项,如冷端补偿、DMA传输优化、I2C时钟配置、PWM频率选择等。 适合人群:具有一定嵌入式系统开发经验的工程师和技术爱好者。 使用场景及目标:适用于需要进行温度监测与控制的应用场景,如工业自动化、实验室设备等。目标是帮助读者掌握STM32F103C8T6在温度控制领域的应用技巧,提升硬件设计和软件编程能力。 其他说明:本文提供的工程文件包含Altium Designer的原理图PCB文件,便于二次开发。此外,文中还提到了一些扩展功能,如加入Modbus通信协议,供有兴趣的读者进一步探索。

Global site tag (gtag.js) - Google Analytics