福建西南建设有限公司网站,外贸网站怎么营销,竞价推广账户托管费用,做百度手机网站点文章目录常量说明put() 方法putVal() 方法initTable()#xff1a;初始化数组treeifyBin()#xff1a;链表转红黑树tryPresize()#xff1a;初始化数组扩容TreeBin() 构造方法#xff1a;生成红黑树putTreeVal()#xff1a;往红黑树中插入值helpTransfer()#xff1a;多线…
文章目录常量说明put() 方法putVal() 方法initTable()初始化数组treeifyBin()链表转红黑树tryPresize()初始化数组扩容TreeBin() 构造方法生成红黑树putTreeVal()往红黑树中插入值helpTransfer()多线程帮助扩容addCount()计算Map中的元素总数put时1delete时-1fullAddCount()CAS往CounterCell数组中加值sumCount()计算Map中所有元素总数resizeStamp()当返回值左移 RESIZE_STAMP_SHIFT 位时一定是负数transfer()将旧数组元素转移到新数组中ForwardingNode()构造方法hash值为-1MOVED总结JDK8中的ConcurrentHashMap是怎么保证并发安全的?常量说明
数组中 Node 节点的 Hash 值几种常量
// hash0表示数组中是链表hash0表示null
static final int MOVED -1; // Map 在扩容旧数组中对应节点已经被到了新数组中
static final int TREEBIN -2; // 数组对应节点是红黑树与HashMap不同hashMap数组中存放的是红黑树的根节点// 而ConcurrentHashMap给红黑树包了一层放的是TreeBin对象hash值为-2其中有一个指针指向红黑树的根节点
static final int RESERVED -3; // hash for transient reservations
static final int HASH_BITS 0x7fffffff; // usable bits of normal node hashput() 方法
public V put(K key, V value) {return putVal(key, value, false);
}putVal() 方法
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key null || value null) throw new NullPointerException(); // k,v不能为空int hash spread(key.hashCode()); // 计算hash值int binCount 0;for (NodeK,V[] tab table;;) { // 死循环NodeK,V f; int n, i, fh;if (tab null || (n tab.length) 0)tab initTable(); // 初始化数组else if ((f tabAt(tab, i (n - 1) hash)) null) { //数组对应位置的值为nulltabAt方法保证调用时的可见性if (casTabAt(tab, i, null,new NodeK,V(hash, key, value, null))) // cas 往里设置值break; // no lock when adding to empty bin}else if ((fh f.hash) MOVED) // Map正在扩容tab helpTransfer(tab, f);else {V oldVal null;synchronized (f) { // 头节点作为锁对象if (tabAt(tab, i) f) { if (fh 0) {binCount 1;for (NodeK,V e f;; binCount) { // 遍历链表K ek;if (e.hash hash ((ek e.key) key ||(ek ! null key.equals(ek)))) { // 链表中存在相同的值oldVal e.val;if (!onlyIfAbsent)e.val value;break;}NodeK,V pred e;if ((e e.next) null) { // 遍历到链表尾部插入pred.next new NodeK,V(hash, key,value, null);break;}}//HashMap数组中是放入了红黑树的root节点TreeNode类型//ConcurrentHashMap数组中是放入了TreeBin对象TreeBin对象中有一个root属性指向红黑树root节点//相当于是给红黑树在外面包了一层方便用数组中的节点加锁避免了HashMap红黑树root节点会变动问题else if (f instanceof TreeBin) {NodeK,V p;binCount 2;if ((p ((TreeBinK,V)f).putTreeVal(hash, key, // 红黑树中插入值value)) ! null) {oldVal p.val;if (!onlyIfAbsent)p.val value;}}}}if (binCount ! 0) {if (binCount TREEIFY_THRESHOLD) //链表数量大于等于8时转红黑树treeifyBin(tab, i);if (oldVal ! null)return oldVal;break;}}}addCount(1L, binCount);return null;
}initTable()初始化数组
private final NodeK,V[] initTable() {NodeK,V[] tab; int sc;while ((tab table) null || tab.length 0) {if ((sc sizeCtl) 0) // 说明正在有线程初始化Thread.yield(); //让出cpu资源else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //cas获取锁try {if ((tab table) null || tab.length 0) {int n (sc 0) ? sc : DEFAULT_CAPACITY; //默认容量16SuppressWarnings(unchecked)NodeK,V[] nt (NodeK,V[])new Node?,?[n]; //初始化数组table tab nt;sc n - (n 2); //n右移2位相当于16/4容量的1/4然后再用容量减掉就剩下3/4了和HashMap的0.75一样}} finally {sizeCtl sc;}break;}}return tab;
}treeifyBin()链表转红黑树
private final void treeifyBin(NodeK,V[] tab, int index) {NodeK,V b; int n, sc;if (tab ! null) {if ((n tab.length) MIN_TREEIFY_CAPACITY) //数组大小小于64扩容tryPresize(n 1);else if ((b tabAt(tab, index)) ! null b.hash 0) {synchronized (b) {if (tabAt(tab, index) b) {TreeNodeK,V hd null, tl null;for (NodeK,V e b; e ! null; e e.next) {TreeNodeK,V p new TreeNodeK,V(e.hash, e.key, e.val,null, null); //Node节点转换成TreeNode节点// 构建双向链表if ((p.prev tl) null)hd p;elsetl.next p;tl p;}setTabAt(tab, index, new TreeBinK,V(hd)); //构建红黑树并放入数组中}}}}
}tryPresize()初始化数组扩容
private final void tryPresize(int size) {int c (size (MAXIMUM_CAPACITY 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size (size 1) 1);int sc;while ((sc sizeCtl) 0) {NodeK,V[] tab table; int n;if (tab null || (n tab.length) 0) { // 数组初始化与initTable()方法相似n (sc c) ? sc : c;if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table tab) { SuppressWarnings(unchecked)NodeK,V[] nt (NodeK,V[])new Node?,?[n];table nt;sc n - (n 2);}} finally {sizeCtl sc;}}}else if (c sc || n MAXIMUM_CAPACITY) //扩容完毕break;else if (tab table) { //多线程扩容与addCount()方法中逻辑相同int rs resizeStamp(n);if (sc 0) {NodeK,V[] nt;if ((sc RESIZE_STAMP_SHIFT) ! rs || sc rs 1 ||sc rs MAX_RESIZERS || (nt nextTable) null ||transferIndex 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs RESIZE_STAMP_SHIFT) 2))transfer(tab, null);}}
}TreeBin() 构造方法生成红黑树
与 HashMap 中 treeify() 的处理一样就不再赘述详见HashMap put() 方法源码分析#treeify()
// b 是双向链表的头节点
TreeBin(TreeNodeK,V b) {super(TREEBIN, null, null, null); //设置Hash值为-2表示一颗红黑树this.first b;TreeNodeK,V r null;for (TreeNodeK,V x b, next; x ! null; x next) {next (TreeNodeK,V)x.next;x.left x.right null;if (r null) { //根节点的情况x.parent null;x.red false;r x;}else {K k x.key;int h x.hash;Class? kc null;for (TreeNodeK,V p r;;) {int dir, ph;K pk p.key;if ((ph p.hash) h)dir -1;else if (ph h)dir 1;else if ((kc null (kc comparableClassFor(k)) null) ||(dir compareComparables(kc, k, pk)) 0)dir tieBreakOrder(k, pk);TreeNodeK,V xp p;if ((p (dir 0) ? p.left : p.right) null) {x.parent xp;if (dir 0)xp.left x;elsexp.right x;r balanceInsertion(r, x); // 与HashMap处理相同break;}}}}this.root r;assert checkInvariants(root);
}putTreeVal()往红黑树中插入值
final TreeNodeK,V putTreeVal(int h, K k, V v) {// 前面的代码和HashMap的代码一样就不赘述了Class? kc null;boolean searched false;for (TreeNodeK,V p root;;) {int dir, ph; K pk;if (p null) {first root new TreeNodeK,V(h, k, v, null, null);break;}else if ((ph p.hash) h)dir -1;else if (ph h)dir 1;else if ((pk p.key) k || (pk ! null k.equals(pk)))return p;else if ((kc null (kc comparableClassFor(k)) null) ||(dir compareComparables(kc, k, pk)) 0) {if (!searched) {TreeNodeK,V q, ch;searched true;if (((ch p.left) ! null (q ch.findTreeNode(h, k, kc)) ! null) ||((ch p.right) ! null (q ch.findTreeNode(h, k, kc)) ! null))return q;}dir tieBreakOrder(k, pk);}TreeNodeK,V xp p;if ((p (dir 0) ? p.left : p.right) null) {TreeNodeK,V x, f first;first x new TreeNodeK,V(h, k, v, f, xp);if (f ! null)f.prev x;if (dir 0)xp.left x;elsexp.right x;if (!xp.red) //父节点是黑的无需加锁直接插入x.red true;else {lockRoot(); //加写锁try {root balanceInsertion(root, x);} finally {unlockRoot();}}break;}}assert checkInvariants(root);return null;
}helpTransfer()多线程帮助扩容
final NodeK,V[] helpTransfer(NodeK,V[] tab, NodeK,V f) {NodeK,V[] nextTab; int sc;if (tab ! null (f instanceof ForwardingNode) (nextTab ((ForwardingNodeK,V)f).nextTable) ! null) {int rs resizeStamp(tab.length);while (nextTab nextTable table tab (sc sizeCtl) 0) {if ((sc RESIZE_STAMP_SHIFT) ! rs || sc rs 1 ||sc rs MAX_RESIZERS || transferIndex 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc 1)) {transfer(tab, nextTab);break;}}return nextTab;}return table;
}addCount()计算Map中的元素总数put时1delete时-1
private final void addCount(long x, int check) { //x1check链表长度或者红黑树是2CounterCell[] as; long b, s;if ((as counterCells) ! null ||!U.compareAndSwapLong(this, BASECOUNT, b baseCount, s b x)) { //尝试直接往baseCount中加值// CounterCell数组不为空说明有竞争 或者 直接往baseCount中加值失败cas失败CounterCell a; long v; int m;boolean uncontended true;if (as null || (m as.length - 1) 0 || // 数组为空(a as[ThreadLocalRandom.getProbe() m]) null || //获取线程的随机数 数组长度求出对应线程的数组下标as数组中对应下标的元素为null!(uncontended U.compareAndSwapLong(a, CELLVALUE, v a.value, v x))) { //cas往as数组中对应位置加1fullAddCount(x, uncontended); //CAS往里加数组里加1return; //进入这个if虽然加了1但是不会走后面的扩容逻辑了有冲突时不执行扩容没冲突且达到阈值时才扩容}if (check 1)return;s sumCount(); //算出当前map中元素个数}if (check 0) {NodeK,V[] tab, nt; int n, sc;while (s (long)(sc sizeCtl) (tab table) ! null //大于阈值sizeCtl时扩容(n tab.length) MAXIMUM_CAPACITY) {int rs resizeStamp(n);if (sc 0) { // sc是负数说明正在扩容与helpTransfer()方法逻辑相同if ((sc RESIZE_STAMP_SHIFT) ! rs || sc rs 1 ||sc rs MAX_RESIZERS || (nt nextTable) null ||transferIndex 0) //当满足这些条件时表示扩容完毕了break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc 1)) //每有一个后续线程帮助扩容时sc就1transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs RESIZE_STAMP_SHIFT) 2)) //注意第一个线程执行扩容时会把sc设置成负数并2初始值transfer(tab, null);s sumCount();}}
}ThreadLocalRandom.getProbe() 方法详解关于 ConcurrentHashMap 1.8 中的线程探针哈希 fullAddCount()CAS往CounterCell数组中加值
//wasUncontended字段用来标识当前线程获取的probe通过计算后在CounterCell数组中对应下标是否能加成功
private final void fullAddCount(long x, boolean wasUncontended) { //x1wasUncontendedfalseint h;if ((h ThreadLocalRandom.getProbe()) 0) { //获取线程的hash值多次调用值一样ThreadLocalRandom.localInit(); //初始化h ThreadLocalRandom.getProbe();wasUncontended true;}boolean collide false; // true时表示发生冲突数组要执行扩容for (;;) { //自旋CounterCell[] as; CounterCell a; int n; long v;if ((as counterCells) ! null (n as.length) 0) { // 数组不是空的已经完成了初始化if ((a as[(n - 1) h]) null) { //数组当前下标位置是空的if (cellsBusy 0) { // Try to attach new CellCounterCell r new CounterCell(x); // 初始化数组中对应位置的元素且初始值为1if (cellsBusy 0 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // 数组cas加锁boolean created false;try { // Recheck under lockCounterCell[] rs; int m, j;if ((rs counterCells) ! null (m rs.length) 0 rs[j (m - 1) h] null) { //再次判断rs[j] r;created true;}} finally { //释放锁cellsBusy 0;}if (created) //创建成功break;continue; // 继续循环用continue跳过了修改线程的probe值} }collide false;}else if (!wasUncontended) // false表示上一次cas1的时候失败了wasUncontended true; // 刷新状态为true并修改线程probe的值跳出if后else if (U.compareAndSwapLong(a, CELLVALUE, v a.value, v x)) //再尝试cas1成功则跳出循环失败则修改线程probe的值break;else if (counterCells ! as || n NCPU) //数组扩容后或者数组长度大于cpu核数时不允许扩容collide false; // At max size or staleelse if (!collide) //CAS失败说明发生冲突collide设置为true下次循环过来collide 为true时执行扩容collide true;else if (cellsBusy 0 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //数组加锁扩容try {if (counterCells as) {// Expand table unless staleCounterCell[] rs new CounterCell[n 1]; //创建新数组容量*2for (int i 0; i n; i)rs[i] as[i]; //旧数组元素放新数组中counterCells rs;}} finally {cellsBusy 0;}collide false;continue; // Retry with expanded table}h ThreadLocalRandom.advanceProbe(h); //给线程生成一个新的Probe值随机数}else if (cellsBusy 0 counterCells as //cellsBusy标识数组是否被占用0表示没有占用U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //cas将cellsBusy改为1加锁保证只有一个线程能对cells数组初始化boolean init false;try { // Initialize tableif (counterCells as) {CounterCell[] rs new CounterCell[2]; //初始化数组大小为2rs[h 1] new CounterCell(x); //对应位置直接1counterCells rs;init true; //初始化完毕}} finally {cellsBusy 0;}if (init) //初始化完毕且计数也加了1就跳出循环break;}else if (U.compareAndSwapLong(this, BASECOUNT, v baseCount, v x)) //如果其它情况都被其它线程占用则尝试对baseCount加1break; // Fall back on using base}
}sumCount()计算Map中所有元素总数
final long sumCount() {CounterCell[] as counterCells; CounterCell a;long sum baseCount; //基础值if (as ! null) {for (int i 0; i as.length; i) { //遍历CounterCell数组进行统计求和if ((a as[i]) ! null)sum a.value;}}return sum;
}resizeStamp()当返回值左移 RESIZE_STAMP_SHIFT 位时一定是负数
private static int RESIZE_STAMP_BITS 16;private static final int RESIZE_STAMP_SHIFT 32 - RESIZE_STAMP_BITS;//resizeStamp相当于把第int的第16位赋值为11左移15位是在第16位然后在addCount方法中向左移动16位时1到了符号位所以变成负数了//RESIZE_STAMP_SHIFT 32 - RESIZE_STAMP_BITS所以只要 1 左移RESIZE_STAMP_BITS-1位再左移RESIZE_STAMP_SHIFT位
//共左移了31位1到了第32位1就被移到了符号位成为了负数
static final int resizeStamp(int n) {return Integer.numberOfLeadingZeros(n) | (1 (RESIZE_STAMP_BITS - 1));
}Integer.numberOfLeadingZeros(i) 方法返回无符号整数 i 的最高非 0 位前面的 0 的个数包括符号位在内如果 i 为负数这个方法将会返回 0
transfer()将旧数组元素转移到新数组中
private final void transfer(NodeK,V[] tab, NodeK,V[] nextTab) { //tab是旧数组nextTab是新数组int n tab.length, stride;if ((stride (NCPU 1) ? (n 3) / NCPU : n) MIN_TRANSFER_STRIDE) //计算步长(n/8)/NCPU最小值是16stride MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab null) { // 新数组为空初始化try {SuppressWarnings(unchecked)NodeK,V[] nt (NodeK,V[])new Node?,?[n 1]; //扩容*2nextTab nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl Integer.MAX_VALUE;return;}nextTable nextTab;transferIndex n; //transferIndex多线程下数组从后往前遍历挨个元素转换transferIndex 表示下个线程应该转换旧数组的哪个元素}int nextn nextTab.length;ForwardingNodeK,V fwd new ForwardingNodeK,V(nextTab); //ForwardingNode是个特殊的Node对象hash值为MOVED-1boolean advance true; //当前线程是否继续往前找未转移的元素boolean finishing false; // 当前线程的扩容逻辑是否做完只是当前线程for (int i 0, bound 0;;) { NodeK,V f; int fh;while (advance) {int nextIndex, nextBound; //nextIndex是跳过的右边界nextBound是跳过的左边界左闭右开//不越界的情况下nextIndex - nextBound stride步长if (--i bound || finishing) //倒着遍历advance false;else if ((nextIndex transferIndex) 0) { //旧数组已经转换到下标0的位置了i -1;advance false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound (nextIndex stride ?nextIndex - stride : 0))) { //数组是从右往左遍历如果越界负数下标就设置为0bound nextBound;i nextIndex - 1;advance false;}}if (i 0 || i n || i n nextn) {int sc;if (finishing) {nextTable null;table nextTab; //转移到新的数组sizeCtl (n 1) - (n 1); // *2*0.75return;}if (U.compareAndSwapInt(this, SIZECTL, sc sizeCtl, sc - 1)) { //每有一个线程帮助扩容完sc就-1if ((sc - 2) ! resizeStamp(n) RESIZE_STAMP_SHIFT)//这个逻辑相当于 sc ! (resizeStamp(n) RESIZE_STAMP_SHIFT) 2 //(resizeStamp(n) RESIZE_STAMP_SHIFT) 2 是第一个线程执行扩容时sc 设置的初始值之后每有一个线程帮忙扩容就在 sc 的初始值上 1return; //当前线程扩容执行完毕// 当 sc (resizeStamp(n) RESIZE_STAMP_SHIFT) 2 时会走到这儿表示除了当前线程其它所有帮助扩容的线程都执行完了finishing advance true; //表示所有线程扩容执行完毕i n; // recheck before commit}}else if ((f tabAt(tab, i)) null) //数组对应位置是nulladvance casTabAt(tab, i, null, fwd);else if ((fh f.hash) MOVED) //数组对应位置已被其它线程处理advance true; // 循环处理前一个位置else {synchronized (f) {if (tabAt(tab, i) f) {NodeK,V ln, hn;if (fh 0) { //数组中对应位置是链表// 链表在扩容时会分成高位和低位和HashMap相同假设单向链表最后一位是高位然后往前推反向遍历runBit就指向链表节点第一次变成低位时的后一个高位节点// 即在原链表中runBit指向的节点为头的单向链表要么都是低位的要么都是高位的int runBit fh n; //指向链表最后相同位的节点NodeK,V lastRun f;for (NodeK,V p f.next; p ! null; p p.next) { //遍历链表找出runBit节点的位置int b p.hash n;if (b ! runBit) {runBit b;lastRun p;}}if (runBit 0) { //将runBit后的节点整个挪到新数组ln lastRun;hn null;}else {hn lastRun;ln null;}for (NodeK,V p f; p ! lastRun; p p.next) { //遍历剩余节点往新数组插int ph p.hash; K pk p.key; V pv p.val;if ((ph n) 0) //低位ln new NodeK,V(ph, pk, pv, ln); //头插法else //高位hn new NodeK,V(ph, pk, pv, hn);}setTabAt(nextTab, i, ln); //往新数组中设置值setTabAt(nextTab, i n, hn);setTabAt(tab, i, fwd); //往旧数组中设置fwdadvance true;}else if (f instanceof TreeBin) { //如果数组中的元素是红黑树与HashMap逻辑相同TreeBinK,V t (TreeBinK,V)f;TreeNodeK,V lo null, loTail null;TreeNodeK,V hi null, hiTail null;int lc 0, hc 0;for (NodeK,V e t.first; e ! null; e e.next) {int h e.hash;TreeNodeK,V p new TreeNodeK,V(h, e.key, e.val, null, null);if ((h n) 0) {if ((p.prev loTail) null)lo p;elseloTail.next p;loTail p;lc;}else {if ((p.prev hiTail) null)hi p;elsehiTail.next p;hiTail p;hc;}}ln (lc UNTREEIFY_THRESHOLD) ? untreeify(lo) : //小于6红黑树退化成链表(hc ! 0) ? new TreeBinK,V(lo) : t;hn (hc UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc ! 0) ? new TreeBinK,V(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i n, hn);setTabAt(tab, i, fwd);advance true;}}}}}
}ForwardingNode()构造方法hash值为-1MOVED
ForwardingNode(NodeK,V[] tab) {super(MOVED, null, null, null);this.nextTable tab;
}总结
JDK8中的ConcurrentHashMap是怎么保证并发安全的?
主要利用 Unsafe 操作 synchronized 关键字。
Unsafe 操作的使用主要负责并发安全的修改对象的属性或数组某个位置的值。synchronized 主要负责在需要操作某个位置时进行加锁(该位置不为空)比如向某个位置的链表进行插入结点向某个位置的红黑树插入结点。
JDK8中仍然有分段锁的思想JDK8中数组的每一个位置都有一把锁。 当向 ConcurrentHashMap 中 put 一个 keyvalue 时 首先根据 key 计算对应的数组下标 i如果该位置没有元素则通过自旋的方法去向该位置赋值。 如果该位置有元素则 synchronized 会加锁 加锁成功之后在判断该元素的类型 a. 如果是链表节点则进行添加节点到链表中 b. 如果是红黑树则添加节点到红黑树 添加成功后判断是否需要进行树化 addCount()这个方法的意思是 ConcurrentHashMap 的元素个数加 1但是这个操作也是需要并发安全的并且元素个数加 1 成功后会继续判断是否要进行扩容如果需要则会进行扩容所以这个方法很重要。 同时一个线程在 put 时如果发现当前 ConcurrentHashMap 正在进行扩容则会去帮助扩容。