Java无锁应对高并发Atomic详解 不念不忘少年蓝@ 2022-12-23 02:00 236阅读 0赞 问题: ** java多线程有线程安全问题,因为操作非原子,分为三步:1.读主内存的值 2.在工作内存中操作 3.回写到主内存** 解决方案: **操作加锁,在写回主内存前,其他线程不允许更新** 该方案缺点: **重量级锁,性能差** 优化方案: **使用Atomic系列类,如AtomicInteger,其底层调用CPU的CAS(CompareAndSwap)指令,先比较是否符合预期,如符合预期更新,不符合预期放弃重试**。源码如下: // 封装了一个int对其加减 private volatile int value; ....... public final boolean compareAndSet(int expect, int update) { // 通过unsafe 基于CPU的CAS指令来实现, 可以认为无阻塞. return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } ....... public final int getAndIncrement() { for (;;) { // 当前值 int current = get(); // 预期值 int next = current + 1; if (compareAndSet(current, next)) { // 如果加成功了, 则返回当前值 return current; } // 如果加失败了, 说明其他线程已经修改了数据, 与期望不相符, // 则继续无限循环, 直到成功. 这种乐观锁, 理论上只要等两三个时钟周期就可以设值成功 // 相比于直接通过synchronized独占锁的方式操作int, 要大大节约等待时间. } } ## ## ### 目录 ### ![9beef1d1251f25a156c9bba3bbdecc79.png][] 无锁即无障碍的运行, 所有线程都可以到达临界区, 接近于无等待. 无锁采用CAS(compare and swap)算法来处理线程冲突, 其原理如下 ### CAS原理 ### CAS包含3个参数CAS(V,E,N).V表示要更新的变量, E表示预期值, N表示新值. 仅当V值等于E值时, 才会将V的值设为N, 如果V值和E值不同, 则说明已经有其他线程做了更新, 则当前线程什么 都不做. 最后, CAS返回当前V的真实值. CAS操作是抱着乐观的态度进行的, 它总是认为自己可以成功完成操作. 当多个线程同时使用CAS操作一个变量时, 只有一个会胜出, 并成功更新, 其余均会失败.失败的线程不会被挂起, 仅是被告知失败, 并且允许再次尝试, 当然也允许失败的线程放弃操作.基于这样的原理, CAS操作即时没有锁, 也可以发现其他线程对当前线程的干扰, 并进行恰当的处理. **CPU指令** 另外, 虽然上述步骤繁多, 实际上CAS整一个操作过程是一个原子操作, 它是由一条CPU指令完成的, 从指令层保证操作可靠, 不会被多线程干扰. **无锁与volatile** 无锁可以通过cas来保证原子性与线程安全, 他与volatile什么区别呢? 当给变量加了volatile关键字, 表示该变量对所有线程可见, **但不保证原子性**. 以volatile i, i++为例, 分为以下四步: * 加载i * 对i进行+1 * 回写i的值 * 用内存屏障通知其他线程i的值 其中前三步是线程不安全的, 可能其他线程会对i进行读写. > 因此任何依赖于之前值的操作, 如i++, i = i \*10使用volatile都不安全. > > 而诸如get/set, boolean这类可以使用volatile. ### AtomicInteger ### **主要接口** ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 // 取得当前值 2 public final int get() 3 // 设置当前值 4 public final void set(int newValue) 5 // 设置新值,并返回旧值 6 public final int getAndSet(int newValue) 7 // 如果当前值为expect,则设置为u 8 public final boolean compareAndSet(int expect, int u) 9 // 当前值加1,返回旧值 10 public final int getAndIncrement() 11 // 当前值减1,返回旧值 12 public final int getAndDecrement() 13 // 当前值增加delta,返回旧值 14 public final int getAndAdd(int delta) 15 // 当前值加1,返回新值 16 public final int incrementAndGet() 17 // 当前值减1,返回新值 18 public final int decrementAndGet() 19 // 当前值增加delta,返回新值 20 public final int addAndGet(int delta) ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] **源码实现** ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 // 封装了一个int对其加减 2 private volatile int value; 3 ....... 4 public final boolean compareAndSet(int expect, int update) { 5 // 通过unsafe 基于CPU的CAS指令来实现, 可以认为无阻塞. 6 return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 7 } 8 ....... 9 public final int getAndIncrement() { 10 for (;;) { 11 // 当前值 12 int current = get(); 13 // 预期值 14 int next = current + 1; 15 if (compareAndSet(current, next)) { 16 // 如果加成功了, 则返回当前值 17 return current; 18 } 19 // 如果加失败了, 说明其他线程已经修改了数据, 与期望不相符, 20 // 则继续无限循环, 直到成功. 这种乐观锁, 理论上只要等两三个时钟周期就可以设值成功 21 // 相比于直接通过synchronized独占锁的方式操作int, 要大大节约等待时间. 22 } 23 } ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] **Demo** 使用10个线程打印0-10000, 最终得到结果10w. ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 import java.util.concurrent.atomic.AtomicInteger; 2 3 public class AtomicIntegerDemo { 4 static AtomicInteger i = new AtomicInteger(); 5 6 public static class AddThread implements Runnable { 7 public void run() { 8 for (int k = 0; k < 10000; k++) { 9 i.incrementAndGet(); 10 } 11 } 12 } 13 14 public static void main(String[] args) throws InterruptedException { 15 Thread[] ts = new Thread[10]; 16 for (int k = 0; k < 10; k++) { 17 ts[k] = new Thread(new AddThread()); 18 } 19 for (int k = 0; k < 10; k++) { 20 ts[k].start(); 21 } 22 for (int k = 0; k < 10; k++) { 23 ts[k].join(); 24 } 25 System.out.println(i); 26 } 27 } ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] ### Unsafe ### Unsafe类是在sun.misc包下, 可以用于一些非安全的操作,比如: 根据偏移量设置值, 线程park(), 底层的CAS操作等等. 1 // 获取类实例中变量的偏移量 2 valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value")); 3 // 基于偏移量对值进行操作 4 unsafe.compareAndSwapInt(this, valueOffset, expect, update); **主要接口** ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 // 获得给定对象偏移量上的int值 2 public native int getInt(Object o, long offset); 3 // 设置给定对象偏移量上的int值 4 public native void putInt(Object o, long offset, int x); 5 // 获得字段在对象中的偏移量 6 public native long objectFieldOffset(Field f); 7 // 设置给定对象的int值,使用volatile语义 8 public native void putIntVolatile(Object o, long offset, int x); 9 // 获得给定对象对象的int值,使用volatile语义 10 public native int getIntVolatile(Object o, long offset); 11 // 和putIntVolatile()一样,但是它要求被操作字段就是volatile类型的 12 public native void putOrderedInt(Object o, long offset, int x); ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] ### AtomicReference ### 与AtomicInteger类似, 只是里面封装了一个对象, 而不是int, 对引用进行修改 **主要接口** 1 get() 2 set(V) 3 compareAndSet() 4 getAndSet(V) **Demo** 使用10个线程, 同时尝试修改AtomicReference中的String, 最终只有一个线程可以成功. ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 import java.util.concurrent.atomic.AtomicReference; 2 3 public class AtomicReferenceTest { 4 public final static AtomicReference<String> attxnicStr = new AtomicReference<String>("abc"); 5 6 public static void main(String[] args) { 7 for (int i = 0; i < 10; i++) { 8 new Thread() { 9 public void run() { 10 try { 11 Thread.sleep(Math.abs((int) (Math.random() * 100))); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 if (attxnicStr.compareAndSet("abc", "def")) { 16 System.out.println("Thread:" + Thread.currentThread().getId() + " change value to " + attxnicStr.get()); 17 } else { 18 System.out.println("Thread:" + Thread.currentThread().getId() + " change failed!"); 19 } 20 } 21 }.start(); 22 } 23 } 24 } ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] ### AtomicStampedReference ### 也是封装了一个引用, 主要解决ABA问题. **ABA问题** 线程一准备用CAS将变量的值由A替换为B, 在此之前线程二将变量的值由A替换为C, 线程三又将C替换为A, 然后线程一执行CAS时发现变量的值仍然为A, 所以线程一CAS成功. **主要接口** ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 // 比较设置 参数依次为:期望值 写入新值 期望时间戳 新时间戳 2 public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) 3 // 获得当前对象引用 4 public V getReference() 5 // 获得当前时间戳 6 public int getStamp() 7 // 设置当前对象引用和时间戳 8 public void set(V newReference, int newStamp) ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] **源码分析** ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 // 内部封装了一个Pair对象, 每次对对象操作的时候, stamp + 1 2 private static class Pair<T> { 3 final T reference; 4 final int stamp; 5 private Pair(T reference, int stamp) { 6 this.reference = reference; 7 this.stamp = stamp; 8 } 9 static <T> Pair<T> of(T reference, int stamp) { 10 return new Pair<T>(reference, stamp); 11 } 12 } 13 14 private volatile Pair<V> pair; 15 16 // 进行cas操作的时候, 会对比stamp的值 17 public boolean compareAndSet(V expectedReference, 18 V newReference, 19 int expectedStamp, 20 int newStamp) { 21 Pair<V> current = pair; 22 return 23 expectedReference == current.reference && 24 expectedStamp == current.stamp && 25 ((newReference == current.reference && 26 newStamp == current.stamp) || 27 casPair(current, Pair.of(newReference, newStamp))); 28 } ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] **Demo** 后台使用多个线程对用户充值, 要求只能充值一次 ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 public class AtomicStampedReferenceDemo { 2 static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19,0); 3 public staticvoid main(String[] args) { 4 //模拟多个线程同时更新后台数据库,为用户充值 5 for(int i = 0 ; i < 3 ; i++) { 6 final int timestamp=money.getStamp(); 7 newThread() { 8 public void run() { 9 while(true){ 10 while(true){ 11 Integerm=money.getReference(); 12 if(m<20){ 13 if(money.compareAndSet(m,m+20,timestamp,timestamp+1)){ 14 System.out.println("余额小于20元,充值成功,余额:"+money.getReference()+"元"); 15 break; 16 } 17 }else{ 18 //System.out.println("余额大于20元,无需充值"); 19 break ; 20 } 21 } 22 } 23 } 24 }.start(); 25 } 26 27 //用户消费线程,模拟消费行为 28 new Thread() { 29 publicvoid run() { 30 for(int i=0;i<100;i++){ 31 while(true){ 32 int timestamp=money.getStamp(); 33 Integer m=money.getReference(); 34 if(m>10){ 35 System.out.println("大于10元"); 36 if(money.compareAndSet(m, m-10,timestamp,timestamp+1)){ 37 System.out.println("成功消费10元,余额:"+money.getReference()); 38 break; 39 } 40 }else{ 41 System.out.println("没有足够的金额"); 42 break; 43 } 44 } 45 try {Thread.sleep(100);} catch (InterruptedException e) {} 46 } 47 } 48 }.start(); 49 } 50 } ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] ### AtomicIntegerArray ### 支持无锁的数组 **主要接口** ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 // 获得数组第i个下标的元素 2 public final int get(int i) 3 // 获得数组的长度 4 public final int length() 5 // 将数组第i个下标设置为newValue,并返回旧的值 6 public final int getAndSet(int i, int newValue) 7 // 进行CAS操作,如果第i个下标的元素等于expect,则设置为update,设置成功返回true 8 public final boolean compareAndSet(int i, int expect, int update) 9 // 将第i个下标的元素加1 10 public final int getAndIncrement(int i) 11 // 将第i个下标的元素减1 12 public final int getAndDecrement(int i) 13 // 将第i个下标的元素增加delta(delta可以是负数) 14 public final int getAndAdd(int i, int delta) ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] **源码分析** ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 // 数组本身基地址 2 private static final int base = unsafe.arrayBaseOffset(int[].class); 3 4 // 封装了一个数组 5 private final int[] array; 6 7 static { 8 // 数组中对象的宽度, int类型, 4个字节, scale = 4; 9 int scale = unsafe.arrayIndexScale(int[].class); 10 if ((scale & (scale - 1)) != 0) 11 throw new Error("data type scale not a power of two"); 12 // 前导0 : 一个数字转为二进制后, 他前面0的个数 13 // 对于4来讲, 他就是00000000 00000000 00000000 00000100, 他的前导0 就是29 14 // 所以shift = 2 15 shift = 31 - Integer.numberOfLeadingZeros(scale); 16 } 17 18 // 获取第i个元素 19 public final int get(int i) { 20 return getRaw(checkedByteOffset(i)); 21 } 22 23 // 第i个元素, 在数组中的偏移量是多少 24 private long checkedByteOffset(int i) { 25 if (i < 0 || i >= array.length) 26 throw new IndexOutOfBoundsException("index " + i); 27 28 return byteOffset(i); 29 } 30 31 // base : 数组基地址, i << shift, 其实就是i * 4, 因为这边是int array. 32 private static long byteOffset(int i) { 33 // i * 4 + base 34 return ((long) i << shift) + base; 35 } 36 37 // 根据偏移量从数组中获取数据 38 private int getRaw(long offset) { 39 return unsafe.getIntVolatile(array, offset); 40 } ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] **Demo** ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 import java.util.concurrent.atomic.AtomicIntegerArray; 2 3 public class AtomicArrayDemo { 4 static AtomicIntegerArray arr = new AtomicIntegerArray(10); 5 6 public static class AddThread implements Runnable { 7 public void run() { 8 for (int k = 0; k < 10000; k++) { 9 arr.incrementAndGet(k % arr.length()); 10 } 11 } 12 } 13 14 public static void main(String[] args) throws InterruptedException { 15 Thread[] ts = new Thread[10]; 16 for (int k = 0; k < 10; k++) { 17 ts[k] = new Thread(new AddThread()); 18 } 19 for (int k = 0; k < 10; k++) { 20 ts[k].start(); 21 } 22 for (int k = 0; k < 10; k++) { 23 ts[k].join(); 24 } 25 System.out.println(arr); 26 } 27 } ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] AtomicIntegerFieldUpdater 让普通变量也享受原子操作 **主要接口** 1 AtomicIntegerFieldUpdater.newUpdater() 2 incrementAndGet() * Updater只能修改它可见范围内的变量。因为Updater使用反射得到这个变量。如果变量不可见,就会出错。比如如果score申明为private,就是不可行的。 * 为了确保变量被正确的读取,它必须是volatile类型的。如果我们原有代码中未申明这个类型,那么简单得申明一下就行,这不会引起什么问题。 * 由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段(Unsafe.objectFieldOffset()不支持静态变量)。 ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 import java.util.concurrent.atomic.AtomicInteger; 2 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; 3 4 public class AtomicIntegerFieldUpdaterDemo { 5 public static class Candidate { 6 int id; 7 // 如果直接把int改成atomicinteger, 可能对代码破坏比较大 8 // 因此使用AtomicIntegerFieldUpdater对score进行封装 9 volatile int score; 10 } 11 12 // 通过反射实现 13 public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score"); 14 // 检查Updater是否工作正确, allScore的结果应该跟score一致 15 public static AtomicInteger allScore = new AtomicInteger(0); 16 17 public static void main(String[] args) throws InterruptedException { 18 final Candidate stu = new Candidate(); 19 Thread[] t = new Thread[10000]; 20 for (int i = 0; i < 10000; i++) { 21 t[i] = new Thread() { 22 public void run() { 23 if (Math.random() > 0.4) { 24 scoreUpdater.incrementAndGet(stu); 25 allScore.incrementAndGet(); 26 } 27 } 28 }; 29 t[i].start(); 30 } 31 for (int i = 0; i < 10000; i++) { 32 t[i].join(); 33 } 34 35 System.out.println("score=" + stu.score); 36 System.out.println("allScore=" + allScore); 37 } 38 } ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] ### 无锁的Vector ### jdk中Vector是加锁的, 网上找的一个无锁Vector LockFreeVector, 给他添加了源码中文注释. 主要关注push\_back, 添加元素的函数 ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] 1 import java.util.AbstractList; 2 import java.util.concurrent.atomic.AtomicReference; 3 import java.util.concurrent.atomic.AtomicReferenceArray; 4 5 /** 6 * It is a thread safe and lock-free vector. 7 * This class implement algorithm from:<br> 8 * 9 * Lock-free Dynamically Resizable Arrays <br> 10 * 11 * @param <E> type of element in the vector 12 * 13 */ 14 public class LockFreeVector<E> extends AbstractList<E> { 15 private static final boolean debug = false; 16 /** 17 * Size of the first bucket. sizeof(bucket[i+1])=2*sizeof(bucket[i]) 18 */ 19 private static final int FIRST_BUCKET_SIZE = 8; 20 21 /** 22 * number of buckets. 30 will allow 8*(2^30-1) elements 23 */ 24 private static final int N_BUCKET = 30; 25 26 /** 27 * We will have at most N_BUCKET number of buckets. And we have 28 * sizeof(buckets.get(i))=FIRST_BUCKET_SIZE**(i+1) 29 * 30 * 为什么AtomicReferenceArray里再套一个AtomicReferenceArray呢, 类似一个篮子(buckets)里放了很多篮子 31 * 为了在容量扩展时希望尽可能少的改动原有数据, 因此把一维数组扩展成二维数组. 32 * 该二维数组并非均衡的分布. 可能第一个数组8个元素, 第二个数组16个元素, 第三个数组32个...... 33 */ 34 private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets; 35 36 /** 37 * @param <E> 38 */ 39 static class WriteDescriptor<E> { 40 public E oldV; 41 public E newV; 42 public AtomicReferenceArray<E> addr; 43 public int addr_ind; 44 45 /** 46 * Creating a new descriptor. 47 * 48 * @param addr Operation address 对哪个数组进行写 49 * @param addr_ind Index of address 指定index 50 * @param oldV old operand 51 * @param newV new operand 52 */ 53 public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind, 54 E oldV, E newV) { 55 this.addr = addr; 56 this.addr_ind = addr_ind; 57 this.oldV = oldV; 58 this.newV = newV; 59 } 60 61 /** 62 * set newV. 63 */ 64 public void doIt() { 65 // 这边失败后重试的逻辑在另外的代码里. 66 addr.compareAndSet(addr_ind, oldV, newV); 67 } 68 } 69 70 /** 71 * @param <E> 72 */ 73 static class Descriptor<E> { 74 public int size; 75 volatile WriteDescriptor<E> writeop; 76 77 /** 78 * Create a new descriptor. 79 * 80 * @param size Size of the vector 81 * @param writeop Executor write operation 82 */ 83 public Descriptor(int size, WriteDescriptor<E> writeop) { 84 this.size = size; 85 this.writeop = writeop; 86 } 87 88 /** 89 * 90 */ 91 public void completeWrite() { 92 WriteDescriptor<E> tmpOp = writeop; 93 if (tmpOp != null) { 94 tmpOp.doIt(); 95 writeop = null; // this is safe since all write to writeop use 96 // null as r_value. 97 } 98 } 99 } 100 101 private AtomicReference<Descriptor<E>> descriptor; 102 private static final int zeroNumFirst = Integer 103 .numberOfLeadingZeros(FIRST_BUCKET_SIZE); 104 105 /** 106 * Constructor. 107 */ 108 public LockFreeVector() { 109 buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET); 110 buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE)); 111 descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0, 112 null)); 113 } 114 115 /** 116 * add e at the end of vector. 117 * 把元素e加到vector中 118 * 119 * @param e 120 * element added 121 */ 122 public void push_back(E e) { 123 Descriptor<E> desc; 124 Descriptor<E> newd; 125 do { 126 desc = descriptor.get(); 127 desc.completeWrite(); 128 // desc.size Vector 本身的大小 129 // FIRST_BUCKET_SIZE 第一个一维数组的大小 130 int pos = desc.size + FIRST_BUCKET_SIZE; 131 // 取出pos 的前导0 132 int zeroNumPos = Integer.numberOfLeadingZeros(pos); 133 // zeroNumFirst 为FIRST_BUCKET_SIZE 的前导0 134 // bucketInd 数据应该放到哪一个一维数组(篮子)里的 135 int bucketInd = zeroNumFirst - zeroNumPos; 136 // 00000000 00000000 00000000 00001000 第一个篮子满 8 137 // 00000000 00000000 00000000 00011000 第二个篮子满 8 + 16 138 // 00000000 00000000 00000000 00111000 第三个篮子满 8 + 16 + 32 139 // ... bucketInd其实通过前导0相减, 就是为了得出来当前第几个篮子是空的. 140 141 // 判断这个一维数组是否已经启用, 可能是第一次初始化 142 if (buckets.get(bucketInd) == null) { 143 //newLen 一维数组的长度, 取前一个数组长度 * 2 144 int newLen = 2 * buckets.get(bucketInd - 1).length(); 145 // 设置失败也没关系, 只要有人初始化成功就行 146 buckets.compareAndSet(bucketInd, null, 147 new AtomicReferenceArray<E>(newLen)); 148 } 149 150 // 在这个一位数组中,我在哪个位置 151 // 0x80000000是 10000000 00000000 00000000 00000000 152 // 这句话就是把上述111000, 第一个1变成了0, 得到011000, 即新值的位置. 153 int idx = (0x80000000>>>zeroNumPos) ^ pos; 154 // 通过bucketInd与idx来确定元素在二维数组中的位置 155 // 期望写入的时候, 该位置值是null, 如果非null, 说明其他线程已经写了, 则继续循环. 156 newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>( 157 buckets.get(bucketInd), idx, null, e)); 158 // 循环cas设值 159 } while (!descriptor.compareAndSet(desc, newd)); 160 descriptor.get().completeWrite(); 161 } 162 163 /** 164 * Remove the last element in the vector. 165 * 166 * @return element removed 167 */ 168 public E pop_back() { 169 Descriptor<E> desc; 170 Descriptor<E> newd; 171 E elem; 172 do { 173 desc = descriptor.get(); 174 desc.completeWrite(); 175 176 int pos = desc.size + FIRST_BUCKET_SIZE - 1; 177 int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 178 - Integer.numberOfLeadingZeros(pos); 179 int idx = Integer.highestOneBit(pos) ^ pos; 180 elem = buckets.get(bucketInd).get(idx); 181 newd = new Descriptor<E>(desc.size - 1, null); 182 } while (!descriptor.compareAndSet(desc, newd)); 183 184 return elem; 185 } 186 187 /** 188 * Get element with the index. 189 * 190 * @param index 191 * index 192 * @return element with the index 193 */ 194 @Override 195 public E get(int index) { 196 int pos = index + FIRST_BUCKET_SIZE; 197 int zeroNumPos = Integer.numberOfLeadingZeros(pos); 198 int bucketInd = zeroNumFirst - zeroNumPos; 199 int idx = (0x80000000>>>zeroNumPos) ^ pos; 200 return buckets.get(bucketInd).get(idx); 201 } 202 203 /** 204 * Set the element with index to e. 205 * 206 * @param index 207 * index of element to be reset 208 * @param e 209 * element to set 210 */ 211 /** 212 * {@inheritDoc} 213 */ 214 public E set(int index, E e) { 215 int pos = index + FIRST_BUCKET_SIZE; 216 int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 217 - Integer.numberOfLeadingZeros(pos); 218 int idx = Integer.highestOneBit(pos) ^ pos; 219 AtomicReferenceArray<E> bucket = buckets.get(bucketInd); 220 while (true) { 221 E oldV = bucket.get(idx); 222 if (bucket.compareAndSet(idx, oldV, e)) 223 return oldV; 224 } 225 } 226 227 /** 228 * reserve more space. 229 * 230 * @param newSize 231 * new size be reserved 232 */ 233 public void reserve(int newSize) { 234 int size = descriptor.get().size; 235 int pos = size + FIRST_BUCKET_SIZE - 1; 236 int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 237 - Integer.numberOfLeadingZeros(pos); 238 if (i < 1) 239 i = 1; 240 241 int initialSize = buckets.get(i - 1).length(); 242 while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 243 - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)) { 244 i++; 245 initialSize *= FIRST_BUCKET_SIZE; 246 buckets.compareAndSet(i, null, new AtomicReferenceArray<E>( 247 initialSize)); 248 } 249 } 250 251 /** 252 * size of vector. 253 * 254 * @return size of vector 255 */ 256 public int size() { 257 return descriptor.get().size; 258 } 259 260 /** 261 * {@inheritDoc} 262 */ 263 @Override 264 public boolean add(E object) { 265 push_back(object); 266 return true; 267 } 268 } [https://www.cnblogs.com/xdecode/p/9022525.html][https_www.cnblogs.com_xdecode_p_9022525.html] ![复制代码][48304ba5e6f9fe08f3fa1abda7d326ab.png] [9beef1d1251f25a156c9bba3bbdecc79.png]: /images/20221120/7b6cff360190445a83de86bedc05b134.png [48304ba5e6f9fe08f3fa1abda7d326ab.png]: /images/20221120/d97e1e9aaa8743cbab830ed44e6fe737.png [https_www.cnblogs.com_xdecode_p_9022525.html]: https://www.cnblogs.com/xdecode/p/9022525.html
相关 应对并发高峰:Java高并发编程实战案例 Java作为一种流行的编程语言,尤其在处理高并发场景时表现出色。以下是一些Java高并发编程的实战案例: 1. **多线程池**: - Redis Cluster: 利 Bertha 。/ 2024年09月10日 09:24/ 0 赞/ 48 阅读
相关 后台应对高并发策略 1、常见的策略 1)服务集群:使用ngnix、F5进行路由和反向代理,服务集群部署,增加实例数量(传统方式)。 2)数据库层:迁移数据库中大量历史数据。 3)数据库层 ╰半夏微凉°/ 2023年06月30日 02:38/ 0 赞/ 18 阅读
相关 高并发应对策略 策略 1、在开发高并发系统时,有很多手段用来保护系统如:缓存、限流、降级 2、缓存的目的是提升系统访问速度和增大系统处理能力,可谓是抗高并发流量的银弹 3、而降 妖狐艹你老母/ 2023年06月10日 04:26/ 0 赞/ 100 阅读
相关 高并发项目应对方案 服务监控 zabbix: https://blog.csdn.net/weixin\_43822878/article/details/91569016 服务器 快来打我*/ 2022年12月26日 08:15/ 0 赞/ 165 阅读
相关 Java无锁应对高并发Atomic详解 问题: java多线程有线程安全问题,因为操作非原子,分为三步:1.读主内存的值 2.在工作内存中操作 3.回写到主内存 解决方案: 操作加锁,在写回主内存前,其他 不念不忘少年蓝@/ 2022年12月23日 02:00/ 0 赞/ 237 阅读
相关 无锁 java 实战高_【实战Java高并发程序设计 4】数组也能无锁:AtomicIntegerArray... 除了提供基本数据类型外,JDK还为我们准备了数组等复合结构。当前可用的原子数组有:AtomicIntegerArray、AtomicLongArray和AtomicRefere 灰太狼/ 2022年11月07日 05:28/ 0 赞/ 182 阅读
相关 详解应对平台高并发的分布式调度框架TBSchedule 【编者按】 TBSchedule是一款非常优秀的高性能分布式调度框架,本文是作者结合多年使用TBSchedule的经验,在研读三遍源码的基础上完成。期间作者也与阿里空玄有过不少 亦凉/ 2022年09月29日 15:54/ 0 赞/ 232 阅读
相关 [高并发Java 四] 无锁 1 无锁类的原理详解 1.1 CAS CAS算法的过程是这样:它包含3个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。仅当V 值等于E 桃扇骨/ 2022年07月18日 11:49/ 0 赞/ 280 阅读
相关 java高并发:CAS无锁原理及广泛应用 文章目录 欢迎关注本人公众号 前言 CAS无锁实现原理 为什么要用CAS CAS原理分析 CAS算法 「爱情、让人受尽委屈。」/ 2022年07月13日 02:09/ 0 赞/ 336 阅读
还没有评论,来说两句吧...