Java并发简述(同步注解、发布和溢出、并发容器、工作窃取、锁等)

前言

阳了之后修养了一段时间所以阅读书籍的进度慢了不少这里对《Java并发编程实战》做一点知识点抽取。

知识点

同步Annotation

可以帮助静态代码分析工具进行检查是否真的符合 Annotation同时能够让代码具有更好的可读性。

类Annotation

常见的描述可预期的类安全性的注解如下

  • @Immutable 不可变且ThreadSafe
  • @ThreadSafe 明确线程安全
  • @NotThreadSafe 明确线程不安全

域Annotation和方法Annotation

最常见的@GuardedBy(lock)记录的是线程只有持有一个特定的锁后才能访问某个域或者方法。lock可能值如下

  • @GuardedBy(“this”):对象中的内部锁
  • @GuardedBy(“fieldName”):与fieldName引用的对象相关联的锁可能是显式锁、隐式锁
  • @GuardedBy(“ClassName.fieldName”):与@GuardedBy(“fieldName”)类似指的是静态域
  • @GuardedBy(“methodName()”):锁对象指的是方法的返回值
  • @GuardedBy(“ClassName.class”):是指ClassName类的对象

发布和溢出

这个模块之前已经做过总结这里不再多说。

发布

让对象能够让其他代码块进行使用。

最常见的发布方式就是将对象的方存储到公共静态域中任何类和线程都能看见这个域在这个过程中就可能发生逃逸——所有该发布的对象中所能获取的数据都变成公有了。

发布对象

  • 不可变对象可以任意机制发布
  • 高效不可变对象对象可变状态不可变必须要安全发布
  • 可变对象必须要安全发布同时必须要线程安全或者被锁保护。

溢出

对象还没有准备好就被发布。

安全发布的模式

  • 通过静态初始化器来初始化对象的引用静态初始化器是由JVM在类的初始化阶段执行即在类被加载后并且被线程使用前。在静态初始化期间内存写入操作将自动对所有线程可见
  • 将引用存储到volatile或者AtomicReference中
  • 将引用存储到final域中
  • 用锁保护存储的域

并发容器

提高了曾经的同步容器的吞吐量并发性有所提升。

  • ConcurrentHashMap代替同步的哈希Map
  • 当多数操作为读取时CopyOnWriteArrayList是List的同步实现。
  • Queue相较于List有着更高的并发实现。 FIFO队列ConcurrentLinkedQueueBlockingQueue扩展了Queue增加了可阻塞的插入和获取。

着重讲常见的 ConcurrentHashMap、BlockingQueue、CopyOnWriteArrayList

ConcurrentHashMap

前置知识

在讲ConcurrentHashMap之前先提两个锁的概念

  • 分拆锁一个锁分成两个
  • 分离锁分拆锁进一步扩展分成可大可小加锁块地集合并且他们归属于相互独立地对象这样的情况就是分离锁。分离锁的副作用就是如果要独占容器需要获取所有的锁。

两者的目的都是减小锁的粒度以此提高并发的可能。

ConcurrentHashMap 简述

ConcurrentHashMap就是基于分离锁实现的。

  • Java7之前ConcurrentHashMap的实现使用一个包含16个锁的Array每个锁都守护HashBucket的1/16;Bucket
    N 由第 N mod 16 个锁来守护。这将把锁的请求减少到原来的1/16。这项技术使得ConcurrentHashMap 能够支持16个并发的Writer。

  • Java8之后做了对Array其实进行了进一步优化当容量达到一个的阈值会把存储结构从Array转为红黑树以此提高性能。

CopyOnWriteArrayList

是同步List的一个并发替代品避免了迭代期间对容器加锁和复制。每次返回的都是当时底层数据状态的拷贝之后的修改并不会影响已经返回的数据。读多写少的时候使用否则每次改变容器就复制数组开销可能过多了。

  1. CopyOnWrite适用于读多写少的情况最大程度的提高读的效率
  2. CopyOnWrite是最终一致性在写的过程中原有的读的数据是不会发生更新的只有新的读才能读到最新数据
  3. 如何使其他线程能够及时读到新的数据需要使用volatile变量
  4. 写的时候不能并发写需要对写操作进行加锁

如事件通知系统递交一个通知需要迭代已经注册的监听器并且调用其中每一个。注册和注销事件监听器就少的多的情况。

BlockingQueue 阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take与定时的offer和poll等价。支持生产者-消费者模式该模式不需要把任务立即处理而是存入到一个todo清单这样消费者和生产者就相当于解耦了可以以不同的速度进行生产和消费简化了工作负荷的管理。如线程池和工作队列进行结合。

  • LinkedBlockingQueueArrayBlockingQueue FIFO队列拥有比同步List更好的并发性能
  • PriorityBlockingQueue排序后的阻塞队列
  • SynchronousQueue非真正的队列不会为队列元素维护任何存储空概念。而是维护一个排队的线程清单。平常都是需要先入任务队列之后取出来使用。而该队列直接进行任务的移交减少了生产者和消费者之间移动数据的延迟。

双端队列和窃取工作

双端队列

Deque和BlockingDeque分别扩展了Queue和BlockingQueue。基于ArrayDeque和LinkedBlockingDeque实现。双端队列适用于窃取工作。

窃取工作

线程在完成了自己队列的所有任务之后能够偷取其他消费者的双端队列中末尾的任务。

Synchronizer

除了上述讲的阻塞队列以外还包括其他类型的Synchronizer信号量semaphore、关卡barrier、闭锁latch

Semaphore

计数信号量可以用来实现资源池或者给定容器的边界。semaphore跟操作系统中一致。没有可用的资源阻塞acquire直到其他线程release方法返回了一个许可。

/**
 * @description 用信号量来约束容器大小
 */
public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet (new HashSet<> ( ));
        sem = new Semaphore (bound);
    }

    public boolean add(T o) throws InterruptedException {
    	// 尝试获取信号量看是否有空余
        sem.acquire ( );
        boolean wasAdded = false;
        try {
            wasAdded = set.add (o);
            return wasAdded;
        } finally {
        	// 添加失败就释放信号量
            if (!wasAdded) {
                sem.release ( );
            }
        }
    }

    public boolean remove(T o) {
        boolean wasRemoved = false;
        try {
            wasRemoved = set.remove (o);
        }finally {
        	// 删除成功就释放信号量
            if (wasRemoved){
                sem.release ();
            }
        }
        return wasRemoved;
    }
}

Barrier

类似于闭锁能够阻塞一组线程直到某些事件发生。闭锁要求所有的等待事件发生才能开启后续指定的事情而关卡要求所有线程到达但是后续事情没指定。好似家庭聚会所有人到达再决定之后的事情。

CyclicBarrier允许一个给定数量的成员多次集中在一个关卡点常用于并行迭代算法。把问题拆分成相互独立的子问题当线程到达关卡就会await进行阻塞直到所有线程都到达。

public class PutTakeTest {
    private static final ExecutorService pool = Executors.newCachedThreadPool ();
    private final AtomicInteger putSum = new AtomicInteger (0);
    private final AtomicInteger takeSum = new AtomicInteger (0);
    private final CyclicBarrier barrier;
    private final BoundedBuffer<Integer> bb;
    private final int nTrials, nPairs;
    private final BarrierTimer timer;

    public PutTakeTest(int capacity, int npairs, int ntrials) {
        this.bb = new BoundedBuffer<> (capacity);
        this.nPairs = npairs;
        this.nTrials = ntrials;
        // 统计 关卡设限到冲破时间的 的计时器
        this.timer = new BarrierTimer ();
        // 关卡冲破就执行这个计时动作 以及需要等待到达的线程数
        this.barrier = new CyclicBarrier (npairs * 2 + 1,timer);
    }

    static int xorShift(int y) {
        y ^= (y << 6);
        y ^= (y >>> 21);
        y ^= (y << 7);
        return y;
    }

    void test() {
        try {
            timer.clear ();
            // 2 * npairs 的线程到达
            for (int i = 0; i < nPairs; i++) {
                pool.execute (new Producer ());
                pool.execute (new Consumer ());
            }
            barrier.await (); // 等待所有线程做好准备 第一次冲关之后 关卡恢复 npairs * 2 + 1 到达
            barrier.await (); // 等待所有线程最终完成 第二次冲关等待 npairs * 2 + 1 到达
            long nsPerItem = timer.getTime () / (nPairs * (long) nTrials);
            System.out.print ("Throughput : " + nsPerItem + " ns / item");
            assertEquals (putSum.get (), takeSum.get ());
        } catch (BrokenBarrierException | InterruptedException e) {
            throw new RuntimeException (e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new PutTakeTest (10, 10, 100000).test ();
        pool.shutdown ();
    }
    
	// 生成随机数的任务
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                int seed = (this.hashCode () ^ (int) System.nanoTime ());
                int sum = 0;
               // 冲关行为 即当前线程到达准备执行后续任务
                barrier.await ();
                for (int i = nTrials; i > 0; i--) {
                    bb.put (seed);
                    sum += seed;
                    seed = xorShift (seed);
                }
                putSum.getAndAdd (sum);
                // 冲关行为 即当前线程结束
                barrier.await ();
            } catch (BrokenBarrierException | InterruptedException e) {
                throw new RuntimeException (e);
            }
        }
    }
	// 消费者 进行求和统计
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
            	// 冲关行为 即当前线程到达准备执行后续任务
                barrier.await ();
                int sum = 0;
                for (int i = nTrials; i > 0; i--) {
                    sum += bb.take ();
                }
                takeSum.getAndAdd (sum);
                // 冲关行为 即当前线程结束
                barrier.await ();
            } catch (BrokenBarrierException | InterruptedException e) {
                throw new RuntimeException (e);
            }
        }
    }

关卡放行之后会再次重置状态可以重复使用。上述代码得以体现

Latch

CountDownLatch是一个灵活的闭锁实现。允许一个或者多个线程等待一个事件集的发生。闭锁的状态包括一个计数器初始化时注入需要等待的事件数。

  • countDown对计数器做减操表示一个事件已经发生。
  • await表示等待计数器达到0。
public static long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
		// 开始的信号
        final CountDownLatch startGate = new CountDownLatch (1);
        // 结束信号
        final CountDownLatch endGate = new CountDownLatch (nThreads);
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread (() ->{
                try {
                	// 所有线程都在这里阻塞等待开始信号减成0
                    startGate.await ();
                    try {
                        task.run ();
                    }finally {
                    	// 每个任务结束结束信号-1
                        endGate.countDown ();
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException (e);
                }
            });
            t.start ();
        }
        long start = System.nanoTime ();
        // 开始信号减为0瞬间所有的任务开始
        startGate.countDown ();
        // 等待所有的任务结束
        endGate.await ();
        long end = System.nanoTime ();
        return (end - start)/1000;
    }

一旦闭锁打开就无法再恢复

锁的类型

常见的锁Synchronized和Lock。

  • Sychronized语义上的实现它就是一个非公平、悲观、独享、互斥、可重入的重量级锁。
    以下两个锁都在JUC包下是API层面上的实现
  • ReentrantLock它是一个默认非公平但可实现公平的、悲观、独享、互斥、可重入、重量级锁。
  • ReentrantReadWriteLock它是一个默认非公平但是可实现公平的、悲观、写独享、读共享、读写、可重入、重量级锁。

常见的名词

悲观锁

独占锁是悲观锁机制不管怎么样我要对数据进行操作我就上锁。并发性自然差一些安全性高。

乐观锁

乐观锁就是读数据并进行计算当需要进行写入的操作的时候会比较当时读的数据和内存中的版本是否一致如果一致就能进行交换否则自旋等待到一定次数就阻塞或者直接失败。

CAS

比较并交换其实就是一个乐观锁的应用版本号的添加就是为了防止ABA问题。

ABA

试想读数据并进行计算当需要进行写入的操作的时候会比较当时读的数据和内存中的数据的具体值是否一致如果一致就进行更新。看似没有问题实际很有可能其他线程已经修改成B之后改回了A只是这些都是在当前线程不知情的情况下发生的。这就是ABA问题。

总结

并发编程光理论当然远远不够的还是得实践。如果本文有误欢迎指正~

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Java

“Java并发简述(同步注解、发布和溢出、并发容器、工作窃取、锁等)” 的相关文章