CAS的英文为Compare and Swap 翻译为比较并交换。
CAS加volatile关键字是实现并发包的基石。没有CAS就不会有并发包,synchronized是一种独占锁、悲观锁,java.util.concurrent中借助了CAS指令实现了一种区别于synchronized的一种乐观锁。
乐观锁和悲观锁是一种概念和思想:
悲观锁:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样当第二个线程想拿这个数据的时候,第二个线程会一直堵塞,直到第一个释放锁,他拿到锁后才可以访问。传统的数据库里面就用到了这种锁机制,例如:行锁,表锁,读锁,写锁,都是在操作前先上锁。java中的synchronized的实现也是一种悲观锁。
乐观锁:乐观锁概念为,每次拿数据的时候都认为别的线程不会修改这个数据,所以不会上锁,但是在更新的时候会判断一下在此期间别的线程有没有修改过数据,乐观锁适用于读操作多的场景,这样可以提高程序的吞吐量。在Java中
java.util.concurrent.atomic包下面的原子变量就是使用了乐观锁的一种实现方式CAS实现。
在JDK 5之前Java语言是靠 synchronized 关键字保证同步的,这会导致有锁。锁机制存在以下问题:
Volatile关键字能够在并发条件下,强制将修改后的值刷新到主内存中来保持内存的可见性。通过 CPU内存屏障禁止编译器指令性重排来保证并发操作的有序性
如果多个线程同时操作 Volatile 修饰的变量,也会造成数据的不一致。
public class Test { public volatile int inc = 0; public void increase() { inc++; } public static void main(String[] args) { final Test test = new Test(); for(int i=0;i<10;i++){ new Thread(){ public void run() { for(int j=0;j<1000;j++) test.increase(); }; }.start(); } while(Thread.activeCount()>1) Thread.yield(); System.out.println(test.inc); }}
事实上运行它会发现每次运行结果都不一致,都是一个小于10000的数字。
假如某个时刻变量 inc 的值为10:
之所以出现还是 volatile 只是保证读写具有原子性,但是对于 ++ 操作的复合操作是不存在原子操作的。只能在有限的一些情形下使用 volatile 变量替代锁。要使 volatile 变量提供理想的线程安全,比如:对变量的写操作不依赖于当前值。
volatile 是不错的机制,但是 volatile 不能保证原子性。因此对于同步最终还是要回到锁机制上来。
独占锁是一种悲观锁,synchronized 就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。乐观锁用到的机制就是 CAS。
CAS 操作包含三个操作数 -- 内存位置、预期数值和新值。CAS 的实现逻辑是将内存位置处的数值与预期数值想比较,若相等,则将内存位置处的值替换为新值。若不相等,则不做任何操作。
在 Java 中,Java 并没有直接实现 CAS,CAS 相关的实现是通过 C++ 内联汇编的形式实现的。Java 代码需通过 JNI 才能调用。
在JVM中的CAS操作就是基于处理器的CMPXCHG汇编指令实现的,因此,JVM中的CAS的原子性是处理器保障的
CAS 是一条 CPU 的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe 提供的 CAS 方法(如compareAndSwapXXX)底层实现即为 CPU 指令 cmpxchg
对
java.util.concurrent.atomic 包下的原子类 AtomicInteger 中的 compareAndSet 方法进行分析,相关分析如下:
public class AtomicInteger extends Number implements java.io.Serializable { // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { // 计算变量 value 在类对象中的偏移 valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; public final boolean compareAndSet(int expect, int update) { /** * compareAndSet 实际上只是一个壳子,主要的逻辑封装在 Unsafe 的 * compareAndSwapInt 方法中 */ return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } // ......} public final class Unsafe { // compareAndSwapInt 是 native 类型的方法,继续往下看 public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x); // ...... }
public class AtomicInteger extends Number implements java.io.Serializable { // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { // 计算变量 value 在类对象中的偏移 valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; public final boolean compareAndSet(int expect, int update) { /** * compareAndSet 实际上只是一个壳子,主要的逻辑封装在 Unsafe 的 * compareAndSwapInt 方法中 */ return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } // ......} public final class Unsafe { // compareAndSwapInt 是 native 类型的方法,继续往下看 public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x); // ...... }
上面的分析看起来比较多,不过主流程并不复杂。如果不纠结于代码细节,还是比较容易看懂的。接下来,我会分析 Windows 平台下的 Atomic::cmpxchg 函数。继续往下看吧。
// atomic_windows_x86.inline.hpp#define LOCK_IF_MP(mp) __asm cmp mp, 0 / __asm je L0 / __asm _emit 0xF0 / __asm L0: inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchange int mp = os::is_MP(); __asm { mov edx, dest mov ecx, exchange_value mov eax, compare_value LOCK_IF_MP(mp) cmpxchg dword ptr [edx], ecx }}
上面的代码由 LOCK_IF_MP 预编译标识符和 cmpxchg 函数组成。为了看到更清楚一些,我们将 cmpxchg 函数中的 LOCK_IF_MP 替换为实际内容。如下:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // 判断是否是多核 CPU int mp = os::is_MP(); __asm { // 将参数值放入寄存器中 mov edx, dest // 注意: dest 是指针类型,这里是把内存地址存入 edx 寄存器中 mov ecx, exchange_value mov eax, compare_value // LOCK_IF_MP cmp mp, 0 /* * 如果 mp = 0,表明是线程运行在单核 CPU 环境下。此时 je 会跳转到 L0 标记处, * 也就是越过 _emit 0xF0 指令,直接执行 cmpxchg 指令。也就是不在下面的 cmpxchg 指令 * 前加 lock 前缀。 */ je L0 /* * 0xF0 是 lock 前缀的机器码,这里没有使用 lock,而是直接使用了机器码的形式。至于这样做的 * 原因可以参考知乎的一个回答: * https://www.zhihu.com/question/50878124/answer/123099923 */ _emit 0xF0L0: /* * 比较并交换。简单解释一下下面这条指令,熟悉汇编的朋友可以略过下面的解释: * cmpxchg: 即“比较并交换”指令 * dword: 全称是 double word,在 x86/x64 体系中,一个 * word = 2 byte,dword = 4 byte = 32 bit * ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元 * [edx]: [...] 表示一个内存单元,edx 是寄存器,dest 指针值存放在 edx 中。 * 那么 [edx] 表示内存地址为 dest 的内存单元 * * 这一条指令的意思就是,将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值 * 进行对比,如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中。 */ cmpxchg dword ptr [edx], ecx }}
到这里 CAS 的实现过程就讲完了,CAS 的实现离不开处理器的支持。如上面源代码所示,程序会根据当前处理器的类型来决定是否为 cmpxchg 指令添加 lock 前缀。如果程序是在多处理器上运行,就为 cmpxchg 指令加上 lock 前缀(lock cmpxchg)。反之,如果程序是在单处理器上运行,就省略 lock 前缀(单处理器自身会维护单处理器内的顺序一致性,不需要 lock 前缀提供的内存屏障效果)。
intel 的手册对 lock 前缀的说明如下:
上面的第 2 点和第 3 点所具有的内存屏障效果,足以同时实现 volatile 读和 volatile 写的内存语义。
经过上面的这些分析,现在我们终于能明白为什么 JDK 文档说 CAS 同时具有 volatile 读和 volatile 写的内存语义了。
Java 的 CAS 会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读 - 改 - 写操作,这是在多处理器中实现同步的关键(从本质上来说,能够支持原子性读 - 改 - 写指令的计算机器,是顺序计算图灵机的异步等价机器,因此任何现代的多处理器都会去支持某种能对内存执行原子性读 - 改 - 写操作的原子指令)。同时,volatile 变量的读 / 写和 CAS 可以实现线程之间的通信。把这些特性整合在一起,就形成了整个 concurrent 包得以实现的基石。如果我们仔细分析 concurrent 包的源代码实现,会发现一个通用化的实现模式:
AQS,非阻塞数据结构和原子变量类(
java.util.concurrent.atomic 包中的类),这些 concurrent 包中的基础类都是使用这种模式来实现的,而 concurrent 包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent 包的实现示意图如下:
JVM中的CAS(堆中对象的分配):
Java 调用 new object() 会创建一个对象,这个对象会被分配到 JVM 的堆中。那么这个对象到底是怎么在堆中保存的呢?
首先,new object() 执行的时候,这个对象需要多大的空间,其实是已经确定的,因为 java 中的各种数据类型,占用多大的空间都是固定的(对其原理不清楚的请自行Google)。那么接下来的工作就是在堆中找出那么一块空间用于存放这个对象。
在单线程的情况下,一般有两种分配策略:
但是JVM不可能一直在单线程状态下运行,那样效率太差了。由于再给一个对象分配内存的时候不是原子性的操作,至少需要以下几步:查找空闲列表、分配内存、修改空闲列表等等,这是不安全的。解决并发时的安全问题也有两种策略:
虚拟机是否使用TLAB,可以通过-XX:+/-UseTLAB参数来进行配置(jdk5及以后的版本默认是启用TLAB的)。
谈到 CAS,基本上都要谈一下 CAS 的 ABA 问题。CAS 由三个步骤组成,分别是“读取-比较-写回”。考虑这样一种情况,线程1和线程2同时执行 CAS 逻辑,两个线程的执行顺序如下:
然后用新值(newValue)写入内存中,完成 CAS 操作
如上流程,线程1并不知道原值已经被修改过了,在它看来并没什么变化,所以它会继续往下执行流程。对于 ABA 问题,通常的处理措施是对每一次 CAS 操作设置版本号。
ABA问题的解决思路其实也很简单,就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A了。
java.util.concurrent.atomic 包下提供了一个可处理 ABA 问题的原子类 AtomicStampedReference,
从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
自旋CAS(不成功,就一直循环执行,直到成功) 如果长时间不成功,会给 CPU 带来非常大的执行开销。如果JVM能支持处理器提供的 pause 指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。
当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i=2,j=a,合并一下 ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。
CAS 与 Synchronized 的使用情景:
补充: synchronized 在 jdk1.6 之后,已经改进优化。synchronized 的底层实现主要依靠 Lock-Free 的队列,基本思路是自旋后阻塞,竞争切换后继续竞争锁,稍微牺牲了公平性,但获得了高吞吐量。在线程冲突较少的情况下,可以获得和 CAS 类似的性能;而线程冲突严重的情况下,性能远高于 CAS。
本文链接:http://www.28at.com/showinfo-26-13497-0.html并发乐观锁CAS原理,吊打问并发的面试官
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
下一篇: 必知必会的22种设计模式(GO语言)