UP | HOME

构建模块

Table of Contents

同步集合类

java提供了多种 synchronized 集合:

  • Vector , HashTable
  • CollectionssynchronizedXxx 方法的返回值.
  这些集合封装了状态变量,对每个 public 方法添加 synchronized 关键字

同步集合类的问题

@NotThreadSafe
public static Object getLast(Vector list) {   
    int lastIndex = list.size() - 1;   
    return list.get(lastIndex);   
}   

public static void deleteLast(Vector list) {   
    int lastIndex = list.size() - 1;   
    list.remove(lastIndex);   
}  
假设Vector对象中含有10个元素, 多线程环境下可能出现这样的场景: 

1. 线程1调用getLast方法, 计算得知 lastIndex 为9
2. 线程1失去CPU使用权
3. 线程2调用deleteLast方法, 其lastIndex也为9
4. 线程2删除了第9个元素
5. 线程1重新获得CPU时间, 线程1会试图获取第9个元素, 但是该元素已经被线程2删除了, 此时将抛出ArrayIndexOutOfBoundsException异常

对于 列表的复合操作 仍然需要 额外的同步 保证原子性

ConcurrentModificationException 

  • 迭代集合时, 每个集合内部都拥有一个名为 modCount 的成员变量, 如果集合发生了变化, 就会 更改modCount的值 .
  • 使用 Iterator 开始迭代时,会将modCount的赋值给 expectedModCount
    • 在迭代过程中, 通过 每次比较两者是否相等 来判断集合 是否在内部或被其它线程修改
    • 如果 expectedModCount和modCount不相等 , 将抛出 ConcurrentModificationException 异常 

      Iterator<Integer> it = list.iterator();  
      while(it.hasNext()) {  
          System.out.println(it.next());  
          // remove操作会导致modCount的值被修改, 从而引发ConcurrentModificationException异常  
          list.remove(0);  
      }  
      
  • 可以使用 并发集合类
  • 先同步的 clone一份集合 然后对 clone集合进行迭代 , clone的过程仍然需要同步
ArrayList<Integer> listClone = null;  
// clone时依然需要同步  
synchronized (list) {  
    listClone = (ArrayList<Integer>) list.clone();  
}  
Iterator<Integer> it = listClone.iterator();  
while (it.hasNext()) {  
    doSomething(it.next());  
}  

并发集合类 

  • ConcurrentHashMap 代替散列Map
  • CopyOnWriteArrayList 代替同步List
  • CopyOnWriteArraySet 代替同步Set
  • ConcurrentSkipListMap 代替同步SortMap
  • ConcurrentSkipListSet 代替同步SortSet

ConcurrentHashMap

  • ConcurrentHashMap具有 更好的并发性能 . ConcurrentHashMap是 线程安全 的, 但是其同步策略和SynchronizedMap有很大不同
    • ConcurrentHashMap在 read几乎不用加锁
    • write 时使用的是 细粒度的分段锁 ,甚至可以做到 并发write
  • 由于ConcurrentHashMap的 分段加锁机制 , 使用时 调用方无法再自行加锁

    • 因此ConcurrentHashMap类提供了一些常见的 复合操作
    // 只有key不是集合中的键时才插入该键值对
    V putIfAbsent(K key, V value); 
    // 集合中存在该键值对时才删除
    boolean remove(K key, V value); 
    // 只有key和oldValue是集合中的键值对是才进行替换
    boolean replace(K key, V oldValue, V newValue); 
    // 只有key是集合中的key时才进行替换
    V replace(K key, V newValue);
    
迭代时不需要调用方进行额外的同步. ConcurrentHashMap使用的迭代器被称为weakly consistent(弱一致)迭代器,

迭代开始后, 如果其他线程删除了ConcurrentHashMap集合的某个元素, 且被删除的元素尚未由next方法返回, 则该元素就不会被迭代器返回给调用方.
如果迭代开始后其他线程往ConcurrentHashMap集合中插入了新的元素, 那么新的元素可能会也可能不会被返回给调用方.

无论如何, 弱一致迭代器都保证不会将同一个元素多次返回给调用方,弱一致迭代器也不会在迭代期间抛出ConcurrentModificationException异常

调用ConcurrentHashMap对象的size, isEmpty等方法时(这些方法是针对整体Map的操作), 性能比较差
ConcurrentHashMap适合在要求高并发高性能的场合下使用, 在这些场景下, size或者isEmpty等方法用处不大, 这是可以接受的权衡

CopyOnWriteArrayList

  • 线程安全
    • 处理读操作 不需要进行同步和加锁 . 所以读操作具有很好的并发性
    • 每次写操作 都会把 底层的数组进行拷贝 ,然后对拷贝数组进行修改,代价很高
  • 无法在调用方进行额外加锁 :但也提供了一些常用的复合操作, 如 putIfAbsent

    // Insert into map only if no value is mapped from K
    V putIfAbsent(K key, V value);
    // Remove only if K is mapped to V
    boolean remove(K key, V value);
    // Replace value only if K is mapped to oldValue
    boolean replace(K key, V oldValue, V newValue);
    // Replace value only if K is mapped to some value
    V replace(K key, V newValue);
    
CopyOnWriteArrayList的迭代只能反应迭代开始时所持有的集合.

迭代期间不会抛出ConcurrentModificationException异常, 调用方不需要进行额外的加锁

CopyOnWriteArrayList只适用于读操作频率远远大于写操作频率的场景

BlockingQueue接口

BlockingQueue的 容量 可以是 无限的 , 也可以是有限的. 无限容量的BlockingQueue 永远也不会发生队列已满的事件

  • put 方法:用于 将数据放入队列
    • 如果 队列已满 , put方法所在的线程阻塞 , 直到队列不满
  • take 方法:用于从队列中 取出数据
    • 如果 队列为空 , take方法所在的线程阻塞 , 直到队列不为空
  • offer 方法:用于将 数据放入队列
    • 如果 队列已满 , 将 最多等待 指定的时间
    • 返回 true :说明 数据成功入队 , 否则说明没有成功
  • poll 方法:从队列中 取出数据
    • 如果 队列为空 , 最多等待 指定的时间
    • 返回值为 null :说明 没有取到数据

常见实现类

  • ArrayBlockingQueue : 底层使用 循环数组 实现
  • LinkedBlockingQueue : 底层使用 链表 实现
  • PriorityBlockingQueue : 一个 可排序 的阻塞队列, 可以按照:
    • 元素的自然顺序 :元素需要实现 Comparable 接口
    • 指定的 Comparator 排序
  • SynchronousQueue不用保存元素到Queue ,直接交付给消费线程

生产消费模式 

生产者

public class FileCrawler implements Runnable {
    private final BlockingQueue<File> fileQueue;
    private final FileFilter fileFilter;
    private final File root;
    ...
    public void run() {
        try {
            crawl(root);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    private void crawl(File root) throws InterruptedException {
        File[] entries = root.listFiles(fileFilter);
        if (entries != null) {
            for (File entry : entries)
                if (entry.isDirectory())
                    crawl(entry);
                else if (!alreadyIndexed(entry))
                    fileQueue.put(entry);
        }
    }
}

消费者

public class Indexer implements Runnable {
    private final BlockingQueue<File> queue;
    public Indexer(BlockingQueue<File> queue) {
        this.queue = queue;
    }
    public void run() {
        try {
            while (true)
                indexFile(queue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

主线程

public static void startIndexing(File[] roots) {
    BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
    FileFilter filter = new FileFilter() {
            public boolean accept(File file) { return true; }
        };
    for (File root : roots)
        new Thread(new FileCrawler(queue, filter, root)).start();
    for (int i = 0; i < N_CONSUMERS; i++)
        new Thread(new Indexer(queue)).start();
}

同步工具类

CountDownLatch

闭锁 相当于一个门, 当 到达特定的状态 之前, 门是 关闭 的, 此时所有线程将被 阻塞 , 只有到达了特定状态, 线程才能通过

  1. 计算 直到所有的资源初始化 才开始计算
  2. 服务 直到依赖的服务启动完毕 才启动
  3. 会话 直到所有的参加者都就绪 才开始
CountDownLatch是闭锁的具体实现:

内部维护了一个计数器, 初始化CountDownLatch时需要指定计数器的初始值,该初始值表示需要等待完成的事件的个数

每调用一次countDown方法, 表示其中一个事件已经完成, 计数器的值将减一

当计数器减为0时, 门才会打开
public long timeTasks(int nThreads, final Runnable task)
    throws InterruptedException {
    // startGate用来控制子线程,当所有的子线程准备就绪时候,主线程执行startGate的countDown操作,让子线程一起运行
    final CountDownLatch startGate = new CountDownLatch(1);
    //endGate用来控制主线程,每个子线程运行完毕后,对endGate执行countDown操作,当所有的子线程结束后,主线程恢复运行
    final CountDownLatch endGate = new CountDownLatch(nThreads);
    for (int i = 0; i < nThreads; i++) {
        Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        //n个线程通过start gate处于等待状态
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) {
                    }
                }
            };
        t.start();
    }
    long start = System.nanoTime();
    //start gate开启,n个线程同时开始运行
    startGate.countDown();

    endGate.await();
    long end = System.nanoTime();
    return end - start;
}

FutureTask

FutureTask :用于 执行任务

  • 常用的构造函数 FutureTask(Callable<V> callable)
    • Callable封装任务
  • get 方法:返回任务的 执行结果
  • 三种状态: 等待运行正在运行已完成
    • 处于 已完成 状态时调用 get 方法, 将 立即返回计算结果
    • 否则 get 方法会 阻塞 , 直到FutureTask转变为 已完成状态
    • 计算完成 , 抛出异常 , 或者 被取消 都会使得FutureTask的 状态变为已完成
FutureTask的常见使用场景是封装一个耗时任务

提前开始计算, 当需要计算结果时, 再调用其get方法

这样可以减少等待计算完成的时间
public class Preloader {
    private final FutureTask<ProductInfo> future =
        new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
                public ProductInfo call() throws DataLoadException {
                    return loadProductInfo();
                }
            });
    private final Thread thread = new Thread(future);
    public void start() { thread.start(); }
    public ProductInfo get()
        throws DataLoadException, InterruptedException {
        try {
              // get可能阻塞,获得计算结果,抛出异常                          
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException)
                throw (DataLoadException) cause; // 主线程直接处理业务异常
            else
                throw launderThrowable(cause);
        }
    }
}

异常处理:

  1. 业务异常主线程 处理
  2. RuntimeException :直接返回,然后在 主线程抛出
  3. Error :直接抛出
  4. 抛出 IllegalStateException

    /** If the Throwable is an Error, throw it; if it is a
     * RuntimeException return it, otherwise throw IllegalStateException
     */
    public static RuntimeException launderThrowable(Throwable t) {
        if (t instanceof RuntimeException)
            return (RuntimeException) t; // 返回RuntimeException,在主线程再次抛出
        else if (t instanceof Error)
            throw (Error) t; // 直接抛出Error
        else
            throw new IllegalStateException("Not unchecked", t);
    }
    

Semphore

Semaphore :用于 管理permit

  • 创建Semaphore对象时, 需要 指定 permit的 最大个数
  • acquire() 方法: 申请从Semaphore对象中 获取一个permit
    • 如果当前semaphore对象 没有可用的permit , 线程将被 阻塞 , 直到 有可用的 permit
  • release() 方法: permit放回 Semaphore对象
  • permit 不与线程绑定 , 一个线程申请的permit, 可以在另一个线程里release
Semaphore 通常用于实现资源池这种有界的集合:如数据库连接池 等   

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<>());
        // 设定Semaphore对象中的permit的最大个数  
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        // 每次add时就向semaphore对象申请一个permit  
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded) {
                // set里已经存在元素时候release permit  
                sem.release();
            }
        }
    }

    public boolean remove(T o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            // 成功移除后将release permit 
            sem.release();
        }
        return wasRemoved;
    }
}

Barrier

     栅栏允许一组线程互相等待 , 直到该组线程全部到达某个公共屏障点

     闭锁:一组线程等待事件 
     栅栏 : 一组线程互相等待其他线程 

CyclicBarrier :栅栏的 一种实现

  • 创建CyclicBarrier时需要指定线程组中线程的数量
  • await 方法:表示 当前线程到达 公共屏障点 , 然后 等待其他线程到达
  • 所有线程 到达 公共屏障点 后, CyclicBarrier对象将 释放线程组 , 然后 重置 CyclicBarrier对象的状态
    • 因此CyclicBarrier对象是可以 循环使用 的(闭锁的门一旦打开就不会再次关闭)
  • 如果 有线程在等待期间 超时 或者 被中断

    • 该CyclicBarrier对象被视为 已损坏 ,随后对 await 方法的调用都要抛出 BrokenBarrierException 异常
         栅栏可以用于计算中主线程等待另外一组线程并行计算完毕 
    
    public class CellularAutomata {
        private final Board mainBoard;
        private final CyclicBarrier barrier;
        private final Worker[] workers;
        public CellularAutomata(Board board) {
            this.mainBoard = board;
            int count = Runtime.getRuntime().availableProcessors();
            this.barrier = new CyclicBarrier(count,
                             new Runnable() {
                                 public void run() {
                                     mainBoard.commitNewValues();
                                 }});
            this.workers = new Worker[count];
            for (int i = 0; i < count; i++)
                workers[i] = new Worker(mainBoard.getSubBoard(count, i));
        }
        private class Worker implements Runnable {
            private final Board board;
            public Worker(Board board) { this.board = board; }
            public void run() {
                while (!board.hasConverged()) {
                    for (int x = 0; x < board.getMaxX(); x++)
                        for (int y = 0; y < board.getMaxY(); y++)
                            board.setNewValue(x, y, computeValue(x, y));
                    try {
                        barrier.await();
                    } catch (InterruptedException ex) {
                        return;
                    } catch (BrokenBarrierException ex) {
                        return;
                    }
                }
            }
        }
        public void start() {
            for (int i = 0; i < workers.length; i++)
                new Thread(workers[i]).start();
            mainBoard.waitForConvergence();
        }
    }
    

     

高效线程安全的缓存

  • 使用 HashMap 做缓存

    public interface Computable<A, V> {   
        V compute(A arg) throws InterruptedException;   
    }   
    
    public class ExpensiveFunction   
        implements Computable<String, BigInteger> {   
        // 模拟一个耗时操作  
        public BigInteger compute(String arg) {   
            // ...  
            return new BigInteger(arg);   
        }   
    }   
    
    public class Memorizer1<A, V> implements Computable<A, V> {   
        private final Map<A, V> cache = new HashMap<A, V>();   
        private final Computable<A, V> c;   
    
        public Memorizer1(Computable<A, V> c) {   
            this.c = c;   
        }   
        // 使用synchronized同步整个方法解决线程安全  
        public synchronized V compute(A arg) throws InterruptedException {   
            V result = cache.get(arg);   
            if (result == null) {   
                result = c.compute(arg);   
                cache.put(arg, result);   
            }   
            return result;   
        }   
    }  
    
          由于HashMap不是线程安全的, Memorizer1同步整个compute方法
    
          避免重复计算的同时, 牺牲了并发执行compute方法的机会
    
          此种设计甚至可能导致性能比没有缓存更差 
    
  • 避免枷锁 :使用 ConcurrentHashMap

    public class Memorizer2<A, V> implements Computable<A, V> {   
        private final Map<A, V> cache = new ConcurrentHashMap<A, V>();   
        private final Computable<A, V> c;   
    
        public Memorizer2(Computable<A, V> c) { this.c = c; }   
    
        public V compute(A arg) throws InterruptedException {   
            V result = cache.get(arg);   
            if (result == null) {   
                result = c.compute(arg);   
                cache.put(arg, result);   
            }   
            return result;   
        }   
    }   
    
    Memorizer2的问题在于一个线程在执行compute方法的过程中
    
    其他线程以相同的参数调用compute方法时, 无法从缓存中获知已有线程正在进行该参数的计算的信息
    
    因此造成了“重复计算”的发生 
    
  • 缓存 Future 对象: 返回Future 对象的 get 方法

        如果实际的计算还在进行当中,  get方法将被阻塞,  直到计算完成 
    
    public class Memorizer3<A, V> implements Computable<A, V> {   
        // 改为缓存Future  
        private final Map<A, Future<V>> cache   
        = new ConcurrentHashMap<A, Future<V>>();   
        private final Computable<A, V> c;   
    
        public Memorizer3(Computable<A, V> c) { this.c = c; }   
    
        public V compute(final A arg) throws InterruptedException {   
            Future<V> f = cache.get(arg);   
            if (f == null) {   
                Callable<V> eval = new Callable<V>() {   
                        public V call() throws InterruptedException {   
                            return c.compute(arg);   
                        }   
                    };   
                FutureTask<V> ft = new FutureTask<V>(eval);   
                f = ft;   
                // 在计算开始前就将Future对象存入缓存中.  
                cache.put(arg, ft);   
                ft.run(); // call to c.compute happens here   
            }   
            try {   
                // 如果缓存中存在arg对应的Future对象, 就直接调用该Future对象的get方法.  
                // 如果实际的计算还在进行当中, get方法将被阻塞, 直到计算完成  
                return f.get();   
            } catch (ExecutionException e) {   
                throw launderThrowable(e.getCause());   
            }   
        }   
    }   
    
          不幸的是Memorizer3仍然存在重复计算的问题,只是相对于Memorizer2重复计算的概率降低了一些
    
          因为 get, if, put这三个操作并不是原子性的 
    
  • 使用 putIfAbsent : 合并 get, if , put这三个操作

    public class Memorizer<A, V> implements Computable<A, V> {   
        private final ConcurrentMap<A, Future<V>> cache  = new ConcurrentHashMap<A, Future<V>>();   
        private final Computable<A, V> c;   
    
        public Memorizer(Computable<A, V> c) { this.c = c; }   
    
        public V compute(final A arg) throws InterruptedException {   
            while (true) {   
                Future<V> f = cache.get(arg);   
                if (f == null) {   
                    Callable<V> eval = new Callable<V>() {   
                            public V call() throws InterruptedException {   
                                return c.compute(arg);   
                            }   
                        };   
                    FutureTask<V> ft = new FutureTask<V>(eval);   
                    // 使用putIfAbsent测试是否真的将ft存入了缓存, 如果存入失败, 说明cache中已经存在arg对应的future对象  
                    // 否则才进行计算.  
                    f = cache.putIfAbsent(arg, ft);   
                    if (f == null) { f = ft; ft.run(); }   
                }   
                try {   
                    return f.get();   
                } catch (CancellationException e) {   
                    // 当计算被取消时, 从缓存中移除arg-f键值对  
                    cache.remove(arg, f);   
                } catch (ExecutionException e) {   
                    throw launderThrowable(e.getCause());   
                }   
            }   
        }   
    }   
    

    Next:任务执行

    Previous:对象组合

    Up:目录