当前位置:首页 > 科技  > 软件

Java并发编程:深入剖析CyclicBarrier源码

来源: 责编: 时间:2024-04-29 16:16:45 45观看
导读引言CyclicBarrier中文叫做循环栅栏,用来控制线程的执行速率。适用场景:一组线程在到达栅栏之前,需要相互等待,到达栅栏之后(满足了特定条件),再一起执行。适用场景好像跟CountDownLatch一样,前面介绍过CountDownLatch的适用

引言

CyclicBarrier中文叫做循环栅栏,用来控制线程的执行速率。HLj28资讯网——每日最新资讯28at.com

适用场景:一组线程在到达栅栏之前,需要相互等待,到达栅栏之后(满足了特定条件),再一起执行。HLj28资讯网——每日最新资讯28at.com

适用场景好像跟CountDownLatch一样,前面介绍过CountDownLatch的适用场景,跟第二种场景很像,不过还是有点区别:HLj28资讯网——每日最新资讯28at.com

  1. CountDownLatch需要手动调用countDown()方法,这组线程才能一起执行,而CyclicBarrier无需调用调用任何方法,线程会自动执行。
  2. CountDownLatch只能使用一次,而CyclicBarrier可以循环使用。

再提一下CountDownLatch的两个适用场景:HLj28资讯网——每日最新资讯28at.com

  1. 当前线程等待其他线程都执行完成之后,再执行。
  2. 所有线程满足条件后,再一起执行。

使用示例

CyclicBarrier常用的方法就一个await()方法,调用await()方法之后,会阻塞当前线程,直到栅栏前的所有线程都调用了await()方法,才会放行,并且一起执行。HLj28资讯网——每日最新资讯28at.com

import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * @author 一灯架构 * @apiNote CyclicBarrier测试类 **/@Slf4jpublic class CyclicBarrierTest {    public static void main(String[] args) throws InterruptedException {        // 1. 创建一个线程池,用来执行任务        ExecutorService executorService = Executors.newCachedThreadPool();        // 2. 创建一个循环栅栏,线程数是3        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);        // 3. 提交9个任务,刚好可以循环3轮        for (int i = 0; i < 9; i++) {            // 4. 睡眠100ms再提交任务,避免并发提交            Thread.sleep(100);            executorService.execute(() -> {                try {                    // 5. 睡眠1秒,模拟任务准备阶段                    Thread.sleep(1000);                    log.info(Thread.currentThread().getName() + " 准备 " + cyclicBarrier.getNumberWaiting());                    // 6. 阻塞当前任务,直到3个线程都到达栅栏                    cyclicBarrier.await();                    log.info(Thread.currentThread().getName() + " 执行完成");                } catch (Exception e) {                }            });        }        // 7. 关闭线程池        executorService.shutdown();    }}

输出结果:HLj28资讯网——每日最新资讯28at.com

10:00:00.001 [pool-1-thread-1] INFO com.yideng.CyclicBarrierTest - pool-1-thread-1 准备 010:00:00.002 [pool-1-thread-2] INFO com.yideng.CyclicBarrierTest - pool-1-thread-2 准备 110:00:00.003 [pool-1-thread-3] INFO com.yideng.CyclicBarrierTest - pool-1-thread-3 准备 210:00:00.003 [pool-1-thread-3] INFO com.yideng.CyclicBarrierTest - pool-1-thread-3 执行完成10:00:00.003 [pool-1-thread-1] INFO com.yideng.CyclicBarrierTest - pool-1-thread-1 执行完成10:00:00.004 [pool-1-thread-2] INFO com.yideng.CyclicBarrierTest - pool-1-thread-2 执行完成10:00:00.010 [pool-1-thread-4] INFO com.yideng.CyclicBarrierTest - pool-1-thread-4 准备 010:00:00.011 [pool-1-thread-5] INFO com.yideng.CyclicBarrierTest - pool-1-thread-5 准备 110:00:01.003 [pool-1-thread-6] INFO com.yideng.CyclicBarrierTest - pool-1-thread-6 准备 210:00:01.004 [pool-1-thread-6] INFO com.yideng.CyclicBarrierTest - pool-1-thread-6 执行完成10:00:01.004 [pool-1-thread-4] INFO com.yideng.CyclicBarrierTest - pool-1-thread-4 执行完成10:00:01.004 [pool-1-thread-5] INFO com.yideng.CyclicBarrierTest - pool-1-thread-5 执行完成10:00:01.114 [pool-1-thread-7] INFO com.yideng.CyclicBarrierTest - pool-1-thread-7 准备 010:00:01.213 [pool-1-thread-8] INFO com.yideng.CyclicBarrierTest - pool-1-thread-8 准备 110:00:01.317 [pool-1-thread-9] INFO com.yideng.CyclicBarrierTest - pool-1-thread-9 准备 210:00:01.318 [pool-1-thread-9] INFO com.yideng.CyclicBarrierTest - pool-1-thread-9 执行完成10:00:01.318 [pool-1-thread-7] INFO com.yideng.CyclicBarrierTest - pool-1-thread-7 执行完成10:00:01.319 [pool-1-thread-8] INFO com.yideng.CyclicBarrierTest - pool-1-thread-8 执行完成

示例中CyclicBarrier包含3个线程,提交9个任务,每3个任务为一组,调用await()方法后会相互等待,直到3个线程都调用了await()方法,然后放行,并且一起执行,9个任务会循环3轮,从输出结果中可以看出。HLj28资讯网——每日最新资讯28at.com

示例中getNumberWaiting()方法可以查看CyclicBarrier中已经等待的线程数。HLj28资讯网——每日最新资讯28at.com

看完了CyclicBarrier的使用方式,再看一下CyclicBarrier的源码实现。HLj28资讯网——每日最新资讯28at.com

类属性

public class CyclicBarrier {    /**     * 互斥锁,用来保证线程安全     */    private final ReentrantLock lock = new ReentrantLock();    /**     * 栅栏条件操作     */    private final Condition trip = lock.newCondition();        /**     * 栅栏初始线程数     */    private final int parties;        /**     * 到达栅栏后的操作     */    private final Runnable barrierCommand;    /**     * 栅栏前未到达的线程数     */    private int count;    /**     * 当前循环轮数     */    private Generation generation = new Generation();        private static class Generation {        boolean broken = false;    }}

CyclicBarrier内部使用了ReentrantLock来保证线程安全,又使用了Condition来实现线程的等待与唤醒操作。HLj28资讯网——每日最新资讯28at.com

初始化

CyclicBarrier初始化的可以指定线程数和到达栅栏后的操作。HLj28资讯网——每日最新资讯28at.com

/** * 指定线程数 */public CyclicBarrier(int parties) {    this(parties, null);}/** * 指定线程数和到达栅栏后的操作 * @param parties 线程数 * @param barrierAction 到达栅栏后的操作 */public CyclicBarrier(int parties, Runnable barrierAction) {    if (parties <= 0) {        throw new IllegalArgumentException();    }    this.parties = parties;    this.count = parties;    this.barrierCommand = barrierAction;}

比如到达栅栏后,关闭线程池:HLj28资讯网——每日最新资讯28at.com

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> executorService.shutdown());

看一下await()方法源码。HLj28资讯网——每日最新资讯28at.com

await方法源码

/**     * await方法入口     */    public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }    /**     * await方法核心逻辑     * @param timed 是否允许超时,false表示不允许     * @param nanos 超时时间     */    private int dowait(boolean timed, long nanos)            throws InterruptedException, BrokenBarrierException, TimeoutException {        // 1. 加锁        final ReentrantLock lock = this.lock;        lock.lock();        try {            // 2. 获取当前循环轮数            final Generation g = generation;            if (g.broken) {                throw new BrokenBarrierException();            }            // 3. 如果当前线程已中断,就打破栅栏            if (Thread.interrupted()) {                breakBarrier();                throw new InterruptedException();            }            // 4. 计数器减一,如果计数器为零,表示所有线程都到达了栅栏            int index = --count;            if (index == 0) {                boolean ranAction = false;                try {                    // 5. 如果初始化时指定了barrierCommand,就执行                    final Runnable command = barrierCommand;                    if (command != null) {                        command.run();                    }                    ranAction = true;                    nextGeneration();                    return 0;                } finally {                    if (!ranAction) {                        breakBarrier();                    }                }            }            for (; ; ) {                try {                    // 6. 如果不允许超时,就阻塞当前线程                    if (!timed) {                        trip.await();                    } else if (nanos > 0L) {                        nanos = trip.awaitNanos(nanos);                    }                } catch (InterruptedException ie) {                    if (g == generation && !g.broken) {                        breakBarrier();                        throw ie;                    } else {                        Thread.currentThread().interrupt();                    }                }                if (g.broken) {                    throw new BrokenBarrierException();                }                if (g != generation) {                    return index;                }                // 7. 如果已超时,就打破栅栏                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            // 8. 释放锁            lock.unlock();        }    }

await()方法源码很长,但是逻辑很简单,主要分为以下四步:HLj28资讯网——每日最新资讯28at.com

  1. 加锁,保证线程安全。
  2. 统计栅栏前等待的线程数,如果所有线程都到达了栅栏,就执行初始化时指定的barrierCommand。
  3. 如果线程没有指定了超时时间,就直接阻塞当前线程。如果指定了超时时间,就等待直到超时,如果已超时,就打破栅栏。
  4. 释放锁

再看一下打破栅栏的源码:HLj28资讯网——每日最新资讯28at.com

/** * 打破栅栏 */private void breakBarrier() {    // 1. 设置当前循环轮数的状态为已打破    generation.broken = true;    // 2. 重置线程数    count = parties;    // 3. 唤醒所有等待的线程    trip.signalAll();}

其他常用方法

CyclicBarrier还有一些常用的方法:HLj28资讯网——每日最新资讯28at.com

/** * 等待(带超时时间) * @param timeout 超时时间 * @param unit 时间单位 */public int await(long timeout, TimeUnit unit)        throws InterruptedException,        BrokenBarrierException,        TimeoutException {    ...}/** * 重置栅栏(当栅栏出现异常情况时使用) */public void reset() {    ...}

总结

看完了CyclicBarrier的所有源码,是不是觉得CyclicBarrier逻辑很简单。HLj28资讯网——每日最新资讯28at.com

CyclicBarrier主要用来控制线程的执行速率,初始化时指定线程数,线程调用await()方法时会阻塞,直到到达的线程数等于初始线程数,才会放行,并且一起执行。与CountDownLatch区别是,CyclicBarrier可以循环执行,而CountDownLatch只能执行一次。HLj28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-86508-0.htmlJava并发编程:深入剖析CyclicBarrier源码

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 请尽快升级你的 jQuery!

下一篇: Vue 超实用技巧!建立逻辑与动画样式的通道

标签:
  • 热门焦点
  • K60至尊版狂暴引擎2.0加持:超177万跑分斩获性能第一

    K60至尊版狂暴引擎2.0加持:超177万跑分斩获性能第一

    Redmi的后性能时代战略发布会今天下午如期举办,在本次发布会上,Redmi公布了多项关于和联发科的深度合作,以及新机K60 Ultra在软件和硬件方面的特性,例如:“K60 至尊版,双芯旗舰
  • 掘力计划第 20 期:Flutter 混合开发的混乱之治

    掘力计划第 20 期:Flutter 混合开发的混乱之治

    在掘力计划系列活动第20场,《Flutter 开发实战详解》作者,掘金优秀作者,Github GSY 系列目负责人恋猫的小郭分享了Flutter 混合开发的混乱之治。Flutter 基于自研的 Skia 引擎
  • 从 Pulsar Client 的原理到它的监控面板

    从 Pulsar Client 的原理到它的监控面板

    背景前段时间业务团队偶尔会碰到一些 Pulsar 使用的问题,比如消息阻塞不消费了、生产者消息发送缓慢等各种问题。虽然我们有个监控页面可以根据 topic 维度查看他的发送状态,
  • 一篇聊聊Go错误封装机制

    一篇聊聊Go错误封装机制

    %w 是用于错误包装(Error Wrapping)的格式化动词。它是用于 fmt.Errorf 和 fmt.Sprintf 函数中的一个特殊格式化动词,用于将一个错误(或其他可打印的值)包装在一个新的错误中。使
  • JavaScript学习 -AES加密算法

    JavaScript学习 -AES加密算法

    引言在当今数字化时代,前端应用程序扮演着重要角色,用户的敏感数据经常在前端进行加密和解密操作。然而,这样的操作在网络传输和存储中可能会受到恶意攻击的威胁。为了确保数据
  • 2天涨粉255万,又一赛道在抖音爆火

    2天涨粉255万,又一赛道在抖音爆火

    来源:运营研究社作者 | 张知白编辑 | 杨佩汶设计 | 晏谈梦洁这个暑期,旅游赛道彻底火了:有的「地方」火了&mdash;&mdash;贵州村超旅游收入 1 个月超过 12 亿;有的「博主」火了&m
  • 超闭合精工铰链 彻底消灭缝隙 三星Galaxy Z Flip5与Galaxy Z Fold5发布

    超闭合精工铰链 彻底消灭缝隙 三星Galaxy Z Flip5与Galaxy Z Fold5发布

    2023年7月26日,三星电子正式发布了Galaxy Z Flip5与Galaxy Z Fold5。三星新一代折叠屏手机采用超闭合精工铰链,让折叠后的缝隙不再可见。同时,配合处
  • 自研Exynos回归!三星Galaxy S24系列将提供Exynos和骁龙双版本

    自研Exynos回归!三星Galaxy S24系列将提供Exynos和骁龙双版本

    年初,全新的三星Galaxy S23系列发布,包含Galaxy S23、Galaxy S23+和Galaxy S23 Ultra三个版本,全系搭载超频版骁龙8 Gen 2,虽同样采用台积电4nm工艺制
  • SN570 NVMe SSD固态硬盘 价格与性能兼具

    SN570 NVMe SSD固态硬盘 价格与性能兼具

    SN570 NVMe SSD固态硬盘是西部数据发布的最新一代WD Blue系列的固态硬盘,不仅闪存技术更为精进,性能也得到了进一步的跃升。WD Blue SN570 NVMe SSD的包装外
Top