UP | HOME

同步类

Table of Contents

管理状态依赖

状态依赖的类:某些方法只有满足特定的前置条件才能继续

状态依赖操作的模式:

申请锁  
while(前置条件不满足) {  
     释放锁
     重新获取锁  
}  
执行操作  
释放锁  

“有界缓存”工具类

BaseBoundedBuffer没有检查put操作的前置条件(buf未满)和take操作的前置条件(buf不为空) 

public abstract class BaseBoundedBuffer<V> {  
    private final V[] buf;  
    private int tail;  
    private int head;  
    private int count;  

    @SuppressWarnings("unchecked")  
    protected BaseBoundedBuffer(int capacity) {  
        this.buf = (V[]) new Object[capacity];  
    }  

    protected synchronized final void doPut(V v) {  
        buf[tail] = v;  
        if (++tail == buf.length) {  
            tail = 0;  
        }  
        ++count;  
    }  

    protected synchronized final V doTake() {  
        V v = buf[head];  
        buf[head] = null;  
        if (++head == buf.length) {  
            head = 0;  
        }  
        --count;  
        return v;  
    }  

    public synchronized final boolean isFull() {  
        return count == buf.length;  
    }  

    public synchronized final boolean isEmpty() {  
        return count == 0;  
    }
}   

抛出异常

当不满足前置条件的时候,直接抛出异常

public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
    public GrumpyBoundedBuffer(int capacity) {
        super(capacity);
    }

    @Override
    public synchronized void put(V v) throws BufferFullException {
        if (isFull()) {
            throw new BufferFullException();
        }
        doPut(v);
    }

    @Override
    public synchronized V take() throws BufferEmptyException {
        if (isEmpty()) {
            throw new BufferEmptyException();
        }
        return doTake();
    }
}

实现起来很简单, 但是很难使用: 调用方需要捕获异常并手动进行重试

while (true) {
    try {
        V item = buffer.take();
        // use item
        break;
    } catch (BufferEmptyException e) {
        Thread.sleep(SLEEP_GRANULARITY);
    }
}

轮询

把重试的逻辑放入SleepyBoundedBuffer

@ThreadSafe
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
    private static final long SLEEP_GRANULARITY = 1000L;

    public SleepyBoundedBuffer(int size) {
        super(size);
    }

    public void put(V v) throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isFull()) {
                    doPut(v);
                    return;
                }
            }
            // 释放锁后sleep一段时间再进行重试  
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }

    public V take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isEmpty()) {
                    return doTake();
                }
            }
            // 释放锁后sleep一段时间再进行重试  
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }
}
  1. 代码变得更复杂,让线程沉睡前,必须释放锁,不然会引起死锁的糟糕情况!
  2. sleep的时间不好确定: 如果设定的太短, 将大量消耗CPU资源。如果设定的太长, 则程序的响应性不好
  3. 客户端依然需要处理InterruptedException

条件等待

条件队列:等待某个特殊条件为真的一组线程。

每个Java对象都可以被用作内置锁,同样每个Java对象也可以通过wait,notify/notifyAll方法用作条件队列。

  • wait: 立刻释放锁,阻塞当前线程,让其他线程有机会获得锁,修改内部状态,使得前置条件为真
  • notify/All:唤醒在此对象监视器上等待的单个/所有线程,执行完synchronized代码后释放锁

只有获得内置锁,否则无法去检查前置条件是否为真,同样业务代码应该修改内部状态,否则无法使其他的等待线程的前置条件为真。因此wait,notify/All方法必须运行在同步代码内

public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
    public BoundedBuffer(int capacity) {
        super(capacity);
    }

    public synchronized void put(V v) throws InterruptedException {
        // 当缓冲区已满时将线程挂起, 等待其他线程唤醒  
        // 不给唤醒之后再次判断缓冲区是否已满         
        while (isFull())
            wait();
        doPut(v);
        // 操作完成后唤醒其他线程  
        notifyAll();
    }

    public synchronized V take() throws InterruptedException {
        // 当缓冲区为空时将线程挂起, 等待其他线程唤醒  
        // 被唤醒之后再次判断缓冲区是否为空  
        while (isEmpty())
              wait();
        V v = doTake();
        // 操作完成后唤醒其他线程  
        notifyAll();
        return v;
    }
}

BoundedBuffer具有更好的响应性, 更高的CPU效率,更少的上下文切换。当然生产环境必须提供一个有超时的版本

使用条件队列

条件队列容易构造高效率,快响应的程序,但是同样很容易被错误使用

条件谓词

使某个操作成为状态依赖的前提条件

在条件等待中存在一个重要的三元关系:加锁,wait方法,条件谓词。条件谓词中包含多个状态变量,而状态变量由一个锁来保护,在测试条件谓词前,必须先持有这个锁。锁对象与条件队列对象(调用wait/notfiy/notifyAll的对象)必须是同一个对象。

在BoundBuffer中take方法的条件谓词是:buf数组不为空。状态变量:buf数组。锁是BoundBuffer的内置锁。条件队列同样是这个BoundBuffer对象。首先take方法获得BoundBuffer的内置锁,然后测试条件谓词

  • 如果buf数组不为空,则拿走第一个元素,其实这已经修改了条件变量,之所以可以这样做是因为此时仍然持有BoundBuffer的内置锁
  • 如果buf数组为空,则将在BoundBuffer这个条件队列上调用wait方法, 其前提是已经获得这个对象的内置锁。在测试条件谓词的时候已经获得这个内置锁。wait将立刻释放内置锁,然后阻塞当前线程,直到其他线程唤醒,或者发生中断异常,或者等待超时。

唤醒后,重新与其他线程竞争运行,再次测试前置条件

每一次wait调用都会隐式地与一个条件谓词关联。当调用与某个条件谓词关联的wait方法时候,必须首先持有与条件队列相关的锁,而且这个锁保护着构成条件谓词的状态变量! 

事实上Java语言规范根本没有定义条件谓词,但如果不清楚条件谓词,条件队列将很可能会被错误使用

过早唤醒

唤醒并不意味着条件谓词已经为真,所以必须再次测试条件谓词

       void stateDependentMethod() throws InterruptedException {
           // condition predicate must be guarded by lock
           synchronized(lock) {
               while (!conditionPredicate())
                   lock.wait();
           // object is now in desired state
           // execute some business logic ... 
             //don't release lock too early 
           }
       }

使用条件等待Object.wait/Condition.await:

  • 线程在开始执行前必须通过条件谓词测试
  • 在调用wait前必须测试条件谓词,在从wait返回后必须重新测试条件谓词
  • wait总是在一个循环体内,不能使用if校验一次条件谓词
  • 确保构成条件谓词的状态变量处于条件队列的对象内置锁保护中
  • 在调用wait,notfiy,notifyAll前必须确保获得了条件队列对象的内置锁
  • 在通过条件谓词测试,但没有执行完业务操作前不能释放锁
丢失信号

线程必须等待一个已经为真的条件,但在开始等待前却忘记检查条件谓词

通知

每当在等待一个条件时,必须确保在条件谓词为真时通过某种方式发出通知。使用notfiy而不是notfiyAll通知会导致某些线程无法被唤醒

只有同时满足以下2个条件才可以使用notify,而不是notfiyAll

  1. 所有等待线程的类型相同:只有一个条件谓词与条件队列相关
  2. 单进单出:条件变量的每次通知只能唤醒一个线程

基本上所有情况应该都使用notifyAll

子类的安全问题

要么把条件队列的等待和通知机制文档公开,要么就完全禁止子类化

封装条件队列

使用私有的条件队列及其内置锁,使得客户端无法对条件队列对象进行加锁操作

显示的Condition对象

一个Java内置锁只能关联一个条件队列,同一个条件队列往往会关联不同的条件谓词,很难使用notfiy来唤醒等待相关条件谓词的线程。当有多个条件谓词的时候,使用显示的Condition对象是一种更灵活,易懂的选择

public interface Condition {
    void await() throws InterruptedException;
    boolean await(long time, TimeUnit unit)
        throws InterruptedException;
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    void awaitUninterruptibly();
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}
  • 一个Lock对象可以创建多个Condition对象
  • Condition对象使用await,signal/signalAll方法,而不是Object对象的wait, notfiy/notfiyAll方法
  • Condition对象继承了Lock对象的公平性,如果是公平锁,线程会依照先进先出顺序从Condition.await中释放

使用显示Condition对象实现有界缓存

和使用内置条件队列一样,也必须满足锁,条件谓词,状态变量的三元关系。状态变量必须由Lock对象保护,在检查条件谓词,调用wait,signal/signalAll方法的时候必须持有Lock对象

@ThreadSafe
public class ConditionBoundedBuffer<V> {
    protected final Lock lock = new ReentrantLock();
    private final static int BUFFER_SIZE = 1024;
    // CONDITION PREDICATE: notFull (count < items.length)
    private final Condition notFull = lock.newCondition();
    // CONDITION PREDICATE: notEmpty (count > 0)
    private final Condition notEmpty = lock.newCondition();
    @GuardedBy("lock")
    private final V[] items = (V[]) new Object[BUFFER_SIZE];
    @GuardedBy("lock")
    private int tail, head, count;

    // BLOCKS-UNTIL: notFull
    public void put(V x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[tail] = x;
            if (++tail == items.length) {
                tail = 0;
            }
            ++count;
            //这里可以调用signal方法显示通知等待notEmpty条件的线程,使用内置条件队列只能notifyAll
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    //BLOCKS-UNTIL: notEmpty
    public V take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            V x = items[head];
            items[head] = null;
            if (++head == items.length) {
                head = 0;
            }
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

AQS

ReentrantLock和Semaphore的相似点

这两个同步类接口功能都类似一个阀门:

  • 调用lock或acquire成功返回时通过
  • 调用lock或acquire阻塞时等待
  • 在tryLock或tryAcquire返回假时候,可以取消
  • 支持可中断,不可中断的,限时的获取操作
  • 支持公平,非公平等待队列

使用ReentrantLock实现Semaphore

@ThreadSafe
public class SemaphoreOnLock {
    private final Lock lock = new ReentrantLock();
    //CONDITION PREDICATE: permitsAvailable (permits > 0)
    private final Condition permitsAvailable = lock.newCondition();
    @GuardedBy("lock")
    private int permits;

    SemaphoreOnLock(int initialPermits) {
        lock.lock();
        try {
            permits = initialPermits;
        } finally {
            lock.unlock();
        }
    }

    // BLOCKS-UNTIL: permitsAvailable
    public void acquire() throws InterruptedException {
        lock.lock();
        try {
            while (permits <= 0) {
                permitsAvailable.await();
            }
            --permits;
        } finally {
            lock.unlock();
        }
    }

    public void release() {
        lock.lock();
        try {
            ++permits;
            permitsAvailable.signal();
        } finally {
            lock.unlock();
        }
    }
}

AbstractQueuedSynchronizer

AQS是一个用于构建锁和同步器的框架,解决了实现同步器的很多底层细节问题。事实上不仅Reent- rantLock和Semaphore,还有CountDownLatch,ReentrantReadWriteLock,FutureTask等都是基于AQS实现的

AQS原理

  1. AQS负责管理同步器类中的状态,这个状态被表示成一个整数信息,可以通过getState, setState以及compareAndSet来进行等protected方法来进行操作,当然同步类也可以自行管理额外的状态变量
  2. 获取操作,通常会阻塞线程

    boolean acquire() throws InterruptedException {
        while (当前状态不允许获取操作) {
            if (需要阻塞获取请求) {
                如果当前不在等待队列中,则将它放入当前等待队列;
                阻塞当前线程;
            }
            else {
                返回失败;
            }
             有必要的情况下更新同步器状态;
            如果线程位于等待队列中,将其移除队列; 
            返回成功;
        }
     }
    
  3. 释放操作

    void release() {
        更新同步器状态;
        if (新的状态允许一个或多个被阻塞的线程执行获取操作)
            解除队列中一个或多个线程的阻塞状态;
    }
    

扩展AQS

AQS的acquire/acquireShared, release/releaseShared会调用子类中带try开头的方法

  • 支持独占的同步器需要扩展tryAcquire, tryRelease, isHeldExclusively等方法
  • 支持共享的同步器需要扩展tryAcquireShared, tryReleaseShared等方法

子类通过getState,setState,compareAndSetState等方法来检查和更新状态,并通过try开头的扩展方法的返回值来通知AQS获取和释放同步器操作是否成功

  • tryAcquire/tryAcquireShared: 返回负值表示获取失败,返回0表示通过独占方式被获取,返回正值表示被共享方式被获取
  • tryRelease/tryReleaseShared: 如果释放操作使得其他线程可以执行获取操作,返回true

一次开启闭锁

@ThreadSafe
public class OneShotLatch {
    //default sync state is 0,, it means the latch is closed
    private final Sync sync = new Sync();

    public void signal() {
        sync.releaseShared(0);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    private class Sync extends AbstractQueuedSynchronizer {

        protected int tryAcquireShared(int ignored) {
            // Succeed if latch is open (state == 1), else fail
            return (getState() == 1) ? 1 : -1;
        }

        protected boolean tryReleaseShared (int ignored) {
            setState(1); // Latch is now open
            return true; // Other threads may now be able to acquire
        }
    }
}
  • Sync类的状态:0表示关闭,1表示开启
  • OneShotLatch的await方法调用AQS的acquireSharedInterruptibly方法,最终再调用Sync.tryAcquireShared方法
  • OneShotLatch的signal方法调用AQS的releaseShared方法,最终再调用Sync.tryReleaseShared方法
  • OneShotLatch可以通过继承AQS实现,但是这样会导致破坏父类的同步协议的可能性。不应该直接扩展AQS,而是委托给私有类AQS子类实现

java同步包

ReentrantLock

非公平版本的tryAcquire

protected boolean tryAcquire(int ignored) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, 1)) {
            owner = current;
            return true;
        }
    } else if (current == owner) {
        setState(c+1);
        return true;
    }
    return false;
}

同步状态用来保存锁获取操作的次数,并且还维护一个owner变量来标识当前线程所有者来区分是重入还是竞争。当一个线程尝试获取锁的时候,如果锁未被获取(状态为0),将用尝试更新锁的状态来表示已经被占有,由于可能状态被检查后立即更新,所以使用compareAndSetState来更新。如果锁已经被占据,则检查当前线程是否拥有者,如果拥有则设置状态为当前状态+1,反之则返回false表示获取失败

Semaphore & CountDownLatch

Semaphore是共享获取

protected int tryAcquireShared(int acquires) {
    while (true) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0
            || compareAndSetState(available, remaining))
            return remaining;
    }
}
protected boolean tryReleaseShared(int releases) {
    while (true) {
        int p = getState();
        if (compareAndSetState(p, p + releases))
            return true;
    }
}

Semaphore的状态用来表示当前可用许可的数量

  • tryAcquireShared首先检查是否有足够可用的许可,如果没有足够的许可,直接返回负值,如果有足够的许可,尝试更新状态,如果更新成功,则返回0或正值,如果没有更新成功则轮询再次尝试
  • tryReleaseShared尝试更新状态来增加当前可用的许可数量,更新成功直接返回true,更新失败同样轮询重新尝试

CountDownLatch实现类似Semaphore,它的状态用来表示计数值,每次释放的时候状态减去1,当计数值为0时候,可以获取

FutureTask

FutureTask的状态用来表示任务的状态,正在完成,已经完成,已经取消。FutureTask中存在一个指向当前计算线程的引用。当某个特定事件发生,计算线程执行结束,计算线程抛出中断异常等,更新FutureTask的状态,来释放阻塞

ReentrantReadWriteLock

ReentrantReadWriteLock虽然有读锁和写锁,但用一个32位整形变量来保存状态。一个16位用来表示写入锁的计数,另一个16用来表示读取锁的计数。读取锁使用共享的获取释放方法,写入锁使用独占的读取锁和写入锁方法

AQS内部中存在一个CLH队列来表示等待的队列。当ReentrantReadWriteLock锁可用时,如果这个队列的第一个线程是写操作,则独占写入锁,反之在这个队列中第一个写线程之前的所有读线程都将获取读取锁

Up:目录