Java 多线程 ∶ JUC 并发工具原理
一 . 前言趁着有空 , 赶紧把之前欠的债还上 . 这是多线程一阶段计划的最后一篇 , 后续多线程会转入修订和深入阶段 . 彻底吃透多线程.
二. 工具介绍之前说 AQS 的时候曾经提到过这几个类 , 这几个类有一些各自的特点 , 很符合特定的场景 , 之前在生产上用的还挺舒服.
我们一般使用的并发工具有四种 :
CyclicBarrier : 放学一起走
允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活CountDownLatch : 等人到齐了就触发
在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候需要带入该计数器值,该值就表示了线程的数量。每当一个线程完成自己的任务后,计数器的值就会减 1。当计数器的值变为 0 时,就表示所有的线程均已经完成了任务Semaphore
信号量 Semaphore 是一个控制访问多个共享资源的计数器,和 CountDownLatch 一样,其本质上是一个 “共享锁”。Exchanger
可以在对中对元素进行配对和交换的线程的同步点每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象 , Exchanger 可能被视为 SynchronousQueue 的双向形式三 . 原理解析3 .1 CyclicBarrier作用 :
它允许一组线程互相等待,直到到达某个公共屏障点(Common Barrier Point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 Barrier 在释放等待线程后可以重用,所以称它为循环 (Cyclic) 的 屏障 ( Barrier )。
内部原理 :
内部使用重入锁 ReentrantLock 和 Condition
构造函数 :
CyclicBarrier(int parties):
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。CyclicBarrier(int parties, Runnable barrierAction):
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
使用变量 :
parties 变量 : 表示拦截线程的总数量。count 变量 : 表示拦截线程的剩余需要数量。barrierAction 变量 : 为 CyclicBarrier 接收的 Runnable 命令,用于在线程到达屏障时,优先执行 barrierAction ,用于处理更加复杂的业务场景。generation变量 : 表示 CyclicBarrier 的更新换代M-await:等待状态
M-await(longtimeout,TimeUnitunit):等待超时
M-dowait
-该方法第一步会试着获取锁
-如果分代已经损坏,抛出异常
-如果线程中断,终止CyclicBarrier
-进来线程,--count
-count==0表示所有线程均已到位,触发Runnable任务
-唤醒所有等待线程,并更新generation
跳出等待状态的方法
-最后一个线程到达,即index==0
-超出了指定时间(超时等待)
-其他的某个线程中断当前线程
-其他的某个线程中断另一个等待的线程
-其他的某个线程在等待barrier超时
-其他的某个线程在此barrier调用reset()方法。reset()方法用于将屏障重置为初始状态。
SC- Generation :描述了 CyclicBarrier 的更新换代。
-在CyclicBarrier中,同一批线程属于同一代。
-当有 parties 个线程全部到达 barrier 时,generation 就会被更新换代。
-其中broken属性,标识该当前CyclicBarrier是否已经处于中断状态
M-breakBarrier:终止所有的线程
M-nextGeneration:更新换代操作
- 1. 唤醒所有线程。
- 2. 重置 count 。
- 3. 重置 generation 。
M-reset:重置barrier到初始化状态
M-getNumberWaiting:获得等待的线程数
M-判断CyclicBarrier是否处于中断
复制代码
使用案例 :Gitee CyclicBarrier 使用[1]
问题补充 :
1. 传入总得 Count 数
2.每次进来都会--count,同时判断count==0
3.如果不为0,当前线程就会阻塞
privatefinalReentrantLocklock=newReentrantLock();
privatefinalConditiontrip=lock.newCondition();
复制代码
3.2 CountDownLatch在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次, 计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候需要带入该计数器值,该值就表示了线程的数量。每当一个线程完成自己的任务后,计数器的值就会减 1。当计数器的值变为 0 时,就表示所有的线程均已经完成了任务
CountDownLatch内部依赖Sync实现,而Sync继承AQS
sync:
:tryAcquireShared获取同步状态
:tryReleaseShared释放同步状态
await():
使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断
:sync.acquireSharedInterruptibly(1);
:内部使用AQS的acquireSharedInterruptibly(intarg)
getState()
:获取同步状态,其值等于计数器的值
:从这里我们可以看到如果计数器值不等于0,则会调用doAcquireSharedInterruptibly(intarg)
doAcquireSharedInterruptibly
:自旋方法会尝试一直去获取同步状态
countDown
:CountDownLatch提供countDown()方法递减锁存器的计数,如果计数到达零,则释放所有等待的线程
:内部调用AQS的releaseShared(intarg)方法来释放共享锁同步状态
:tryReleaseShared(intarg)方法被CountDownLatch的内部类Sync重写
复制代码
参考案例Gitee CountDownLatch 使用[2]
总结
CountDownLatch 内部通过共享锁实现。在创建 CountDownLatch 实例时,需要传递一个 int 型的参数:count,该参数为计数器的初始值,也可以理解为该共享锁可以获取的总次数。
当某个线程调用 await() 方法,程序首先判断 count 的值是否为 0,如果不会 0 的话则会一直等待直到为 0 为止 (PS : 可以多个线程都调用 await)
当其他线程调用 countDown() 方法时,则执行释放共享锁状态,使 count 值 – 1 (PS :countDown 并不会阻塞)
当在创建 CountDownLatch 时初始化的 count 参数,必须要有 count 线程调用 countDown 方法才会使计数器 count 等于 0,锁才会释放,前面等待的线程才会继续运行。注意 CountDownLatch 不能回滚重置
3 .3 Semaphore基础点
信号量 Semaphore 是一个控制访问多个共享资源的计数器,和 CountDownLatch 一样,其本质上是一个 “共享锁”。
从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目
当一个线程想要访问某个共享资源时,它必须要先获取 Semaphore,当 Semaphore 0 时,获取该资源并使 Semaphore – 1。如果 Semaphore 值 = 0,则表示全部的共享资源已经被其他线程全部占用,线程必须要等待其他线程释放资源。当线程释放资源时,Semaphore 则 + 1
实现细节
Semaphore 提供了两个构造函数:
Semaphore(int permits) :创建具有给定的许可数和非公平的公平设置的 Semaphore。Semaphore(int permits, boolean fair) :创建具有给定的许可数和给定的公平设置的 Semaphore。Semaphore 默认选择非公平锁。
当信号量 Semaphore = 1 时,它可以当作互斥锁使用。其中 0、1 就相当于它的状态,当 = 1 时表示其他线程可以获取,当 = 0 时,排他,即其他线程必须要等待。
acquire()方法来获取一个许可
:内部调用AQS的acquireSharedInterruptibly(intarg),该方法以共享模式获取同步状态
公平
:判断该线程是否位于CLH队列的列头
:获取当前的信号量许可
:设置“获得acquires个信号量许可之后,剩余的信号量许可数”
:CAS设置信号量
非公平
:不需要判断当前线程是否位于CLH同步队列列头
复制代码
3 .4 Exchanger可以在对中对元素进行配对和交换的线程的同步点
每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象 , Exchanger 可能被视为 SynchronousQueue 的双向形式
Exchanger,它允许在并发任务之间交换数据 : 当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二个线程的数据结构进入到第一个线程中
TODO : Exchanger 的源代码比较绕 , 而且这个组件使用场景并不多 , 所以先留个坑 , 以后项目上真的有场景了再实际上分析一下
3.5 并发工具使用@ github.com/black-ant/c…[3]
补充 :# CountDownLatch 和 CyclicBarrier 如何理解 ?CyclicBarrier : 小学生去郊游 , 老师下车时统计人数 , 人数到齐了才能一起参观CountDownLatch : 幼儿园老师送孩子 (ChildThread) 放学 , 走一个记一个数 , 当所有的学生放学后 , 老师 (BossThread) 下班CyclicBarrier就是一堵墙,人数到了所有线程才能一起越过墙
CountDownLatch只是一个计数器,数目到了主线程才能执行
CyclicBarrier可以重置计数,CountDownLatch不可以
复制代码
参考资料[1]https://gitee.com/antblack/case/blob/master/case%20Module%20Thread/case%20Thread%20JUC%20Utils/src/main/java/com/gang/juc/demo/service/JUCCyclicBarrierUtils.java
[2]https://gitee.com/antblack/case/blob/master/case%20Module%20Thread/case%20Thread%20JUC%20Utils/src/main/java/com/gang/juc/demo/service/JUCCountDownLatchUtils.java
[3]https://github.com/black-ant/case/tree/master/case%20Module%20Thread/case%20thread_utils
阅读原文
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设、网站改版、域名注册、主机空间、手机网站建设、网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。 项目经理在线