CAS 机制入门
-
需求
- 我们开发一个网站,需要对访问量进行统计,用户每发送一次请求,访问量+1,如何实现?
- 我们模拟有100个人同时访问,并且每个人对咱们的网站发起10次请求,最后总访问次数应该是1000次。
-
怎么实现
public class CAS机制入门 { // 模拟总访问量 public static int count = 0; // 模拟访问方法 public static void request() throws InterruptedException { // 模拟耗时 5ms TimeUnit.MILLISECONDS.sleep(5); count++; } public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); CountDownLatch countDownLatch = new CountDownLatch(100); // 模拟请求 int threadSize = 100; for (int i = 0; i < threadSize; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { for (int j = 0; j < 10; j++) { request(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } } }); thread.start(); } // Q:怎么保证上面的 100 个线程执行完再执行下面的? // A:使用 CountDownLatch 来完成 countDownLatch.await(); long endTime = System.currentTimeMillis(); System.out.println("耗时:" + (endTime - startTime)); System.out.println("count 值:" + count); } } 复制代码
-
结果
-
分析问题
- 首先明确 count++ 不是一个原子操作,在执行引擎中是分为三个步骤来完成的
- 获取 count 的值,记做 A:A = count
- 将 A 值 +1,得到 B:B = A + 1
- 将 B 值赋值给 count
- 当多个线程同时执行 count++ 操作的时候,若其中 A 线程、B 线程同时执行到上面的步骤 1 之后,A 和 B 两个线程拿到的值是一样的,那么他们经过 3 个步骤之后计算出来的结果也是一样的,这样就相当于只是对 count 进行了一次 +1 操作,丢失了一次,导致最终结果的不正确
- 首先明确 count++ 不是一个原子操作,在执行引擎中是分为三个步骤来完成的
-
怎么解决
- 对 count++ 进行操作的时候,让多个线程进行排队处理。即使多个线程同时到达 request 方法,也只允许一个线程进入方法区操作 count,其与线程在外面排队等待
-
怎么实现
- Java 中的 synchronized 关键字
- Java 中的 ReentrantLock
- 上面里两种方式都可以对资源上锁,在多线程的环境下对锁住的资源实现“串行”访问,保证并发的正确性
public class CAS机制入门2 { // 模拟总访问量 public static int count = 0; // 模拟访问方法 public synchronized static void request() throws InterruptedException { // 模拟耗时 5ms TimeUnit.MILLISECONDS.sleep(5); count++; } public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); CountDownLatch countDownLatch = new CountDownLatch(100); // 模拟请求 int threadSize = 100; for (int i = 0; i < threadSize; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { for (int j = 0; j < 10; j++) { request(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } } }); thread.start(); } // Q:怎么保证上面的 100 个线程执行完再执行下面的? // A:使用 CountDownLatch 来完成 countDownLatch.await(); long endTime = System.currentTimeMillis(); System.out.println("耗时:" + (endTime - startTime)); System.out.println("count 值:" + count); } } 复制代码
-
结果
-
分析问题
- 对 request 方法使用了 synchronized 关键字,保证了结果的正确性,但是耗时太长了
- 注意:如果 synchronized 关键字只锁住 count++ 这个操作,既可以保证正确性又可以保证效率,但是为了了解 CAS 的机制,此处我们使用另外一种方式来完成
-
怎么解决
- count++ 步骤:
- 获取 count 的值,记做 A:A = count
- 将 A 值 +1,得到 B:B = A + 1
- 将 B 值赋值给 count
- 只对 count++ 这个操作的第 3 步“上锁”
- 获取锁
- 获取此时 count 的值,记做 LV
- 将此时的值 LV 和 期望值 A 进行比较,如果相等则将最新值 B 值赋值给 count 并返回 true,否则返回 false
- 释放锁
- 那么要实现 count++ 操作的话,只有当返回 true 的时候才能够真正的做到,如果返回 false 则不停的循环去执行 上面的三个步骤直到返回 true
- count++ 步骤:
-
怎么实现
public class CAS机制入门3 { // 模拟总访问量:注意此时需要使用 volatile 修饰,保证多个线程可见 public volatile static int count = 0; // 模拟访问方法 public static void request() throws InterruptedException { // 模拟耗时 5ms TimeUnit.MILLISECONDS.sleep(5); int expectCount = 0; int newCount = 0; do { expectCount = getCount(); newCount = expectCount + 1; }while (!compareAndSwap(expectCount, newCount)); // 只有当返回 false 的时候我们才需要重新获取 count 的值再继续去比较交换 } /** * * @param expectCount 期望的 count 值 * @param newCount 需要给 count 赋的最新值 * @return 成功则返回 true,失败则返回 false */ public static synchronized boolean compareAndSwap(int expectCount, int newCount) { // 判断当前 count 值是否和 expectCount 一致,一致则将最新值赋值给 count if (getCount() == expectCount) { count = newCount; return true; } return false; } public static int getCount() { return count; } public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); CountDownLatch countDownLatch = new CountDownLatch(100); // 模拟请求 int threadSize = 100; for (int i = 0; i < threadSize; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { for (int j = 0; j < 10; j++) { request(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } } }); thread.start(); } // Q:怎么保证上面的 100 个线程执行完再执行下面的? // A:使用 CountDownLatch 来完成 countDownLatch.await(); long endTime = System.currentTimeMillis(); System.out.println("耗时:" + (endTime - startTime)); System.out.println("count 值:" + count); } } 复制代码
CAS 实现原理
- 定义
- CAS:CompareAndSwap
- CAS 操作包含三个操作数——内存位置(V)、期望值(A)和新值(B)。如果内存位置的值与期望值匹配,那么处理器会自动将该位置值更新为新值,否则处理器不作任何操作
- 无论哪种情况,它都会在 CAS 指令之前返回该位置的值。(CAS 在一些特殊情况下仅返回CAS是否成功,而不提取当前值)CAS 有效的说明了“我认为位置 V 应该包含值 A,如果包含该值则将 B 放到这个位置,否则不要更改该位置的值,只告诉我这个位置现在的值即可”
- 怎么使用JDK提供的CAS支持?
- java中提供了对CAS操作的支持,具体在sun.misc.unsafe类中,声明如下:
public final native boolean compareAndSwapObject(Object var1,long var2, object var4,Object var5); public final native boolean compareAndSwapInt(object var1,long var2,int var4,int var5); public final native boplean compareAndSwapLong(Object var1,long var2,long var4,long var6); 复制代码
- 参数分析
- var1:表示要操作的对象
- var2:表示要操作对象中属性地址的偏移量(偏移量是指在对象属性地址相对于对象地址的一个偏移,可以通过对象地址和偏移量获取到对象的属性)
- var4:表示需要修改的数据的期望值
- var5:表示需要修改为的新值
- CAS 实现原理是什么?
- CAS 通过调用 JNI 的代码实现,JNI: java Native Interface,允许 Java 调用其它语言
- compareAndSwapxxx 系列的方法就是借助 “C语言” 来调用 cpu 底层指令实现的
- 以常用的 Intbl x86 平台来说,最终映射到的 cpu 的指令为 “cmpxchg”,这是一个原子指令,cpu 执行此命令时,实现比较并替换的操作!
- CAS 存在的问题
- ABA 问题
- 循环时间长导致CPU执行开销大
- 自旋锁的优点是不需要休眠当前线程,因为自旋锁使用者一般保持锁时间非常短,因此选择自旋而不是休眠当前线程是提高并发性能的关键点,这是因为减少了很多不必要的线程上下文切换开销但是,如果 CAS 一直操作不成功,会造成长时间原地自旋,会给 CPU 带来非常大的执行开销
- 只能保证一个共享变量的原子操作,如果需要保证多个变量的原子性,则需要使用锁
ABA 问题
-
产生条件
- 某个线程需要重复读某个内存地址,并以内存地址的值变化作为该值是否变化的唯一判定依据
- 重复读取的变量会被多线程共享,且存在『值回退』的可能,即值变化后有可能因为某个操作复归原值
- 在多次读取间隔中,开发者没有采取有效的同步手段,比如上锁
-
条件分析
- CAS 本身就要拿新值和旧值做对比,所谓新旧就是两次不同时间点对同一内存地址的读取而已,如果开发者以新旧值对比作为判定值是否变更的唯一依据,那么就满足了条件 1
- 如果该值共享给了多个线程,且存在值回退的可能(线程 B 由 1 改成 2,线程 C由 2 改成 1,造成值回退),则满足了条件 2
- 条件 1 和 2 条件都达成,开发者却并没有在多次读取该内存地址的间隔中使用有效的同步手段,而这就达成了条件 3
-
模拟 ABA 问题
public class ABATest { private static AtomicInteger count = new AtomicInteger(1); public static void main(String[] args) { Thread main = new Thread(new Runnable() { @Override public void run() { System.out.println("操作线程:" + Thread.currentThread().getName() + ",count 值:" + count.get()); try { int expectCount = count.get(); int newCount = expectCount + 1; Thread.sleep(1000); // 主线程休眠 1s,让出 cpu boolean isUpdateSuccess = count.compareAndSet(expectCount, newCount); System.out.println("操作线程:" + Thread.currentThread().getName() + ",是否更新成功:" + isUpdateSuccess); } catch (InterruptedException e) { e.printStackTrace(); } } }, "主线程"); Thread other = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(20); // 保证主线程先执行 count.getAndIncrement(); // a + 1 = 2 System.out.println("操作线程:" + Thread.currentThread().getName() + ",increment >>> count 值:" + count.get()); count.getAndDecrement(); // a - 1 = 1 System.out.println("操作线程:" + Thread.currentThread().getName() + ",decrement >>> count 值:" + count.get()); } catch (InterruptedException e) { e.printStackTrace(); } } }, "干扰线程"); main.start(); other.start(); } } 复制代码
-
模拟结果
-
问题分析
- 实际上 count 值经历了 1 >>> 2 >>> 1 这么个过程,但是主线程并未察觉,依旧是更新成功的
-
结论
- 如果系统对 ABA 问题敏感的话则最好使用 synchronized 或者是 ReentrantLock 来保证并发的正确性,比如银行系统则不会使用 CAS 相关代码
- 如果系统对 ABA 问题不敏感的话则可以使用 Java 为我们提供的另一个类:AtomicStampedReference
ABA 问题解决
-
AtomicStampedReference 主要包含一个对象引用及一个可以自动更新的整数 “stamp” 的 pair 对象来解决 ABA 问题
public class ABATest2 { private static AtomicStampedReference<Integer> count = new AtomicStampedReference(1, 1); // 初始化值 1,初始化版本号 1 public static void main(String[] args) { Thread main = new Thread(new Runnable() { @Override public void run() { System.out.println("操作线程:" + Thread.currentThread().getName() + ",count 值:" + count.getReference()); try { Integer expectReference = count.getReference(); Integer newReference = expectReference + 1; Integer expectStamp = count.getStamp(); Integer newStamp = count.getStamp(); Thread.sleep(1000); // 主线程休眠 1s,让出 cpu boolean isUpdateSuccess = count.compareAndSet(expectReference, newReference, expectStamp, newStamp); System.out.println("操作线程:" + Thread.currentThread().getName() + ",是否更新成功:" + isUpdateSuccess); } catch (InterruptedException e) { e.printStackTrace(); } } }, "主线程"); Thread other = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(20); // 保证主线程先执行 count.compareAndSet(count.getReference(), count.getReference() + 1, count.getStamp(), count.getStamp() + 1); // a + 1 = 2 System.out.println("操作线程:" + Thread.currentThread().getName() + ",increment >>> count 值:" + count.getReference() + ",count 版本号:" + count.getStamp()); count.compareAndSet(count.getReference(), count.getReference() - 1, count.getStamp(), count.getStamp() + 1); // a + 1 = 2 System.out.println("操作线程:" + Thread.currentThread().getName() + ",decrement >>> count 值:" + count.getReference() + ",count 版本号:" + count.getStamp()); } catch (InterruptedException e) { e.printStackTrace(); } } }, "干扰线程"); main.start(); other.start(); } } 复制代码
-
模拟结果
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END