构建模块
Table of Contents
同步集合类
java提供了多种 synchronized 集合:
- Vector , HashTable
- Collections 的 synchronizedXxx 方法的返回值.
这些集合封装了状态变量,对每个 public 方法添加 synchronized 关键字
同步集合类的问题
@NotThreadSafe public static Object getLast(Vector list) { int lastIndex = list.size() - 1; return list.get(lastIndex); } public static void deleteLast(Vector list) { int lastIndex = list.size() - 1; list.remove(lastIndex); }
假设Vector对象中含有10个元素, 多线程环境下可能出现这样的场景: 1. 线程1调用getLast方法, 计算得知 lastIndex 为9 2. 线程1失去CPU使用权 3. 线程2调用deleteLast方法, 其lastIndex也为9 4. 线程2删除了第9个元素 5. 线程1重新获得CPU时间, 线程1会试图获取第9个元素, 但是该元素已经被线程2删除了, 此时将抛出ArrayIndexOutOfBoundsException异常
对于 列表的复合操作 仍然需要 额外的同步 保证原子性
ConcurrentModificationException
- 迭代集合时, 每个集合内部都拥有一个名为 modCount 的成员变量, 如果集合发生了变化, 就会 更改modCount的值 .
- 使用 Iterator 开始迭代时,会将modCount的赋值给 expectedModCount
- 在迭代过程中, 通过 每次比较两者是否相等 来判断集合 是否在内部或被其它线程修改
如果 expectedModCount和modCount不相等 , 将抛出 ConcurrentModificationException 异常
Iterator<Integer> it = list.iterator(); while(it.hasNext()) { System.out.println(it.next()); // remove操作会导致modCount的值被修改, 从而引发ConcurrentModificationException异常 list.remove(0); }
- 可以使用 并发集合类
- 先同步的 clone一份集合 然后对 clone集合进行迭代 , clone的过程仍然需要同步
ArrayList<Integer> listClone = null; // clone时依然需要同步 synchronized (list) { listClone = (ArrayList<Integer>) list.clone(); } Iterator<Integer> it = listClone.iterator(); while (it.hasNext()) { doSomething(it.next()); }
并发集合类
- ConcurrentHashMap 代替散列Map
- CopyOnWriteArrayList 代替同步List
- CopyOnWriteArraySet 代替同步Set
- ConcurrentSkipListMap 代替同步SortMap
- ConcurrentSkipListSet 代替同步SortSet
ConcurrentHashMap
- ConcurrentHashMap具有 更好的并发性能 . ConcurrentHashMap是 线程安全 的, 但是其同步策略和SynchronizedMap有很大不同
- ConcurrentHashMap在 read 时 几乎不用加锁
- write 时使用的是 细粒度的分段锁 ,甚至可以做到 并发write
由于ConcurrentHashMap的 分段加锁机制 , 使用时 调用方无法再自行加锁
- 因此ConcurrentHashMap类提供了一些常见的 复合操作
// 只有key不是集合中的键时才插入该键值对 V putIfAbsent(K key, V value); // 集合中存在该键值对时才删除 boolean remove(K key, V value); // 只有key和oldValue是集合中的键值对是才进行替换 boolean replace(K key, V oldValue, V newValue); // 只有key是集合中的key时才进行替换 V replace(K key, V newValue);
迭代时不需要调用方进行额外的同步. ConcurrentHashMap使用的迭代器被称为weakly consistent(弱一致)迭代器, 迭代开始后, 如果其他线程删除了ConcurrentHashMap集合的某个元素, 且被删除的元素尚未由next方法返回, 则该元素就不会被迭代器返回给调用方. 如果迭代开始后其他线程往ConcurrentHashMap集合中插入了新的元素, 那么新的元素可能会也可能不会被返回给调用方. 无论如何, 弱一致迭代器都保证不会将同一个元素多次返回给调用方,弱一致迭代器也不会在迭代期间抛出ConcurrentModificationException异常 调用ConcurrentHashMap对象的size, isEmpty等方法时(这些方法是针对整体Map的操作), 性能比较差 ConcurrentHashMap适合在要求高并发高性能的场合下使用, 在这些场景下, size或者isEmpty等方法用处不大, 这是可以接受的权衡
CopyOnWriteArrayList
- 线程安全 :
- 处理读操作 不需要进行同步和加锁 . 所以读操作具有很好的并发性
- 每次写操作 都会把 底层的数组进行拷贝 ,然后对拷贝数组进行修改,代价很高
无法在调用方进行额外加锁 :但也提供了一些常用的复合操作, 如 putIfAbsent 等
// Insert into map only if no value is mapped from K V putIfAbsent(K key, V value); // Remove only if K is mapped to V boolean remove(K key, V value); // Replace value only if K is mapped to oldValue boolean replace(K key, V oldValue, V newValue); // Replace value only if K is mapped to some value V replace(K key, V newValue);
CopyOnWriteArrayList的迭代只能反应迭代开始时所持有的集合. 迭代期间不会抛出ConcurrentModificationException异常, 调用方不需要进行额外的加锁 CopyOnWriteArrayList只适用于读操作频率远远大于写操作频率的场景
BlockingQueue接口
BlockingQueue的 容量 可以是 无限的 , 也可以是有限的. 无限容量的BlockingQueue 永远也不会发生队列已满的事件
- put 方法:用于 将数据放入队列
- 如果 队列已满 , put方法所在的线程 将 阻塞 , 直到队列不满
- take 方法:用于从队列中 取出数据
- 如果 队列为空 , take方法所在的线程 将 阻塞 , 直到队列不为空
- offer 方法:用于将 数据放入队列
- 如果 队列已满 , 将 最多等待 指定的时间
- 返回 true :说明 数据成功入队 , 否则说明没有成功
- poll 方法:从队列中 取出数据
- 如果 队列为空 , 最多等待 指定的时间
- 返回值为 null :说明 没有取到数据
常见实现类
- ArrayBlockingQueue : 底层使用 循环数组 实现
- LinkedBlockingQueue : 底层使用 链表 实现
- PriorityBlockingQueue : 一个 可排序 的阻塞队列, 可以按照:
- 元素的自然顺序 :元素需要实现 Comparable 接口
- 指定的 Comparator 排序
- SynchronousQueue : 不用保存元素到Queue ,直接交付给消费线程
生产消费模式
生产者
public class FileCrawler implements Runnable { private final BlockingQueue<File> fileQueue; private final FileFilter fileFilter; private final File root; ... public void run() { try { crawl(root); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries != null) { for (File entry : entries) if (entry.isDirectory()) crawl(entry); else if (!alreadyIndexed(entry)) fileQueue.put(entry); } } }
消费者
public class Indexer implements Runnable { private final BlockingQueue<File> queue; public Indexer(BlockingQueue<File> queue) { this.queue = queue; } public void run() { try { while (true) indexFile(queue.take()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
主线程
public static void startIndexing(File[] roots) { BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND); FileFilter filter = new FileFilter() { public boolean accept(File file) { return true; } }; for (File root : roots) new Thread(new FileCrawler(queue, filter, root)).start(); for (int i = 0; i < N_CONSUMERS; i++) new Thread(new Indexer(queue)).start(); }
同步工具类
CountDownLatch
闭锁 相当于一个门, 当 到达特定的状态 之前, 门是 关闭 的, 此时所有线程将被 阻塞 , 只有到达了特定状态, 线程才能通过
- 计算 直到所有的资源初始化 才开始计算
- 服务 直到依赖的服务启动完毕 才启动
- 会话 直到所有的参加者都就绪 才开始
CountDownLatch是闭锁的具体实现: 内部维护了一个计数器, 初始化CountDownLatch时需要指定计数器的初始值,该初始值表示需要等待完成的事件的个数 每调用一次countDown方法, 表示其中一个事件已经完成, 计数器的值将减一 当计数器减为0时, 门才会打开
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { // startGate用来控制子线程,当所有的子线程准备就绪时候,主线程执行startGate的countDown操作,让子线程一起运行 final CountDownLatch startGate = new CountDownLatch(1); //endGate用来控制主线程,每个子线程运行完毕后,对endGate执行countDown操作,当所有的子线程结束后,主线程恢复运行 final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread t = new Thread() { @Override public void run() { try { //n个线程通过start gate处于等待状态 startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException ignored) { } } }; t.start(); } long start = System.nanoTime(); //start gate开启,n个线程同时开始运行 startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; }
FutureTask
FutureTask :用于 执行任务
- 常用的构造函数 FutureTask(Callable<V> callable) :
- Callable : 封装任务
- get 方法:返回任务的 执行结果
- 三种状态: 等待运行 , 正在运行 , 已完成
- 处于 已完成 状态时调用 get 方法, 将 立即返回计算结果
- 否则 get 方法会 阻塞 , 直到FutureTask转变为 已完成状态
- 计算完成 , 抛出异常 , 或者 被取消 都会使得FutureTask的 状态变为已完成
FutureTask的常见使用场景是封装一个耗时任务 提前开始计算, 当需要计算结果时, 再调用其get方法 这样可以减少等待计算完成的时间
public class Preloader { private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>() { public ProductInfo call() throws DataLoadException { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } public ProductInfo get() throws DataLoadException, InterruptedException { try { // get可能阻塞,获得计算结果,抛出异常 return future.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof DataLoadException) throw (DataLoadException) cause; // 主线程直接处理业务异常 else throw launderThrowable(cause); } } }
异常处理:
- 业务异常 : 主线程 处理
- RuntimeException :直接返回,然后在 主线程抛出
- Error :直接抛出
抛出 IllegalStateException
/** If the Throwable is an Error, throw it; if it is a * RuntimeException return it, otherwise throw IllegalStateException */ public static RuntimeException launderThrowable(Throwable t) { if (t instanceof RuntimeException) return (RuntimeException) t; // 返回RuntimeException,在主线程再次抛出 else if (t instanceof Error) throw (Error) t; // 直接抛出Error else throw new IllegalStateException("Not unchecked", t); }
Semphore
Semaphore :用于 管理permit
- 创建Semaphore对象时, 需要 指定 permit的 最大个数
- acquire() 方法: 申请从Semaphore对象中 获取一个permit
- 如果当前semaphore对象 没有可用的permit , 线程将被 阻塞 , 直到 有可用的 permit
- release() 方法: permit放回 Semaphore对象
- permit 不与线程绑定 , 一个线程申请的permit, 可以在另一个线程里release
Semaphore 通常用于实现资源池这种有界的集合:如数据库连接池 等
public class BoundedHashSet<T> { private final Set<T> set; private final Semaphore sem; public BoundedHashSet(int bound) { this.set = Collections.synchronizedSet(new HashSet<>()); // 设定Semaphore对象中的permit的最大个数 sem = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { // 每次add时就向semaphore对象申请一个permit sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) { // set里已经存在元素时候release permit sem.release(); } } } public boolean remove(T o) { boolean wasRemoved = set.remove(o); if (wasRemoved) { // 成功移除后将release permit sem.release(); } return wasRemoved; } }
Barrier
栅栏允许一组线程互相等待 , 直到该组线程全部到达某个公共屏障点 闭锁:一组线程等待事件 栅栏 : 一组线程互相等待其他线程
CyclicBarrier :栅栏的 一种实现
- 创建CyclicBarrier时需要指定线程组中线程的数量
- await 方法:表示 当前线程 已 到达 公共屏障点 , 然后 等待其他线程到达
- 当 所有线程 到达 公共屏障点 后, CyclicBarrier对象将 释放线程组 , 然后 重置 CyclicBarrier对象的状态
- 因此CyclicBarrier对象是可以 循环使用 的(闭锁的门一旦打开就不会再次关闭)
如果 有线程在等待期间 超时 或者 被中断
- 该CyclicBarrier对象被视为 已损坏 ,随后对 await 方法的调用都要抛出 BrokenBarrierException 异常
栅栏可以用于计算中主线程等待另外一组线程并行计算完毕
public class CellularAutomata { private final Board mainBoard; private final CyclicBarrier barrier; private final Worker[] workers; public CellularAutomata(Board board) { this.mainBoard = board; int count = Runtime.getRuntime().availableProcessors(); this.barrier = new CyclicBarrier(count, new Runnable() { public void run() { mainBoard.commitNewValues(); }}); this.workers = new Worker[count]; for (int i = 0; i < count; i++) workers[i] = new Worker(mainBoard.getSubBoard(count, i)); } private class Worker implements Runnable { private final Board board; public Worker(Board board) { this.board = board; } public void run() { while (!board.hasConverged()) { for (int x = 0; x < board.getMaxX(); x++) for (int y = 0; y < board.getMaxY(); y++) board.setNewValue(x, y, computeValue(x, y)); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } public void start() { for (int i = 0; i < workers.length; i++) new Thread(workers[i]).start(); mainBoard.waitForConvergence(); } }
高效线程安全的缓存
使用 HashMap 做缓存
public interface Computable<A, V> { V compute(A arg) throws InterruptedException; } public class ExpensiveFunction implements Computable<String, BigInteger> { // 模拟一个耗时操作 public BigInteger compute(String arg) { // ... return new BigInteger(arg); } } public class Memorizer1<A, V> implements Computable<A, V> { private final Map<A, V> cache = new HashMap<A, V>(); private final Computable<A, V> c; public Memorizer1(Computable<A, V> c) { this.c = c; } // 使用synchronized同步整个方法解决线程安全 public synchronized V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg, result); } return result; } }
由于HashMap不是线程安全的, Memorizer1同步整个compute方法 避免重复计算的同时, 牺牲了并发执行compute方法的机会 此种设计甚至可能导致性能比没有缓存更差
避免枷锁 :使用 ConcurrentHashMap
public class Memorizer2<A, V> implements Computable<A, V> { private final Map<A, V> cache = new ConcurrentHashMap<A, V>(); private final Computable<A, V> c; public Memorizer2(Computable<A, V> c) { this.c = c; } public V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result == null) { result = c.compute(arg); cache.put(arg, result); } return result; } }
Memorizer2的问题在于一个线程在执行compute方法的过程中 其他线程以相同的参数调用compute方法时, 无法从缓存中获知已有线程正在进行该参数的计算的信息 因此造成了“重复计算”的发生
缓存 Future 对象: 返回 该 Future 对象的 get 方法
如果实际的计算还在进行当中, get方法将被阻塞, 直到计算完成
public class Memorizer3<A, V> implements Computable<A, V> { // 改为缓存Future private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memorizer3(Computable<A, V> c) { this.c = c; } public V compute(final A arg) throws InterruptedException { Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V>() { public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = ft; // 在计算开始前就将Future对象存入缓存中. cache.put(arg, ft); ft.run(); // call to c.compute happens here } try { // 如果缓存中存在arg对应的Future对象, 就直接调用该Future对象的get方法. // 如果实际的计算还在进行当中, get方法将被阻塞, 直到计算完成 return f.get(); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } }
不幸的是Memorizer3仍然存在重复计算的问题,只是相对于Memorizer2重复计算的概率降低了一些 因为 get, if, put这三个操作并不是原子性的
使用 putIfAbsent : 合并 get, if , put这三个操作
public class Memorizer<A, V> implements Computable<A, V> { private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); private final Computable<A, V> c; public Memorizer(Computable<A, V> c) { this.c = c; } public V compute(final A arg) throws InterruptedException { while (true) { Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V>() { public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); // 使用putIfAbsent测试是否真的将ft存入了缓存, 如果存入失败, 说明cache中已经存在arg对应的future对象 // 否则才进行计算. f = cache.putIfAbsent(arg, ft); if (f == null) { f = ft; ft.run(); } } try { return f.get(); } catch (CancellationException e) { // 当计算被取消时, 从缓存中移除arg-f键值对 cache.remove(arg, f); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } } }