1.分段加锁
生产者客户端每发送一条消息,都会调用org.apache.kafka.clients.producer.internals.RecordAccumulator#append,因此它是高并发方法,需要保证线程安全。在高并发海量吞吐的场景下,如何才能保证消息有序、高吞吐地发送是值得思考的问题。
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException { // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); try { // check if we have an in-progress batch /** * 步骤一:先根据分区找到应该插入到哪个队列里面。 * 如果有已经存在的队列,那么我们就使用存在队列 * 如果队列不存在,那么我们新创建一个队列 * * 我们肯定是有了存储批次的队列,但是大家一定要知道一个事 * 我们代码第一次执行到这儿,获取其实就是一个空的队列。 * * 现在代码第二次执行进来。 * 假设 分区还是之前的那个分区。 * * 这个方法里面我们之前分析,里面就是针对batchs进行的操作 * 里面kafka自己封装了一个数据结构:CopyOnWriteMap (这个数据结构本来就是线程安全的) * * * 根据当前消息的信息,获取一个队列 * * * 线程一,线程二,线程三 */ Deque<RecordBatch> dq = getOrCreateDeque(tp); /** * 假设我们现在有线程一,线程二,线程三 * */ synchronized (dq) { //线程一进来了 //线程二进来了 if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); /** * 步骤二: * 尝试往队列里面的批次里添加数据 * * 一开始添加数据肯定是失败的,我们目前只是以后了队列 * 数据是需要存储在批次对象里面(这个批次对象是需要分配内存的) * 我们目前还没有分配内存,所以如果按场景驱动的方式, * 代码第一次运行到这儿其实是不成功的。 */ RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); //线程一 进来的时候, //第一次进来的时候appendResult的值就为null if (appendResult != null) return appendResult; }//释放锁 // we don't have an in-progress record batch try to allocate a new batch /** * 步骤三:计算一个批次的大小 * 在消息的大小和批次的大小之间取一个最大值,用这个值作为当前这个批次的大小。 * 有可能我们的一个消息的大小比一个设定好的批次的大小还要大。 * 默认一个批次的大小是16K。 * 所以我们看到这段代码以后,应该给我们一个启示。 * 如果我们生产者发送数的时候,如果我们的消息的大小都是超过16K, * 说明其实就是一条消息就是一个批次,那也就是说消息是一条一条被发送出去的。 * 那如果是这样的话,批次这个概念的设计就没有意义了 * 所以大家一定要根据自定公司的数据大小的情况去设置批次的大小。 * * * */ int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); /** * 步骤四: * 根据批次的大小去分配内存 * * * 线程一,线程二,线程三,执行到这儿都会申请内存 * 假设每个线程 都申请了 16k的内存。 * * 线程1 16k * 线程2 16k * 线程3 16k * */ ByteBuffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { //假设线程一 进来了。 //线程二进来了 // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); /** * 步骤五: * 尝试把数据写入到批次里面。 * 代码第一次执行到这儿的时候 依然还是失败的(appendResult==null) * 目前虽然已经分配了内存 * 但是还没有创建批次,那我们向往批次里面写数据 * 还是不能写的。 * * 线程二进来执行这段代码的时候,是成功的。 */ RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); //失败的意思就是appendResult 还是会等于null if (appendResult != null) { //释放内存 //线程二到这儿,其实他自己已经把数据写到批次了。所以 //他的内存就没有什么用了,就把内存个释放了(还给内存池了。) free.deallocate(buffer); return appendResult; } /** * 步骤六: * 根据内存大小封装批次 * * * 线程一到这儿 会根据内存封装出来一个批次。 */ MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); //尝试往这个批次里面写数据,到这个时候 我们的代码会执行成功。 //线程一,就往批次里面写数据,这个时候就写成功了。 FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds())); /** * 步骤七: * 把这个批次放入到这个队列的队尾 * * * 线程一把批次添加到队尾 */ dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); }//释放锁 } finally { appendsInProgress.decrementAndGet(); } }
使用到java.util.concurrent.atomic.AtomicInteger
private final AtomicInteger appendsInProgress; appendsInProgress.incrementAndGet(); appendsInProgress.decrementAndGet(); 代码看起来可能有点奇怪,写了一堆synchronize,为啥不直接在完整的synchronize块中完成?这恰恰正是设计者的高明之处,其意义在于尽可能将锁的粒度更加精细化进一步提高并发,从上一节 2.5.1 得知,向BufferPool申请内存时可能会导致阻塞,假设一种场景:线程1发送的消息比较大,需要向BufferPool申请新的内存块,而此时因为BufferPool空间不足,随后进入阻塞,但此时它仍然持有Deque的锁;线程2发送的消息很小,Deque最后一个ProducerBatch的剩余空间足够,但由于线程1持有了Deque的锁导致阻塞,若类似线程2情况的线程较多时,势必会造成大量不必要的线程阻塞,降低吞吐量和并发。
2.CopyOnWriteMap 读多写少
org.apache.kafka.clients.producer.internals.RecordAccumulator类里边构造方法初始化
//CopyOnWriteMap的这样的一个数据类型。
//这个数据结构在jdk里面是没有的,是kafka自己设计的。
//这个数据结构设计得很好,因为有了这个数据结构
//整体的提升了封装批次的这个流程的性能!!
//JDK juc包下面:CopyOnWriteArrayList
//把这个空的队列存入batches 这个数据结构里面
//TopicPartition 分区 -》 Deque<RecordBatch> 队列 private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
//CopyOnWriteMap的这样的一个数据类型。 //这个数据结构在jdk里面是没有的,是kafka自己设计的。 //这个数据结构设计得很好,因为有了这个数据结构 //整体的提升了封装批次的这个流程的性能!! //JDK juc包下面:CopyOnWriteArrayList this.batches = new CopyOnWriteMap<>();
public RecordAccumulator(int batchSize, long totalSize, CompressionType compression, long lingerMs, long retryBackoffMs, Metrics metrics, Time time) { this.drainIndex = 0; this.closed = false; this.flushesInProgress = new AtomicInteger(0); this.appendsInProgress = new AtomicInteger(0); this.batchSize = batchSize; this.compression = compression; this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; //CopyOnWriteMap的这样的一个数据类型。 //这个数据结构在jdk里面是没有的,是kafka自己设计的。 //这个数据结构设计得很好,因为有了这个数据结构 //整体的提升了封装批次的这个流程的性能!! //JDK juc包下面:CopyOnWriteArrayList this.batches = new CopyOnWriteMap<>(); String metricGrpName = "producer-metrics"; this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName); this.incomplete = new IncompleteRecordBatches(); this.muted = new HashSet<>(); this.time = time; registerMetrics(metrics, metricGrpName); }
/** * Get the deque for the given topic-partition, creating it if necessary. */ private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) { /** * CopyonWriteMap: * get * put * */ //直接从batches里面获取当前分区对应的存储队列 Deque<RecordBatch> d = this.batches.get(tp); //我们现在用到是场景驱动的方式,代码第一次执行到这儿的死活 //是获取不到队列的,也就是说d 这个变量的值为null if (d != null) return d; //代码继续执行,创建出来一个新的空队列, d = new ArrayDeque<>(); //把这个空的队列存入batches 这个数据结构里面 Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d); if (previous == null) return d; else //直接返回新的结果 return previous; }
这段代码作用是取出分区对应的 ProducerBatch 队列
其中this.batches采用了CopyOnWriteMap数据结构来存放
客户端每发送一条消息都会调用一次append方法,假设Topic有3个分区,总共发送1000万条消息就需要调用1000万次getOrCreateDeque(tp),其中get调用了1000万次,putIfAbsent仅调用了3次,可见这是一个高并发读多写少的场景。针对此场景,KAFKA精心设计了CopyOnWriteMap,CopyOnWriteMap允许线程并发访问,读操作没有加锁限制,性能较高,而写操作需要先在堆内存创建新对象,再将原对象的内容拷贝至新对象,写操作需要上锁。这种数据结构的优点和缺点都非常明显,
优点是:
1.采用读写分离的思想,读操作性能很高,几乎无需额外开销,十分适用于读多写少的场景;
2.map采用volatile关键字修饰,保证了写操作对map的修改对其它线程可见;
缺点是:
每次写操作都要内存复制一份,数据量大时对内存开销较大,容易导致频繁GC;
无法保证数据的强一致性,毕竟读写是作用于新老对象;
@Override public synchronized V putIfAbsent(K k, V v) { //如果我们传进来的这个key不存在 if (!containsKey(k)) //那么就调用里面内部的put方法 return put(k, v); else //返回结果 return get(k); }
@Override public V get(Object k) { return map.get(k); }
3.ConcurrentSkipListMap 跳表数据结构
Log.scala
//java juc下面的一个数据结构
//首先这个是跳表实现的一个并发安全的Map集合。
//文件名作为key(base offset)
//value就是一个segment
//目的就是为了能我们根据offset的大小快速的定位到segment
/* the actual segments of the log */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = //java juc下面的一个数据结构 //首先这个是跳表实现的一个并发安全的Map集合。 //文件名作为key(base offset) //value就是一个segment //目的就是为了能我们根据offset的大小快速的定位到segment new ConcurrentSkipListMap[java.lang.Long, LogSegment]
/* Load the log segments from the log files on disk */ private def loadSegments() { // create the log directory if it doesn't exist dir.mkdirs() var swapFiles = Set[File]() // first do a pass through the files in the log directory and remove any temporary files // and find any interrupted swap operations for(file <- dir.listFiles if file.isFile) { if(!file.canRead) throw new IOException("Could not read file " + file) val filename = file.getName if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) { // if the file ends in .deleted or .cleaned, delete it file.delete() } else if(filename.endsWith(SwapFileSuffix)) { // we crashed in the middle of a swap operation, to recover: // if a log, delete the .index file, complete the swap operation later // if an index just delete it, it will be rebuilt val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) if(baseName.getPath.endsWith(IndexFileSuffix)) { file.delete() } else if(baseName.getPath.endsWith(LogFileSuffix)){ // delete the index val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix)) index.delete() swapFiles += file } } } // now do a second pass and load all the .log and all index files for(file <- dir.listFiles if file.isFile) { val filename = file.getName if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) { // if it is an index file, make sure it has a corresponding .log file val logFile = if (filename.endsWith(TimeIndexFileSuffix)) new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix)) else new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix)) if(!logFile.exists) { warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) file.delete() } } else if(filename.endsWith(LogFileSuffix)) { // if its a log file, load the corresponding log segment val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong val indexFile = Log.indexFilename(dir, start) val timeIndexFile = Log.timeIndexFilename(dir, start) val indexFileExists = indexFile.exists() val segment = new LogSegment(dir = dir, startOffset = start, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, time = time, fileAlreadyExists = true) if (indexFileExists) { try { segment.index.sanityCheck() segment.timeIndex.sanityCheck() } catch { case e: java.lang.IllegalArgumentException => warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " + s"${indexFile.getAbsolutePath} and rebuilding index...") indexFile.delete() timeIndexFile.delete() segment.recover(config.maxMessageSize) } } else { error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) segment.recover(config.maxMessageSize) } segments.put(start, segment) } }
if(logSegments.isEmpty) { // no existing segments, create a new mutable segment beginning at offset 0 segments.put(0L, new LogSegment(dir = dir, startOffset = 0, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, time = time, fileAlreadyExists = false, initFileSize = this.initFileSize(), preallocate = config.preallocate)) } else { recoverLog() // reset the index size of the currently active log segment to allow more entries activeSegment.index.resize(config.maxIndexSize) activeSegment.timeIndex.resize(config.maxIndexSize) }
java.util.concurrent.ConcurrentSkipListMap 代码:
public V put(K key, V value) { if (value == null) throw new NullPointerException(); return doPut(key, value, false); }
private V doPut(K key, V value, boolean onlyIfAbsent) { Node<K,V> z; // added node if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { if (n != null) { Object v; int c; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) > 0) { b = n; n = f; continue; } if (c == 0) { if (onlyIfAbsent || n.casValue(v, value)) { @SuppressWarnings("unchecked") V vv = (V)v; return vv; } break; // restart if lost race to replace value } // else c < 0; fall through } z = new Node<K,V>(key, value, n); if (!b.casNext(n, z)) break; // restart if lost race to append to b break outer; } } int rnd = ThreadLocalRandom.nextSecondarySeed(); if ((rnd & 0x80000001) == 0) { // test highest and lowest bits int level = 1, max; while (((rnd >>>= 1) & 1) != 0) ++level; Index<K,V> idx = null; HeadIndex<K,V> h = head; if (level <= (max = h.level)) { for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); } else { // try to grow by one level level = max + 1; // hold in array and later pick the one to use @SuppressWarnings("unchecked")Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1]; for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null); for (;;) { h = head; int oldLevel = h.level; if (level <= oldLevel) // lost race to add level break; HeadIndex<K,V> newh = h; Node<K,V> oldbase = h.node; for (int j = oldLevel+1; j <= level; ++j) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); if (casHead(h, newh)) { h = newh; idx = idxs[level = oldLevel]; break; } } } // find insertion points and splice in splice: for (int insertionLevel = level;;) { int j = h.level; for (Index<K,V> q = h, r = q.right, t = idx;;) { if (q == null || t == null) break splice; if (r != null) { Node<K,V> n = r.node; // compare before deletion check avoids needing recheck int c = cpr(cmp, key, n.key); if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } if (c > 0) { q = r; r = r.right; continue; } } if (j == insertionLevel) { if (!q.link(r, t)) break; // restart if (t.node.value == null) { findNode(key); break splice; } if (--insertionLevel == 0) break splice; } if (--j >= insertionLevel && j < level) t = t.down; q = q.down; r = q.right; } } } return null; }
==================================================================
ConcurrentSkipListMap是在JDK 1.6中新增的,为了对高并发场景下的有序Map提供更好的支持,它有几个特点:
高并发场景
key是有序的
添加、删除、查找操作都是基于跳表结构(Skip List)实现的
key和value都不能为null
跳表(Skip List)是一种类似于链表的数据结构,其查询、插入、删除的时间复杂度都是 O(logn)。
在传统的单链表结构中,查找某个元素需要从链表的头部按顺序遍历,直到找到目标元素为止,查找的时间复杂度为O(n)。
而跳表结合了树和链表的特点,其特性如下:
跳表由很多层组成;
每一层都是一个有序的链表;
最底层的链表包含所有元素;
对于每一层的任意一个节点,不仅有指向其下一个节点的指针,也有指向其下一层的指针;
如果一个元素出现在Level n层的链表中,则它在Level n层以下的链表也都会出现。
Skip List例子:
下图是一种可能的跳表结构:
如图,[1]和[40]节点有3层,[8]和[18]节点有2层。每一层都是有序的链表。
如果要查找目标节点[15],大致过程如下:
首先查看[1]节点的第1层,发现[1]节点的下一个节点为[40],大于15,那么查找[1]节点的下一层;
查找[1]节点的第2层,发现[1]节点的下一个节点为[8],小于15,接着查看下一个节点,发现下一个节点是[18],大于15,因此查找[8]节点的下一层;
查找[8]节点的第2层,发现[8]节点的下一个节点是[10],小于15,接着查看下一个节点[13],小于15,接着查看下一个节点[15],发现其值等于15,因此找到了目标节点,结束查询。
跳表实际上是一种 空间换时间 的数据结构。
ConcurrentSkipListMap用到了两种结构的节点。
Node节点代表了真正存储数据的节点,包含了key、value、指向下一个节点的指针next:
static final class Node<K,V> {
final K key; // 键
V val; // 值
Node<K,V> next; // 指向下一个节点的指针
Node(K key, V value, Node<K,V> next) {
this.key = key;
this.val = value;
this.next = next;
}
}
Index节点代表了跳表的层级,包含了当前节点node、下一层down、当前层的下一个节点right:
static final class Index<K,V> {
final Node<K,V> node; // 当前节点
final Index<K,V> down; // 下一层
Index<K,V> right; // 当前层的下一个节点
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
}
下面是对ConcurrentSkipListMap的简单使用的一个例子:
key有顺序的跳表数据结构
import org.apache.kafka.clients.producer.internals.RecordBatch; import org.apache.kafka.common.TopicPartition; //import org.apache.kafka.common.utils.CopyOnWriteMap; import com.ehl.utils.skiplist.CopyOnWriteMap; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap;
//跳表Map代码实现 ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<Integer, String>(); // ConcurrentNavigableMap<Integer, String> map2 = new ConcurrentSkipListMap<Integer, String>(); map.put(4, "4"); map.put(5, "5"); map.put(1, "1"); map.put(6, "6"); map.put(2, "2"); System.out.println(map.keySet()); // [1, 2, 4, 5, 6] System.out.println(map); //{1=1, 2=2, 4=4, 5=5, 6=6} System.out.println(map.descendingKeySet()); // [6, 5, 4, 2, 1] System.out.println(map.descendingMap()); //{6=6, 5=5, 4=4, 2=2, 1=1} // [1, 2, 4, 5, 6] // {1=1, 2=2, 4=4, 5=5, 6=6} // [6, 5, 4, 2, 1] // {6=6, 5=5, 4=4, 2=2, 1=1}
CopyOnWriteMap
/** * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.utils; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; /** * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification * * 1) 这个数据结构是在高并发的情况下是线程安全的。 * 2) 采用的读写分离的思想设计的数据结构 * 每次插入(写数据)数据的时候都开辟新的内存空间 * 所以会有个小缺点,就是插入数据的时候,会比较耗费内存空间。 * 3)这样的一个数据结构,适合写少读多的场景。 * 读数据的时候性能很高。 * * batchs这个对象存储数据的时候,就是使用的这个数据结构。 * 对于batches来说,它面对的场景就是读多写少的场景。 * *batches: * 读数据: * 每生产一条消息,都会从batches里面读取数据。 * 假如每秒中生产10万条消息,是不是就意味着每秒要读取10万次。 * 所以绝对是一个高并发的场景。 * 写数据: * 假设有100个分区,那么就是会插入100次数据。 * 并且队列只需要插入一次就可以了。 * 所以这是一个低频的操作。 * * 高性能: * * 读写分离读设计方案:适合的场景就是读多写少。 * 读多: * * 写少: * * * */ public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> { /** * 核心的变量就是一个map * 这个map有个特点,它的修饰符是volatile关键字。 * 在多线程的情况下,如果这个map的值发生变化,其他线程也是可见的。 * * get * put */ private volatile Map<K, V> map; public CopyOnWriteMap() { this.map = Collections.emptyMap(); } public CopyOnWriteMap(Map<K, V> map) { this.map = Collections.unmodifiableMap(map); } @Override public boolean containsKey(Object k) { return map.containsKey(k); } @Override public boolean containsValue(Object v) { return map.containsValue(v); } @Override public Set<java.util.Map.Entry<K, V>> entrySet() { return map.entrySet(); } /** * 没有加锁,读取数据的时候性能很高(高并发的场景下,肯定性能很高) * 并且是线程安全的。 * 因为人家采用的读写分离的思想。 * @param k * @return */ @Override public V get(Object k) { return map.get(k); } @Override public boolean isEmpty() { return map.isEmpty(); } @Override public Set<K> keySet() { return map.keySet(); } @Override public int size() { return map.size(); } @Override public Collection<V> values() { return map.values(); } @Override public synchronized void clear() { this.map = Collections.emptyMap(); } /** * 1): * 整个方法使用的是synchronized关键字去修饰的,说明这个方法是线程安全。 * 即使加了锁,这段代码的性能依然很好,因为里面都是纯内存的操作。 * 2) * 这种设计方式,采用的是读写分离的设计思想。 * 读操作和写操作 是相互不影响的。 * 所以我们读数据的操作就是线程安全的。 *3) * 最后把值赋给了map,map是用volatile关键字修饰的。 * 说明这个map是具有可见性的,这样的话,如果get数据的时候,这儿的值发生了变化,也是能感知到的。 * @param k * @param v * @return */ @Override public synchronized V put(K k, V v) { //新的内存空间 //读写分离 //往新的内存空间里面插入 //读,读数据就老读空间里面去 Map<K, V> copy = new HashMap<K, V>(this.map); //插入数据 V prev = copy.put(k, v); //赋值给map this.map = Collections.unmodifiableMap(copy); return prev; } @Override public synchronized void putAll(Map<? extends K, ? extends V> entries) { Map<K, V> copy = new HashMap<K, V>(this.map); copy.putAll(entries); this.map = Collections.unmodifiableMap(copy); } @Override public synchronized V remove(Object key) { Map<K, V> copy = new HashMap<K, V>(this.map); V prev = copy.remove(key); this.map = Collections.unmodifiableMap(copy); return prev; } @Override public synchronized V putIfAbsent(K k, V v) { //如果我们传进来的这个key不存在 if (!containsKey(k)) //那么就调用里面内部的put方法 return put(k, v); else //返回结果 return get(k); } @Override public synchronized boolean remove(Object k, Object v) { if (containsKey(k) && get(k).equals(v)) { remove(k); return true; } else { return false; } } @Override public synchronized boolean replace(K k, V original, V replacement) { if (containsKey(k) && get(k).equals(original)) { put(k, replacement); return true; } else { return false; } } @Override public synchronized V replace(K k, V v) { if (containsKey(k)) { return put(k, v); } else { return null; } } }
COW MAP代码实践
import org.apache.kafka.clients.producer.internals.RecordBatch; import org.apache.kafka.common.TopicPartition; //import org.apache.kafka.common.utils.CopyOnWriteMap; import com.ehl.utils.skiplist.CopyOnWriteMap; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap;
//COW写时复制MAP代码实现 //TopicPartition 分区 -》 Deque<RecordBatch> 队列 // private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches; // dq.addLast(batch); // deque.addFirst(batch); // deque.peekLast(); // dq.remove(batch); ConcurrentMap<Integer, Deque<String>> cowMap = new CopyOnWriteMap<Integer, Deque<String>>(); // Integer 代表4个分区 1-4 Deque<String> 代表4个队列 Deque<String> d1 = new ArrayDeque<String>(); Deque<String> d2 = new ArrayDeque<String>(); Deque<String> d3 = new ArrayDeque<String>(); Deque<String> d4 = new ArrayDeque<String>(); d1.add("aaa"); d2.add("bbb"); d3.add("ccc"); d4.add("ddd"); d1 = cowMap.putIfAbsent(1, d1); //key-四个分区 写少 写入新Map 加锁synchronized 同时赋值给旧MAP cowMap.putIfAbsent(2, d2); //key-四个分区 写少 写入新Map 加锁synchronized 同时赋值给旧MAP 写时复制为了读写分离 cowMap.putIfAbsent(3, d3); //key-四个分区 写少 写入新Map 加锁synchronized 同时赋值给旧MAP 写时复制为了读写分离 cowMap.putIfAbsent(4, d4); //key-四个分区 写少 写入新Map 加锁synchronized 同时赋值给旧MAP 写时复制为了读写分离 d1 = cowMap.putIfAbsent(1, d1);//第二次再写入 如key存在 则直接返回value队列 然后写入addLast 写时复制 d1.addLast("aaa2"); System.out.println(cowMap.get(1));//从旧MAP中获取数据 此MAP是volatile的 读多 System.out.println(cowMap.get(2));//从旧MAP中获取数据 此MAP是volatile的 读多 System.out.println(cowMap.get(4));//从旧MAP中获取数据 此MAP是volatile的 读多 // 打印 // [aaa, aaa2] // [bbb] // [ddd]
相关推荐
内容概要:本文详细介绍了欧姆龙NJ系列PLC与多个品牌总线设备(如汇川伺服、雷赛步进控制器、SMC电缸等)的控制程序及其配置方法。重点讨论了PDO映射、参数配置、单位转换、故障排查等方面的实际经验和常见问题。文中提供了具体的代码示例,帮助读者理解和掌握这些复杂系统的调试技巧。此外,还特别强调了不同品牌设备之间的兼容性和注意事项,以及如何避免常见的配置错误。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是那些需要进行PLC与总线设备集成工作的专业人士。 使用场景及目标:适用于需要将欧姆龙NJ PLC与其他品牌总线设备集成在一起的应用场景,如工厂自动化生产线、机器人控制等。主要目标是提高系统的可靠性和效率,减少调试时间和成本。 其他说明:文章不仅提供了理论知识,还包括大量来自实际项目的实践经验,有助于读者更好地应对现实中的挑战。建议读者在实践中不断积累经验,逐步掌握各种设备的特点和最佳实践。
数字化企业转型大数据解决方案.pptx
内容概要:本文详细介绍了利用MATLAB实现多智能体系统一致性算法在电力系统分布式经济调度中的应用。文中通过具体的MATLAB代码展示了如何将发电机组和柔性负荷视为智能体,通过局部通信和协商达成全局最优调度。核心算法通过迭代更新增量成本和增量效益,使各个节点在无中央指挥的情况下自行调整功率,最终实现经济最优分配。此外,文章还讨论了通信拓扑对收敛速度的影响以及一些工程优化技巧,如稀疏矩阵存储和自适应参数调整。 适合人群:从事电力系统调度、分布式控制系统设计的研究人员和技术人员,尤其是对多智能体系统和MATLAB编程有一定了解的人群。 使用场景及目标:适用于希望提高电力系统调度效率、降低成本并增强系统鲁棒性的应用场景。主要目标是在分布式环境下实现快速、稳定的经济调度,同时减少通信量和计算资源消耗。 其他说明:文章提供了详细的代码示例和测试结果,展示了算法的实际性能和优势。对于进一步研究和实际应用具有重要参考价值。
获取虎牙直播流地址的油猴脚本,可以直接使用VLC等播放器打开地址播放。
内容概要:本文详细介绍了如何利用MATLAB进行价格型需求响应的研究,特别是电价弹性矩阵的构建与优化。文章首先解释了电价弹性矩阵的概念及其重要性,接着展示了如何通过MATLAB代码实现弹性矩阵的初始化、负荷变化量的计算以及优化方法。文中还讨论了如何通过非线性约束和目标函数最小化峰谷差,确保用户用电舒适度的同时实现负荷的有效调节。此外,文章提供了具体的代码实例,包括原始负荷曲线与优化后负荷曲线的对比图,以及基于历史数据的参数优化方法。 适合人群:从事电力系统优化、能源管理及相关领域的研究人员和技术人员。 使用场景及目标:适用于希望深入了解并掌握价格型需求响应机制的专业人士,旨在帮助他们更好地理解和应用电价弹性矩阵,优化电力系统的负荷分布,提高能源利用效率。 其他说明:文章强调了实际应用中的注意事项,如弹性矩阵的动态校准和用户价格敏感度的滞后效应,提供了实用的技术细节和实践经验。
CSP-J 2021 初赛真题.pdf
内容概要:本文详细介绍了如何利用麻雀优化算法(SSA)与长短期记忆网络(LSTM)相结合,在MATLAB环境中构建一个用于时间序列单输入单输出预测的模型。首先简述了SSA和LSTM的基本原理,接着逐步讲解了从数据准备、预处理、模型构建、参数优化到最后的预测与结果可视化的完整流程。文中提供了详细的MATLAB代码示例,确保读者能够轻松复现实验。此外,还讨论了一些关键参数的选择方法及其对模型性能的影响。 适合人群:对时间序列预测感兴趣的科研人员、研究生以及有一定编程基础的数据分析师。 使用场景及目标:适用于需要对单变量时间序列数据进行高精度预测的应用场合,如金融、能源等领域。通过本篇文章的学习,读者将掌握如何使用MATLAB实现SSA优化LSTM模型的具体步骤和技术要点。 其他说明:为了提高模型的泛化能力,文中特别强调了数据预处理的重要性,并给出了具体的实现方式。同时,针对可能出现的问题,如过拟合、梯度爆炸等,也提供了一些建议性的解决方案。
内容概要:本文详细介绍了西门子S7-1200 PLC与施耐德ATV310/312变频器通过Modbus RTU进行通讯的具体实现步骤和调试技巧。主要内容涵盖硬件接线、通讯参数配置、控制启停、设定频率、读取运行参数的方法以及常见的调试问题及其解决方案。文中提供了具体的代码示例,帮助读者理解和实施通讯程序。此外,还强调了注意事项,如地址偏移量、数据格式转换和超时匹配等。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是那些需要将西门子PLC与施耐德变频器进行集成的工作人员。 使用场景及目标:适用于需要通过Modbus RTU协议实现PLC与变频器通讯的工程项目。目标是确保通讯稳定可靠,掌握解决常见问题的方法,提高调试效率。 其他说明:文中提到的实际案例和调试经验有助于读者避免常见错误,快速定位并解决问题。建议读者在实践中结合提供的代码示例和调试工具进行操作。
本文详细介绍了Scala语言的基础知识和特性。Scala是一种运行在JVM上的编程语言,兼具面向对象和函数式编程的特点,适合大数据处理。其环境配置需注意Java版本和路径问题。语言基础涵盖注释、变量、数据类型、运算符和流程控制。函数特性包括高阶函数、柯里化、闭包、尾递归等。面向对象方面,Scala支持继承、抽象类、特质等,并通过包、类和对象实现代码组织和管理,同时提供了单例对象和伴生对象的概念。
内容概要:本文详细探讨了石墨烯-金属强耦合拉比分裂现象的研究,主要借助Comsol多物理场仿真软件进行模拟。文章首先介绍了拉比分裂的基本概念,即当石墨烯与金属相互靠近时,原本单一的共振模式会分裂成两个,这种现象背后的电磁学和量子力学原理对于开发新型光电器件、高速通信设备等意义重大。接着阐述了Comsol在研究中的重要作用,包括构建石墨烯-金属相互作用模型、设置材料属性、定义边界条件、划分网格以及求解模型的具体步骤。此外,还展示了具体的建模示例代码,并对模拟结果进行了深入分析,解释了拉比分裂现象的形成机理。最后强调了该研究对未来技术创新的重要价值。 适合人群:从事物理学、材料科学、光电工程等领域研究的专业人士,尤其是对石墨烯-金属强耦合感兴趣的科研工作者。 使用场景及目标:适用于希望深入了解石墨烯-金属强耦合拉比分裂现象的研究人员,旨在帮助他们掌握Comsol仿真工具的应用技巧,提高研究效率,推动相关领域的创新发展。 其他说明:文中提供的代码片段和建模思路可供读者参考实践,但需要注意实际应用时需根据具体情况调整参数配置。
内容概要:本文详细介绍了基于FPGA的电机控制系统的设计与实现,重点探讨了Verilog和Nios II软核相结合的方式。具体来说,编码器模块利用Verilog实现了高精度的四倍频计数,解决了AB相信号的跳变问题;坐标变换部分则由Nios II软核负责,通过C语言实现Clarke变换和Park变换,提高了计算效率;SVPWM生成模块采用了Verilog硬件加速,优化了调制波的生成时间和波形质量。此外,文章还讨论了Nios II和Verilog之间的高效交互方式,如自定义指令和DMA传输,以及中断处理机制,确保系统的实时性和稳定性。文中提到的一些优化技巧,如定点数运算、查表法、流水线设计等,进一步提升了系统的性能。 适合人群:具有一定FPGA和嵌入式开发经验的研发人员,尤其是对电机控制感兴趣的工程师。 使用场景及目标:适用于需要高性能、低延迟的电机控制应用场景,如工业自动化、机器人、无人机等领域。目标是帮助读者掌握FPGA与Nios II结合的电机控制方法,提高系统的实时性和可靠性。 其他说明:文章提供了详细的代码片段和优化建议,有助于读者理解和实践。同时,文中提及了一些常见的调试问题及其解决方案,如符号位处理不当导致的电机反转、数据溢出等问题,提醒读者在实际项目中加以注意。
内容概要:本文档《ATK-DLRK3568嵌入式Qt开发实战V1.2》是正点原子出品的一份面向初学者的嵌入式Qt开发指南,主要内容涵盖嵌入式Linux环境下Qt的安装配置、C++基础、Qt基础、多线程编程、网络编程、多媒体开发、数据库操作以及项目实战案例。文档从最简单的“Hello World”程序开始,逐步引导读者熟悉Qt开发环境的搭建、常用控件的使用、信号与槽机制、UI设计、数据处理等关键技术点。此外,文档还提供了详细的项目实战案例,如车牌识别系统的开发,帮助读者将理论知识应用于实际项目中。 适合人群:具备一定Linux和C++基础,希望快速入门嵌入式Qt开发的初学者或有一定开发经验的研发人员。 使用场景及目标: 1. **环境搭建**:学习如何在Ubuntu环境下搭建Qt开发环境,包括安装必要的工具和库。 2. **基础知识**:掌握C++面向对象编程、Qt基础控件的使用、信号与槽机制等核心概念。 3. **高级功能**:理解多线程编程、网络通信、多媒体处理、数据库操作等高级功能的实现方法。 4. **项目实战**:通过具体的项目案例(如车牌识别系统),巩固
内容概要:文章深入探讨了宇树科技人形机器人的技术实力、市场表现及未来前景,揭示其背后是科技创新还是市场炒作。宇树科技,成立于2016年,由90后创业者王兴兴创办,从四足机器人(如Laikago、AlienGo、A1)成功跨越到人形机器人(如H1和G1)。H1具有出色的运动能力和高精度导航技术,G1则专注于娱乐陪伴场景,具备模拟人手操作的能力。市场方面,宇树科技人形机器人因春晚表演而走红,但目前仅限于“极客型”用户购买,二手市场租赁价格高昂。文章认为,宇树科技的成功既源于技术突破,也离不开市场炒作的影响。未来,宇树科技将在工业、服务业、娱乐等多个领域拓展应用,但仍需克服成本、稳定性和安全等方面的挑战。 适合人群:对人工智能和机器人技术感兴趣的科技爱好者、投资者以及相关行业的从业者。 使用场景及目标:①了解宇树科技人形机器人的技术特点和发展历程;②分析其市场表现及未来应用前景;③探讨科技创新与市场炒作之间的关系。 阅读建议:本文详细介绍了宇树科技人形机器人的技术细节和市场情况,读者应关注其技术创新点,同时理性看待市场炒作现象,思考人形机器人的实际应用价值和发展潜力。
C#3-的核心代码以及练习题相关
内容概要:本文详细介绍了一种将麻雀搜索算法(SSA)用于优化支持向量机(SVM)分类的方法,并以红酒数据集为例进行了具体实现。首先介绍了数据预处理步骤,包括从Excel读取数据并进行特征和标签的分离。接着阐述了适应度函数的设计,采用五折交叉验证计算准确率作为评价标准。然后深入探讨了麻雀算法的核心迭代过程,包括参数初始化、种群更新规则以及如何通过指数衰减和随机扰动来提高搜索效率。此外,文中还提到了一些实用技巧,如保存最优参数以避免重复计算、利用混淆矩阵可视化分类结果等。最后给出了完整的代码框架及其在GitHub上的开源地址。 适合人群:具有一定MATLAB编程基础的研究人员和技术爱好者,尤其是对机器学习算法感兴趣的人士。 使用场景及目标:适用于需要解决多分类问题的数据科学家或工程师,旨在提供一种高效且易于使用的SVM参数优化方法,帮助用户获得更高的分类准确性。 其他说明:该方法不仅限于红酒数据集,在其他类似的数据集中同样适用。用户只需确保数据格式正确即可轻松替换数据源。
内容概要:本文详细介绍了如何在MATLAB/Simulink环境中搭建四分之一车被动悬架双质量(二自由度)模型。该模型主要用于研究车辆悬架系统在垂直方向上的动态特性,特别是针对路面不平度引起的车轮和车身振动。文中不仅提供了具体的建模步骤,包括输入模块、模型主体搭建和输出模块的设计,还展示了如何通过仿真分析来评估悬架性能,如乘坐舒适性和轮胎接地性。此外,文章还讨论了一些常见的建模技巧和注意事项,如选择合适的求解器、处理代数环等问题。 适合人群:从事汽车动力学研究的科研人员、高校学生以及对车辆悬架系统感兴趣的工程师。 使用场景及目标:①用于教学目的,帮助学生理解车辆悬架系统的理论知识;②用于科研实验,验证不同的悬架设计方案;③用于工业应用,优化实际车辆的悬架系统设计。 其他说明:本文提供的模型基于MATLAB 2016b及以上版本,确保读者能够顺利重现所有步骤并获得预期结果。同时,文中附带了大量的代码片段和具体的操作指南,便于读者快速上手。
内容概要:本文详细介绍了如何使用COMSOL软件进行光子晶体板谷态特性的建模与仿真。首先,定义了晶格常数和其他关键参数,如六边形蜂窝结构的创建、材料属性的设定以及周期性边界的配置。接下来,重点讲解了网格剖分的方法,强调了自适应网格和边界层细化的重要性。随后,讨论了如何通过参数扫描和频域分析来探索谷态特征,特别是在布里渊区高对称点附近观察到的能量带隙和涡旋结构。最后,提供了关于仿真收敛性和优化技巧的建议,确保结果的可靠性和准确性。 适合人群:从事光子学、电磁学及相关领域的研究人员和技术人员,尤其是对拓扑光子学感兴趣的学者。 使用场景及目标:适用于希望深入了解光子晶体板谷态特性的科研工作者,旨在帮助他们掌握COMSOL的具体应用方法,从而更好地进行相关实验和理论研究。 其他说明:文中不仅提供了详细的代码示例,还穿插了许多形象生动的比喻,使复杂的物理概念变得通俗易懂。同时,强调了仿真过程中需要注意的技术细节,如网格划分、边界条件设置等,有助于避免常见错误并提高仿真的成功率。
内容概要:本文详细介绍了利用有限差分时域法(FDTD)对金纳米球进行米氏散射仿真的全过程。首先,通过Python脚本设置了仿真环境,包括网格精度、材料参数、光源配置等。接着,展示了如何通过近场积分计算散射截面和吸收截面,并进行了远场角分布的仿真。文中还讨论了常见错误及其解决方法,如网格精度不足、边界条件不当等问题。最终,将仿真结果与米氏解析解进行了对比验证,确保了仿真的准确性。 适合人群:从事微纳光学研究的科研人员、研究生以及相关领域的工程师。 使用场景及目标:适用于需要精确模拟纳米颗粒与电磁波相互作用的研究项目,旨在提高仿真精度并验证理论模型。通过本文的学习,可以掌握FDTD仿真的具体实施步骤和技术要点。 其他说明:本文不仅提供了详细的代码示例,还分享了许多实践经验,帮助读者避免常见的仿真陷阱。同时强调了参数选择的重要性,特别是在纳米尺度下,每一个参数都需要精心调整以获得准确的结果。
基数
2ddddddddddddddddddddddddddd