同步类
Table of Contents
管理状态依赖
状态依赖的类:某些方法只有满足特定的前置条件才能继续
状态依赖操作的模式:
申请锁 while(前置条件不满足) { 释放锁 重新获取锁 } 执行操作 释放锁
“有界缓存”工具类
BaseBoundedBuffer没有检查put操作的前置条件(buf未满)和take操作的前置条件(buf不为空)
public abstract class BaseBoundedBuffer<V> { private final V[] buf; private int tail; private int head; private int count; @SuppressWarnings("unchecked") protected BaseBoundedBuffer(int capacity) { this.buf = (V[]) new Object[capacity]; } protected synchronized final void doPut(V v) { buf[tail] = v; if (++tail == buf.length) { tail = 0; } ++count; } protected synchronized final V doTake() { V v = buf[head]; buf[head] = null; if (++head == buf.length) { head = 0; } --count; return v; } public synchronized final boolean isFull() { return count == buf.length; } public synchronized final boolean isEmpty() { return count == 0; } }
抛出异常
当不满足前置条件的时候,直接抛出异常
public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> { public GrumpyBoundedBuffer(int capacity) { super(capacity); } @Override public synchronized void put(V v) throws BufferFullException { if (isFull()) { throw new BufferFullException(); } doPut(v); } @Override public synchronized V take() throws BufferEmptyException { if (isEmpty()) { throw new BufferEmptyException(); } return doTake(); } }
实现起来很简单, 但是很难使用: 调用方需要捕获异常并手动进行重试
while (true) { try { V item = buffer.take(); // use item break; } catch (BufferEmptyException e) { Thread.sleep(SLEEP_GRANULARITY); } }
轮询
把重试的逻辑放入SleepyBoundedBuffer
@ThreadSafe public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> { private static final long SLEEP_GRANULARITY = 1000L; public SleepyBoundedBuffer(int size) { super(size); } public void put(V v) throws InterruptedException { while (true) { synchronized (this) { if (!isFull()) { doPut(v); return; } } // 释放锁后sleep一段时间再进行重试 Thread.sleep(SLEEP_GRANULARITY); } } public V take() throws InterruptedException { while (true) { synchronized (this) { if (!isEmpty()) { return doTake(); } } // 释放锁后sleep一段时间再进行重试 Thread.sleep(SLEEP_GRANULARITY); } } }
- 代码变得更复杂,让线程沉睡前,必须释放锁,不然会引起死锁的糟糕情况!
- sleep的时间不好确定: 如果设定的太短, 将大量消耗CPU资源。如果设定的太长, 则程序的响应性不好
- 客户端依然需要处理InterruptedException
条件等待
条件队列:等待某个特殊条件为真的一组线程。
每个Java对象都可以被用作内置锁,同样每个Java对象也可以通过wait,notify/notifyAll方法用作条件队列。
- wait: 立刻释放锁,阻塞当前线程,让其他线程有机会获得锁,修改内部状态,使得前置条件为真
- notify/All:唤醒在此对象监视器上等待的单个/所有线程,执行完synchronized代码后释放锁
只有获得内置锁,否则无法去检查前置条件是否为真,同样业务代码应该修改内部状态,否则无法使其他的等待线程的前置条件为真。因此wait,notify/All方法必须运行在同步代码内
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> { public BoundedBuffer(int capacity) { super(capacity); } public synchronized void put(V v) throws InterruptedException { // 当缓冲区已满时将线程挂起, 等待其他线程唤醒 // 不给唤醒之后再次判断缓冲区是否已满 while (isFull()) wait(); doPut(v); // 操作完成后唤醒其他线程 notifyAll(); } public synchronized V take() throws InterruptedException { // 当缓冲区为空时将线程挂起, 等待其他线程唤醒 // 被唤醒之后再次判断缓冲区是否为空 while (isEmpty()) wait(); V v = doTake(); // 操作完成后唤醒其他线程 notifyAll(); return v; } }
BoundedBuffer具有更好的响应性, 更高的CPU效率,更少的上下文切换。当然生产环境必须提供一个有超时的版本
使用条件队列
条件队列容易构造高效率,快响应的程序,但是同样很容易被错误使用
条件谓词
使某个操作成为状态依赖的前提条件
在条件等待中存在一个重要的三元关系:加锁,wait方法,条件谓词。条件谓词中包含多个状态变量,而状态变量由一个锁来保护,在测试条件谓词前,必须先持有这个锁。锁对象与条件队列对象(调用wait/notfiy/notifyAll的对象)必须是同一个对象。
在BoundBuffer中take方法的条件谓词是:buf数组不为空。状态变量:buf数组。锁是BoundBuffer的内置锁。条件队列同样是这个BoundBuffer对象。首先take方法获得BoundBuffer的内置锁,然后测试条件谓词
- 如果buf数组不为空,则拿走第一个元素,其实这已经修改了条件变量,之所以可以这样做是因为此时仍然持有BoundBuffer的内置锁
- 如果buf数组为空,则将在BoundBuffer这个条件队列上调用wait方法, 其前提是已经获得这个对象的内置锁。在测试条件谓词的时候已经获得这个内置锁。wait将立刻释放内置锁,然后阻塞当前线程,直到其他线程唤醒,或者发生中断异常,或者等待超时。
唤醒后,重新与其他线程竞争运行,再次测试前置条件
每一次wait调用都会隐式地与一个条件谓词关联。当调用与某个条件谓词关联的wait方法时候,必须首先持有与条件队列相关的锁,而且这个锁保护着构成条件谓词的状态变量!
事实上Java语言规范根本没有定义条件谓词,但如果不清楚条件谓词,条件队列将很可能会被错误使用
过早唤醒
唤醒并不意味着条件谓词已经为真,所以必须再次测试条件谓词
void stateDependentMethod() throws InterruptedException { // condition predicate must be guarded by lock synchronized(lock) { while (!conditionPredicate()) lock.wait(); // object is now in desired state // execute some business logic ... //don't release lock too early } }
使用条件等待Object.wait/Condition.await:
- 线程在开始执行前必须通过条件谓词测试
- 在调用wait前必须测试条件谓词,在从wait返回后必须重新测试条件谓词
- wait总是在一个循环体内,不能使用if校验一次条件谓词
- 确保构成条件谓词的状态变量处于条件队列的对象内置锁保护中
- 在调用wait,notfiy,notifyAll前必须确保获得了条件队列对象的内置锁
- 在通过条件谓词测试,但没有执行完业务操作前不能释放锁
丢失信号
线程必须等待一个已经为真的条件,但在开始等待前却忘记检查条件谓词
通知
每当在等待一个条件时,必须确保在条件谓词为真时通过某种方式发出通知。使用notfiy而不是notfiyAll通知会导致某些线程无法被唤醒
只有同时满足以下2个条件才可以使用notify,而不是notfiyAll
- 所有等待线程的类型相同:只有一个条件谓词与条件队列相关
- 单进单出:条件变量的每次通知只能唤醒一个线程
基本上所有情况应该都使用notifyAll
子类的安全问题
要么把条件队列的等待和通知机制文档公开,要么就完全禁止子类化
封装条件队列
使用私有的条件队列及其内置锁,使得客户端无法对条件队列对象进行加锁操作
显示的Condition对象
一个Java内置锁只能关联一个条件队列,同一个条件队列往往会关联不同的条件谓词,很难使用notfiy来唤醒等待相关条件谓词的线程。当有多个条件谓词的时候,使用显示的Condition对象是一种更灵活,易懂的选择
public interface Condition { void await() throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; long awaitNanos(long nanosTimeout) throws InterruptedException; void awaitUninterruptibly(); boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
- 一个Lock对象可以创建多个Condition对象
- Condition对象使用await,signal/signalAll方法,而不是Object对象的wait, notfiy/notfiyAll方法
- Condition对象继承了Lock对象的公平性,如果是公平锁,线程会依照先进先出顺序从Condition.await中释放
使用显示Condition对象实现有界缓存
和使用内置条件队列一样,也必须满足锁,条件谓词,状态变量的三元关系。状态变量必须由Lock对象保护,在检查条件谓词,调用wait,signal/signalAll方法的时候必须持有Lock对象
@ThreadSafe public class ConditionBoundedBuffer<V> { protected final Lock lock = new ReentrantLock(); private final static int BUFFER_SIZE = 1024; // CONDITION PREDICATE: notFull (count < items.length) private final Condition notFull = lock.newCondition(); // CONDITION PREDICATE: notEmpty (count > 0) private final Condition notEmpty = lock.newCondition(); @GuardedBy("lock") private final V[] items = (V[]) new Object[BUFFER_SIZE]; @GuardedBy("lock") private int tail, head, count; // BLOCKS-UNTIL: notFull public void put(V x) throws InterruptedException { lock.lock(); try { while (count == items.length) { notFull.await(); } items[tail] = x; if (++tail == items.length) { tail = 0; } ++count; //这里可以调用signal方法显示通知等待notEmpty条件的线程,使用内置条件队列只能notifyAll notEmpty.signal(); } finally { lock.unlock(); } } //BLOCKS-UNTIL: notEmpty public V take() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } V x = items[head]; items[head] = null; if (++head == items.length) { head = 0; } --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
AQS
ReentrantLock和Semaphore的相似点
这两个同步类接口功能都类似一个阀门:
- 调用lock或acquire成功返回时通过
- 调用lock或acquire阻塞时等待
- 在tryLock或tryAcquire返回假时候,可以取消
- 支持可中断,不可中断的,限时的获取操作
- 支持公平,非公平等待队列
使用ReentrantLock实现Semaphore
@ThreadSafe public class SemaphoreOnLock { private final Lock lock = new ReentrantLock(); //CONDITION PREDICATE: permitsAvailable (permits > 0) private final Condition permitsAvailable = lock.newCondition(); @GuardedBy("lock") private int permits; SemaphoreOnLock(int initialPermits) { lock.lock(); try { permits = initialPermits; } finally { lock.unlock(); } } // BLOCKS-UNTIL: permitsAvailable public void acquire() throws InterruptedException { lock.lock(); try { while (permits <= 0) { permitsAvailable.await(); } --permits; } finally { lock.unlock(); } } public void release() { lock.lock(); try { ++permits; permitsAvailable.signal(); } finally { lock.unlock(); } } }
AbstractQueuedSynchronizer
AQS是一个用于构建锁和同步器的框架,解决了实现同步器的很多底层细节问题。事实上不仅Reent- rantLock和Semaphore,还有CountDownLatch,ReentrantReadWriteLock,FutureTask等都是基于AQS实现的
AQS原理
- AQS负责管理同步器类中的状态,这个状态被表示成一个整数信息,可以通过getState, setState以及compareAndSet来进行等protected方法来进行操作,当然同步类也可以自行管理额外的状态变量
获取操作,通常会阻塞线程
boolean acquire() throws InterruptedException { while (当前状态不允许获取操作) { if (需要阻塞获取请求) { 如果当前不在等待队列中,则将它放入当前等待队列; 阻塞当前线程; } else { 返回失败; } 有必要的情况下更新同步器状态; 如果线程位于等待队列中,将其移除队列; 返回成功; } }
释放操作
void release() { 更新同步器状态; if (新的状态允许一个或多个被阻塞的线程执行获取操作) 解除队列中一个或多个线程的阻塞状态; }
扩展AQS
AQS的acquire/acquireShared, release/releaseShared会调用子类中带try开头的方法
- 支持独占的同步器需要扩展tryAcquire, tryRelease, isHeldExclusively等方法
- 支持共享的同步器需要扩展tryAcquireShared, tryReleaseShared等方法
子类通过getState,setState,compareAndSetState等方法来检查和更新状态,并通过try开头的扩展方法的返回值来通知AQS获取和释放同步器操作是否成功
- tryAcquire/tryAcquireShared: 返回负值表示获取失败,返回0表示通过独占方式被获取,返回正值表示被共享方式被获取
- tryRelease/tryReleaseShared: 如果释放操作使得其他线程可以执行获取操作,返回true
一次开启闭锁
@ThreadSafe public class OneShotLatch { //default sync state is 0,, it means the latch is closed private final Sync sync = new Sync(); public void signal() { sync.releaseShared(0); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(0); } private class Sync extends AbstractQueuedSynchronizer { protected int tryAcquireShared(int ignored) { // Succeed if latch is open (state == 1), else fail return (getState() == 1) ? 1 : -1; } protected boolean tryReleaseShared (int ignored) { setState(1); // Latch is now open return true; // Other threads may now be able to acquire } } }
- Sync类的状态:0表示关闭,1表示开启
- OneShotLatch的await方法调用AQS的acquireSharedInterruptibly方法,最终再调用Sync.tryAcquireShared方法
- OneShotLatch的signal方法调用AQS的releaseShared方法,最终再调用Sync.tryReleaseShared方法
- OneShotLatch可以通过继承AQS实现,但是这样会导致破坏父类的同步协议的可能性。不应该直接扩展AQS,而是委托给私有类AQS子类实现
java同步包
ReentrantLock
非公平版本的tryAcquire
protected boolean tryAcquire(int ignored) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, 1)) { owner = current; return true; } } else if (current == owner) { setState(c+1); return true; } return false; }
同步状态用来保存锁获取操作的次数,并且还维护一个owner变量来标识当前线程所有者来区分是重入还是竞争。当一个线程尝试获取锁的时候,如果锁未被获取(状态为0),将用尝试更新锁的状态来表示已经被占有,由于可能状态被检查后立即更新,所以使用compareAndSetState来更新。如果锁已经被占据,则检查当前线程是否拥有者,如果拥有则设置状态为当前状态+1,反之则返回false表示获取失败
Semaphore & CountDownLatch
Semaphore是共享获取
protected int tryAcquireShared(int acquires) { while (true) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected boolean tryReleaseShared(int releases) { while (true) { int p = getState(); if (compareAndSetState(p, p + releases)) return true; } }
Semaphore的状态用来表示当前可用许可的数量
- tryAcquireShared首先检查是否有足够可用的许可,如果没有足够的许可,直接返回负值,如果有足够的许可,尝试更新状态,如果更新成功,则返回0或正值,如果没有更新成功则轮询再次尝试
- tryReleaseShared尝试更新状态来增加当前可用的许可数量,更新成功直接返回true,更新失败同样轮询重新尝试
CountDownLatch实现类似Semaphore,它的状态用来表示计数值,每次释放的时候状态减去1,当计数值为0时候,可以获取
FutureTask
FutureTask的状态用来表示任务的状态,正在完成,已经完成,已经取消。FutureTask中存在一个指向当前计算线程的引用。当某个特定事件发生,计算线程执行结束,计算线程抛出中断异常等,更新FutureTask的状态,来释放阻塞
ReentrantReadWriteLock
ReentrantReadWriteLock虽然有读锁和写锁,但用一个32位整形变量来保存状态。一个16位用来表示写入锁的计数,另一个16用来表示读取锁的计数。读取锁使用共享的获取释放方法,写入锁使用独占的读取锁和写入锁方法
AQS内部中存在一个CLH队列来表示等待的队列。当ReentrantReadWriteLock锁可用时,如果这个队列的第一个线程是写操作,则独占写入锁,反之在这个队列中第一个写线程之前的所有读线程都将获取读取锁