UP | HOME

同步

Table of Contents

访问共享的可变数据必须同步

同步不仅是为了互斥,更是为了抱证修改的可见性

下面程序可能永远不会停止!

// Broken! - How long would you expect this program to run?
public class BrokenStopThread {

    private static boolean stopRequested = false;

    public static void main(String[] args)
        throws InterruptedException {
        Thread backgroundThread = new Thread(() -> {
                int i = 0;
                while (!stopRequested) {
                    i++;
                }
            });
        backgroundThread.start();
        TimeUnit.SECONDS.sleep(1);
        stopRequested = true;
    }
}

JVM可能会对循环进行优化。在缺少同步的情况下,一个线程修改了变量,其他线程或许永远无法读到修改后的值

while (!done)
    i++;
//JVM优化成
if (!done)
    while (true)
        i++;

修复方法:使用synchronized方法代替变量

// Properly synchronized cooperative thread termination
public class StopThread {

    private static boolean stopRequested = false;

    private static synchronized void requestStop() {
        stopRequested = true;
    }

    private static synchronized boolean stopRequested() {
        return stopRequested;
    }

    public static void main(String[] args)
        throws InterruptedException {
        Thread backgroundThread = new Thread(() -> {
                int i = 0;
                while (!stopRequested()) {
                    i++;
                }
            });
        backgroundThread.start();
        TimeUnit.SECONDS.sleep(1);
        requestStop();
    }
}

更好的解决方法:使用volatile变量

// Cooperative thread termination with a volatile field
public class VolatileStopThread {
    private static volatile boolean stopRequested = false;

    public static void main(String[] args)
        throws InterruptedException {
        Thread backgroundThread = new Thread(() -> {
                int i = 0;
                while (!stopRequested) {
                    i++;
                }
            });
        backgroundThread.start();
        TimeUnit.SECONDS.sleep(1);
        stopRequested = true;
    }
}

然而volatile只保证了可见性,并没有保证互斥性,下面代码尽管在多线程环境下运行不会报错,但是结果很可能是错误的

// Broken - requires synchronization!
private static volatile int nextSerialNumber = 0;
public static int generateSerialNumber() {
    return nextSerialNumber++;
}

除了为generteSerialNumber添加synchronized关键字之外,可以使用Atomic变量代替volatile

private static final AtomicLong nextSerialNum = new AtomicLong();
public static long generateSerialNumber() {
    return nextSerialNum.getAndIncrement();
}

共享总结

  • 为了在线程之间进行可靠地通信,也为了互斥访问,同步是必要的
  • 如果读和写操作没有都被同步,同步就不会起作用,当多个线程共享可变数据的时候,每个读或者写数据的线程都必须执行同步
  • 避免同步最好的办法:使用不可变对象把修改限制在单个线程中,然后通过volatile让所有线程可见
  • volatile保证了可见性,但并没有保证互斥性
  • 安全发布:把变量保存到static成员,变量声明为volatile,变量声明为final,变量保存到同步集合内等

避免过度同步

难以预料的错误

编写Set的wrapper类

public class ForwardingSet<E> implements Set<E> {
    private final Set<E> s;

    public ForwardingSet(Set<E> s) {
        this.s = s;
    }

    @Override
    public int size() {
        return s.size();
    }

    @Override
    public boolean isEmpty() {
        return s.isEmpty();
    }

    @Override
    public boolean contains(Object o) {
        return s.contains(o);
    }

    @Override
    public Iterator<E> iterator() {
        return s.iterator();
    }

    @Override
    public Object[] toArray() {
        return s.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return s.toArray(a);
    }

    @Override
    public boolean add(E e) {
        return s.add(e);
    }

    @Override
    public boolean remove(Object o) {
        return s.remove(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return s.containsAll(c);
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        return s.addAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return s.retainAll(c);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return s.removeAll(c);
    }

    @Override
    public void clear() {
        s.clear();
    }

}

定义观察者接口:当元素被添加时候,触发这个接口实现类中的added方法

public interface SetObserver<E> {
    // Invoked when an element is added to the observable set
    void added(
        BrokenObservableSet<E> set, E element);
}

扩展ForwardingSet:添加观察者接口列表,并在添加元素的时候触发这个观察者的added方法

public class BrokenObservableSet<E> extends ForwardingSet<E>{

    public BrokenObservableSet(Set<E> set) {
        super(set);
    }

    private final List<SetObserver<E>> observers =
        new ArrayList<>();

    public void addObserver(SetObserver<E> observer) {
        synchronized (observers) {
            observers.add(observer);
        }
    }

    public boolean removeObserver(SetObserver<E> observer) {
        synchronized (observers) {
            return observers.remove(observer);
        }
    }

    private void notifyElementAdded(E element) {
        synchronized (observers) {
//在同步代码块中调用的是客户化的方法,这会导致意外的错误!!!
            for (SetObserver<E> observer : observers) {
                observer.added(this, element);
            }
        }
    }

    @Override
    public boolean add(E element) {
        boolean added = super.add(element);
        if (added) {
            notifyElementAdded(element);
        }
        return added;
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        boolean result = false;
        // calls notifyElementAdded
        for (E element : c) {
            result |= add(element); 
        }
        return result;
    }
}

添加元素到BrokenObservableSet的时候,会触发遍历observers列表,而一旦同时删除这个列表的某个元素,后果抛出ConcurrentModificationException

public static void main(String[] args) {
    BrokenObservableSet<Integer> set
        = new BrokenObservableSet<>(new HashSet<>());
//        set.addObserver(new SetObserver<Integer>() {
//            @Override
//            public void added(BrokenObservableSet<Integer> s, Integer e) {
//                System.out.println(e);
//            }
//        });

    set.addObserver(new SetObserver<Integer>() {
            @Override
            public void added(BrokenObservableSet<Integer> s, Integer e) {
                System.out.println(e);
                if (e == 23) {
                    s.removeObserver(this);
                }
            }
        });

    for (int i = 0; i < 100; i++) {
        set.add(i);
    }
}

下面代码比抛出异常错误更严重:新启动的线程会要求对observers加锁,而BrokenObservableSet已经获得这个列表的锁,同时BrokenObservableSet还在等待新启动线程返回,这就导致了死锁!

// Observer that uses a background thread needlessly
set.addObserver(new SetObserver<Integer>() {
        public void added(final ObservableSet<Integer> s, Integer e) {
            System.out.println(e);
            if (e == 23) {
                ExecutorService executor =
                    Executors.newSingleThreadExecutor();
                final SetObserver<Integer> observer = this;
                try {
                    executor.submit(new Runnable() {
                            public void run() {
                                s.removeObserver(observer);
                            }
                        }).get();
                } catch (ExecutionException ex) {
                    throw new AssertionError(ex.getCause());
                } catch (InterruptedException ex) {
                    throw new AssertionError(ex.getCause());
                } finally {
                    executor.shutdown();
                }
            }
        }
    });

解决方法:同步代码只是对观察者列表进行拷贝,客户端代码移除到同步块之外

// Alien method moved outside of synchronized block - open calls
private void notifyElementAdded(E element) {
    List<SetObserver<E>> snapshot = null;
    synchronized(observers) {
        snapshot = new ArrayList<SetObserver<E>>(observers);
    }

    snapshot.forEach((observer) -> {
            observer.added(this, element);
        });
}

更好的解决办法:使用同步列表CopyOnWriteArrayList

private final List<SetObserver<E>> observers = new CopyOnWriteArrayList<>();

public void addObserver(SetObserver<E> observer) {
    observers.add(observer);
}

public boolean removeObserver(SetObserver<E> observer) {
    return observers.remove(observer);
}

private void notifyElementAdded(E element) {
    observers.forEach((observer) -> {
            observer.added(this, element);
        });
}

总之:在一个被同步的方法或者代码块中,永远不要调用客户端允许覆盖的方法或者使用一个客户端可以继承的对象

效率低下

  • 设计可变类时候,要考虑是否会被多线程环境使用。如果是的化,则在内部实现同步,尽量禁止客户端修改同步机制。反之在文档中明确写清楚并不是线程的,让使用者去做额外同步
  • 对于某个类的static成员,如果可能在多线程环境中被修改,必须考虑同步

并发工具优先于wait和notify

并发工具

  1. Executor框架:Executor和Task优先于线程
  2. 并发集合:并发集合中不可能排除并发活动;对它加锁没有什么作用,只会使程序速度变慢
  3. 同步器

对于间歇式的定时,始终应该优先使用System.nanoTime而不是System.currentTimeMills,System.nanoTime更加准确也更加精确,不受系统的实时时钟的调整所影响

即使使用wait, 也应该使用notifyAll,而不是notify

线程安全性需要文档化

JavaDoc并没有在输出中包含synchronized修饰符信息,因为这个属于实现的细节,而不是接口的一部分。事实上syncrhonized并不能完全等同于线程安全

线程安全级别

  • 不可变:类的实例是不变的,无须客户端任何同步
  • 无条件的线程安全:虽然类的实例是可变的,但有着足够的内部同步。例如ConcurrentHashMap的实例可以安全地并发使用
  • 有条件的线程安全:某些方法为进行安全的并发使用必须要额外地进行外部同步
  • 非线程安全:客户必须自己外部同步包围每个方法调用,例如ArrayList和HashMap
  • 线程对立:即使所有的方法调用都被外部同步包围也不能安全地被多个线程并发使用。通常其根源在于无法同步地修改静态成员数据

其中有条件的线程安全必须清楚说明哪些操作顺序需要加锁,以及对哪个对象进行加锁。这时候不仅要对方法注释线程安全,必要时还需要对成员进行注释。最后必须清楚指出在继承的情况下如何可以不破坏父类的线程安全约束 

私有成员加锁

使用public对象(往往是对象本身)同步锁,会有意或无意地引发DDOS拒绝服务攻击。为了避免这个缺陷,有时候应该使用私有成员加锁

// Private lock object idiom - thwarts denial-of-service attack
private final Object lock = new Object();
public void foo() {
    synchronized(lock) {
    //...
    }
}

lock对象如果不申明为final会导致灾难性的后果,这意味着实际上私有成员加锁就是无条件线程安全,客户端不能再做任何同步。私有成员加锁尤其适合面向继承的设计

线程安全文档的总结

每个类都应该精确地在文档中说明线程安全属性,请利用@Immuable, @ThreadSafe, @NotThreadSafe来指出线程安全级别

谨慎使用延迟初始化

延迟初始化:真正被使用的时候才开始初始化

这是为了权衡初始化和访问之间的开销。只有当某个成员实例初始化非常昂贵,但这个成员实例可能不是每次都必须被用到的情况下才值得。

如非必要请不要使用延迟初始化,因为延迟初始化的这个成员需要同步! 

正常初始化

初始化开销不值一提的时候

// Normal initialization of an instance field
private final FieldType field1 = computeFieldValue();

实例成员延迟初始化

使用synchronized修饰符为实例成员进行延迟初始化

// Lazy initialization of instance field - synchronized accessor
private FieldType field2;

public synchronized FieldType getField2() {
    if (field2 == null) {
        field2 = computeFieldValue();
    }
    return field2;
                  }

降低了初始化开销,但最大程度增加了访问时开销

双重检查优化实例成员延迟访问的开销

引入局部变量result, 第一次检查并不加锁,第二次检查才加锁,这避免了初始化后的访问还需要同步的开销

// Double-check idiom for lazy initialization of instance fields
private volatile FieldType field4;

public FieldType getField4() {
    FieldType result = field4;
    if (result == null) { // First check (no locking)
        synchronized (this) {
            result = field4;
            if (result == null) { // Second check (with locking)
                field4 = result = computeFieldValue();
            }
        }
    }
    return result;
}

注意:被延迟初始化的实例变量必须被声明为volatile,这是为了保证多线程下的可见性!

静态成员延迟初始化

虽然对于静态成员延迟初始化只需要static修饰符号,但更好地方式是使用class holder

private static class FieldHolder {

    private static final FieldType FIELD = computeFieldValue();
}

// Lazy initialization holder class idiom for static fields
public static FieldType getField3() {
    return FieldHolder.FIELD;
}

只有首次读取FieldHolder.FIELD,才会导致私有静态类FieldHolder被初始化。现在的JVM会保证只有当某个类的成员被使用才会被初始化,而在初始化的过程中会保证成员的线程安全。这优雅地避免了额外同步

延迟初始化总结

谨慎使用延迟初始化,如果必须实例成员使用double check方式,静态成员使用holder class方式

不要依赖于线程调度器

  • 任何依赖于线程调度器,线程优先级,以及Thread.yield来达到正确性或者性能要求的程序都是不可移植的,因为不同操作系统的JVM对其实现也不相同
  • 使用Thread.sleep(1)代替Thread.yield
  • 不要使用忙等待,这样的代码会导致其他线程无法获得CPU使用权,而不得不依赖线程调度器

永远不要使用线程组

线程组已经过时,永远不要使用线程组

Next:方法

Previous:异常处理

Home:目录