并发编程相关

synchronized:同步锁

synchronized是一种悲观锁,它假设最坏的情况,并且只有在确保其它线程不会造成干扰的情况下执行,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁

synchronized同步锁的应用场景

  • 实例方法,锁住的是该类的实例对象

当一个线程访问一个对象实例的同步方法A时,其他线程对该对象实例中所有其它同步方法的访问将被阻塞。因为第一个线程已经获得了对象锁,其他线程得不到锁,则虽然是访问不同的方法,但是没有获得锁,也无法访问

1
2
3
public synchronized void method(){
...
}
  • 静态方法,锁住的是类对象

当一个线程访问一个类的静态同步方法A时,其他线程对该类中所有其它静态同步方法的访问将被阻塞。因为第一个线程已经获得了类锁,其他线程得不到锁

1
2
3
public static synchronized void method(){
...
}
  • 同步代码块,锁住的是该类的实例对象

当一个线程访问一个对象实例的synchronized(this)同步代码块时,它就获得了这个对象实例的对象锁。其它线程对该对象实例的所有同步代码部分的访问都被暂时阻塞

使用同步代码块来替代同步方法减少同步的时间

1
2
3
synchronized(this){
...
}
  • 同步代码块,锁住的是该类的类对象

当一个线程访问一个对象实例的synchronized(XX.class)同步代码块时,它就获得了这个类的类锁。其它线程对该类的所有同步代码部分的访问都被暂时阻塞

1
2
3
synchronized(Issue.class){
...
}
  • 同步代码块,锁住的是配置的实例对象
1
2
3
4
String str = 'test';
synchronized(str){
...
}

CAS:Compare and Swap

CAS操作是一种乐观锁,悲观锁由于在进程挂起和恢复执行过程中存在着很大的开销,因此产生乐观锁,乐观锁每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止

总结:在线程冲突较少的情况下,CAS的性能比synchronized好;而线程冲突严重的情况下,synchronized性能远高于CAS

Atomic包

Java从JDK1.5开始提供了java.util.concurrent.atomic包,方便程序员在多线程环境下,无锁的进行原子操作。原子变量的底层使用了处理器提供的原子指令,但是不同的CPU架构可能提供的原子指令不一样,也有可能需要某种形式的内部锁,所以该方法不能绝对保证线程不被阻塞。底层也使用到了volatile

AtomicInteger:原子更新整型
AtomicIntegerArray:原子更新整型数组里的元素
AtomicReference:原子更新引用类型
AtomicReferenceFieldUpdater:原子更新引用类型里的字段
等...
  • AtomicInteger源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Unsafe类提供了硬件级别的原子操作,native本地方法
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();

/**
* 用来存储当前实例中value字段的内存地址
*/
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
} catch (NoSuchFieldException e) {
throw new IllegalArgumentException(e.getMessage());
}
}

private volatile int value;

/**
* 当执行增加操作时,采用cas循环,每次执行+1时,都会进入到unsafe.compareAndSwapInt
* 方法中进行比较,若A线程获取到current后,B线程已经+1,此时A线程的current就是旧的值,
* 则需要循环获取新的current,以此来保证每个线程的+1都会正常执行
*/
public final int getAndIncrement() {
for (; ; ) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) {
return current;
}
}
}

/**
* 调用unsafe的native方法,根据value的内存地址直接进行比较与修改
*/
public final Boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
  • CAS的缺陷:ABA问题

因为CAS是基于内存共享机制实现的,比如在AtomicInteger类中使用了关键字volatile修饰的属性: private volatile int value;

线程1在共享变量中读到值为A
线程1被抢占了,线程2执行
线程2把共享变量里的值从A改成了B,再改回到A,此时被线程1抢占。
线程1回来看到共享变量里的值没有被改变,于是继续执行。
虽然线程t1以为变量值没有改变,继续执行了,但是这个过程中(即A的值被t2改变期间)会引发一些潜在的问题

因为CAS判断的是指针的地址。如果这个地址被重用了呢,问题就很大了。(地址被重用是很经常发生的,一个内存分配后释放了,再分配,很有可能还是原来的地址)

简单解决方案:不是更新某个引用的值,而是更新两个值,包括一个引用和一个版本号,即使这个值由A变为B,然后为变为A,版本号也是不同的。AtomicStampedReference和AtomicMarkableReference支持在两个变量上执行原子的条件更新。AtomicStampedReference更新一个“对象-引用”二元组,通过在引用上加上“版本号”,从而避免ABA问题

AQS:AbstractQueuedSynchronizer(抽象队列同步器

AQS 是很多同步器的基础框架,比如 ReentrantLock(可重入锁)、CountDownLatch 和 Semaphore 等都是基于 AQS 实现的,AQS最底层也是通过CAS线程自旋实现的

实现原理:

在 AQS 内部,通过维护一个FIFO 队列来管理多线程的排队工作。在公平竞争的情况下,无法获取同步状态的线程将会被封装成一个节点,置于队列尾部。入队的线程将会通过自旋的方式获取同步状态,若在有限次的尝试后,仍未获取成功,线程则会被阻塞住。当头结点释放同步状态后,且后继节点对应的线程被阻塞,此时头结点线程将会去唤醒后继节点线程。后继节点线程恢复运行并获取同步状态后,会将旧的头结点从队列中移除,并将自己设为头结点

节点的等待状态:waitStatus


waitStatus 说明
CANCELLED(1) 当前线程因为超时或者中断被取消。这是一个终结态,也就是状态到此为止
SIGNAL(-1) 当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程,这个状态一般都是后继线程来设置前驱节点的
CONDITION(-2) 当前线程在condition队列中
PROPAGATE(-3) 用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的
0 表示无状态,后续节点正在运行中

需要由同步组件覆写的方法

  • tryAcquire:尝试获取同步状态

返回true获取成功,返回false获取失败

  • tryRelease:尝试释放同步状态

返回true释放成功,返回false释放失败

  • tryAcquireShared:尝试获取共享同步状态

返回结果小于0获取失败,大于等于0获取成功

  • tryReleaseShared:尝试释放共享同步状态

返回true释放成功,返回false释放失败

独占模式:获取锁

  • 调用acquire,内部调用tryAcquire(自定义覆盖实现的)方法尝试获取同步状态

  • -获取成功,直接返回

  • -获取失败,调用addWaiter(传入Node.EXCLUSIVE)将线程封装到节点中并将节点入队,再调用acquireQueued

  • 入队节点在acquireQueued方法中自旋获取同步状态

  • 若节点的前驱节点是头节点,则再次调用tryAcquire尝试获取同步状态

  • -获取成功,当前节点将自己设为头节点并返回

  • -获取失败,进入下一步判断

  • 获取同步状态失败,则会调用shouldParkAfterFailedAcquire判断是否应该阻塞自己,如果不阻塞,CPU就会处于忙等状态,这样会浪费CPU资源

  • shouldParkAfterFailedAcquire中会根据前驱节点的waitStatus的值,决定后续的动作

  • -前驱节点等待状态为SIGNAL(-1),表示当前线程应该被阻塞,调用parkAndCheckInterrupt中的LockSupport.park(this)阻塞自己,线程阻塞后,会在前驱节点释放同步状态后被前驱节点线程唤醒

  • -前驱节点等待状态为CANCELLED(1),则以前驱节点为起点向前遍历,移除其他等待状态为 CANCELLED 的节点,继续自旋获取同步状态

  • -否则前驱节点等待状态为0 或 PROPAGATE(-3),则设置前驱节点为 SIGNAL,继续自旋获取同步状态

  • 如果在获取同步状态中出现异常(tryAcquire 需同步组件开发者覆写,难免不了会出现异常),则会调用cancelAcquire取消获取同步状态

  • cancelAcquire中,设置当前节点为 CANCELLED,继续唤醒后续节点unparkSuccessor(node)

  • unparkSuccessor中通过CAS将当前节点设置为 0,以便后续节点多一次尝试获取同步状态的机会,唤醒后续节点线程LockSupport.unpark(s.thread)

List的架构图

独占模式:释放锁

  • 调用release,内部调用tryRelease(自定义覆盖实现的)方法尝试释放同步状态

  • -获取失败,返回false

  • -获取成功,判断当前队列中头节点的值,进行相应操作

  • head=null,还未初始化完,不需要唤醒

  • head!=null && waitStatus=0,表示后续节点还在运行中,不需要唤醒

  • head!=null && waitStatus<0,表示后续线程可能被阻塞,需要唤醒,调用unparkSuccessor(h)

共享模式:获取共享状态

与独占模式不同,共享模式下,同一时刻会有多个线程获取共享同步状态。共享模式是实现读写锁中的读锁、CountDownLatch 和 Semaphore 等同步组件的基础

  • 调用acquireShared,内部调用tryAcquireShared(自定义覆盖实现的)方法尝试获取共享同步状态

  • -获取成功,直接返回

  • -获取失败,进入doAcquireShared,首先调用addWaiter(传入Node.SHARED)将线程封装到节点中并将节点入队

  • 入队节点在doAcquireShared方法中自旋获取同步状态

  • 若节点的前驱节点是头节点,则再次调用tryAcquireShared尝试获取共享同步状态(如果头节点是 EXCLUSIVE,线程无法获取共享同步状态。如果是 SHARED,线程则可获取共享同步状态)

  • -获取成功,调用setHeadAndPropagate,将自己设为头节点

  • –如果propagate>0,并且旧的头节点的waitStatus < 0 即 waitStatus = SIGNAL 或 PROPAGATE 时,并且当前节点的后续节点是共享节点,则调用doReleaseShared继续唤醒

  • doReleaseShared,如果 head 节点等待状态为 SIGNAL,则将 head 节点状态设为 0,并调用unparkSuccessor唤醒后继节点,如果 head 节点等待状态为 0,则将 head 节点状态设为 PROPAGATE,保证唤醒能够正常传播下去

  • –否则不唤醒后续节点

  • -获取失败,进入下一步判断

  • 获取同步状态失败,则会调用shouldParkAfterFailedAcquire判断是否应该阻塞自己,如果不阻塞,CPU就会处于忙等状态,这样会浪费CPU资源

  • shouldParkAfterFailedAcquire中会根据前驱节点的waitStatus的值,决定后续的动作

  • -前驱节点等待状态为SIGNAL(-1),表示当前线程应该被阻塞,调用parkAndCheckInterrupt中的LockSupport.park(this)阻塞自己,线程阻塞后,会在前驱节点释放同步状态后被前驱节点线程唤醒

  • -前驱节点等待状态为CANCELLED(1),则以前驱节点为起点向前遍历,移除其他等待状态为 CANCELLED 的节点,继续自旋获取同步状态

  • -否则前驱节点等待状态为0 或 PROPAGATE(-3),则设置前驱节点为 SIGNAL,继续自旋获取同步状态

  • 如果在获取同步状态中出现异常(tryAcquire 需同步组件开发者覆写,难免不了会出现异常),则会调用cancelAcquire取消获取同步状态

  • cancelAcquire中,设置当前节点为 CANCELLED,继续唤醒后续节点unparkSuccessor(node)

  • unparkSuccessor中通过CAS将当前节点设置为 0,以便后续节点多一次尝试获取同步状态的机会,唤醒后续节点线程LockSupport.unpark(s.thread)

总结

共享模式获取同步状态时,调用acquireShared之后,内部会调用tryAcquireShared获取同步状态,并调用doReleaseShared唤醒后续节点(必须是共享节点),后续线程会继续执行tryAcquireShared,而设置等待状态为PROPAGATE是为了让线程继续传播下去

共享模式:释放共享状态

  • 调用releaseShared,内部调用tryReleaseShared(自定义覆盖实现的)方法尝试释放同步状态

  • -释放失败,返回false

  • -释放成功,调用doReleaseShared

  • doReleaseShared,如果 head 节点等待状态为 SIGNAL,则将 head 节点状态设为 0,并调用unparkSuccessor唤醒后继节点,如果 head 节点等待状态为 0,则将 head 节点状态设为 PROPAGATE,保证唤醒能够正常传播下去

总结

当某个线程被唤醒后,在doAcquireShared方法中会循环继续调用tryAcquireShared尝试获取共享同步状态,成功获取同步状态后,会向后传播唤醒为共享模式的后续节点的线程,以此达到共享锁的实现

PROPAGATE

问题一:PROPAGATE 状态用在哪里,以及怎样向后传播唤醒动作的?

答:PROPAGATE 状态用在 setHeadAndPropagate。当头节点状态被设为 PROPAGATE 后,后继节点成为新的头结点后。若 propagate > 0 条件不成立,则根据条件h.waitStatus < 0成立与否,来决定是否唤醒后继节点,即向后传播唤醒动作。

问题二:引入 PROPAGATE 状态是为了解决什么问题?

答:引入 PROPAGATE 状态是为了解决并发释放信号量所导致部分请求信号量的线程无法被唤醒的问题

CountDownLatch

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成了任务,然后在CountDownLatch上等待的线程就可以恢复执行任务

  • 核心代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
protected int tryAcquireShared(int acquires) {
/*
* 获取共享同步状态,若同步状态为0,则获取成功,否则获取失败
*/
return (getState() == 0) ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int releases) {
/*
* 释放共享同步状态,若为0释放失败,否则通过CAS-1,当减到0时,释放成功
*/
for (; ; ) {
int c = getState();
if (c == 0) {
return false;
}
int nextc = c - 1;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
}
  • 创建共享同步锁
1
final CountDownLatch latch = new CountDownLatch(10);
  • 生成多个线程去调用tryReleaseShared,从而减少同步状态的值,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for(int i=0; i< 10; i++){
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getId() + "准备减少同步状态");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getId() + "成功减少同步状态");
latch.countDown();
}
}).start();
}
  • 主线程获取同步状态
1
2
3
4
5
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
  • 原理

主线程调用await(),await()内部调用tryAcquireShared()获取共享同步状态,一开始线程获取时无法获取同步状态而阻塞线程,当其他线程调用countDown()减少同步状态至0时成功释放同步状态后,唤醒阻塞的主线程,从而达到能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行的功能,这里用到的是AQS的共享模式,所以后续阻塞的共享模式节点线程都会被唤醒,达到同时唤醒多个线程的功能

ReentrantLock(可重入锁)

ReentrantLock并不是一种替代内置加锁的方法,而是作为一种可选择的高级功能。
相比于synchronizedReentrantLock在功能上更加丰富,它具有可重入可中断可限时公平锁等特点

在jdk1.5里面,`ReentrantLock`的性能是明显优于`synchronized`的,但是在jdk1.6里面,synchronized做了优化,他们之间的性能差别已经**不明显**了
  • 可重入

由于ReentrantLock是重入锁,所以可以反复得到相同的一把锁,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放(重入锁)

  • 可中断

synchronized不同的是,ReentrantLock对中断是有响应的,synchronized一旦尝试获取锁就会一直等待直到获取到锁

  • 可限时

超时不能获得锁,就返回false,不会永久等待构成死锁;使用lock.tryLock(long timeout, TimeUnit unit)来实现可限时锁,参数为时间和单位

  • 公平锁

一般意义上的锁是不公平的,不一定先来的线程能先得到锁,后来的线程就后得到锁。不公平的锁可能会产生饥饿现象

公平锁的意思就是,这个锁能保证线程是先来的先得到锁。虽然公平锁不会产生饥饿现象,但是公平锁的性能会比非公平锁差很多

Lock锁与AQS的关系

  • Lock 面向使用者,定义了使用者与锁交互的接口

  • AQS 面向锁的实现者,简化了锁的实现,屏蔽了同步状态的管理、线程排队、线程等待与唤醒等底层操作

concurrent包的实现:底层都是使用volatile

如果我们仔细分析 concurrent 包的源代码实现,会发现一个通用化的实现模式:

首先,声明共享变量为 volatile;
然后,使用 CAS 的原子条件更新来实现线程之间的同步;
同时,配合以 volatile 的读/写和 CAS 所具有的 volatile 读和写的内存语义来实现线程之间的通信。

AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic包中的类),这些 concurrent包中的基础类都是使用这种模式来实现的,而concurrent包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent包的实现示意图如下:

image

参考文献