当前位置:首页 > 科技  > 软件

获取双异步返回值时,如何保证主线程不阻塞?

来源: 责编: 时间:2024-01-25 10:41:34 347观看
导读一、前情提要在上一篇文章中,使用双异步后,如何保证数据一致性?,通过Future获取异步返回值,轮询判断Future状态,如果执行完毕或已取消,则通过get()获取返回值,get()是阻塞的方法,因此会阻塞当前线程,如果通过new Runnable()执行

7Oa28资讯网——每日最新资讯28at.com

一、前情提要

在上一篇文章中,使用双异步后,如何保证数据一致性?,通过Future获取异步返回值,轮询判断Future状态,如果执行完毕或已取消,则通过get()获取返回值,get()是阻塞的方法,因此会阻塞当前线程,如果通过new Runnable()执行get()方法,那么还是需要返回AsyncResult,然后再通过主线程去get()获取异步线程返回结果。7Oa28资讯网——每日最新资讯28at.com

写法很繁琐,还会阻塞主线程。7Oa28资讯网——每日最新资讯28at.com

下面是FutureTask异步执行流程图:7Oa28资讯网——每日最新资讯28at.com

7Oa28资讯网——每日最新资讯28at.com

二、JDK8的CompletableFuture

1、ForkJoinPool

Java8中引入了CompletableFuture,它实现了对Future的全面升级,可以通过回调的方式,获取异步线程返回值。7Oa28资讯网——每日最新资讯28at.com

CompletableFuture的异步执行通过ForkJoinPool实现, 它使用守护线程去执行任务。7Oa28资讯网——每日最新资讯28at.com

ForkJoinPool在于可以充分利用多核CPU的优势,把一个任务拆分成多个小任务,把多个小任务放到多个CPU上并行执行,当多个小任务执行完毕后,再将其执行结果合并起来。7Oa28资讯网——每日最新资讯28at.com

Future的异步执行是通过ThreadPoolExecutor实现的。7Oa28资讯网——每日最新资讯28at.com

7Oa28资讯网——每日最新资讯28at.com

2、从ForkJoinPool和ThreadPoolExecutor探索CompletableFuture和Future的区别

  • ForkJoinPool中的每个线程都会有一个队列,而ThreadPoolExecutor只有一个队列,并根据queue类型不同,细分出各种线程池;
  • ForkJoinPool在使用过程中,会创建大量的子任务,会进行大量的gc,但是ThreadPoolExecutor不需要,因为ThreadPoolExecutor是任务分配平均的;
  • ThreadPoolExecutor中每个异步线程之间是相互独立的,当执行速度快的线程执行完毕后,它就会一直处于空闲的状态,等待其它线程执行完毕;
  • ForkJoinPool中每个异步线程之间并不是绝对独立的,在ForkJoinPool线程池中会维护一个队列来存放需要执行的任务,当线程自身任务执行完毕后,它会从其它线程中获取未执行的任务并帮助它执行,直至所有线程执行完毕。

因此,在多线程任务分配不均时,ForkJoinPool的执行效率更高。但是,如果任务分配均匀,ThreadPoolExecutor的执行效率更高,因为ForkJoinPool会创建大量子任务,并对其进行大量的GC,比较耗时。7Oa28资讯网——每日最新资讯28at.com

三、通过CompletableFuture优化 “通过Future获取异步返回值”

1、通过Future获取异步返回值关键代码

(1)将异步方法的返回值改为Future<Integer>,将返回值放到new AsyncResult<>();中

@Async("async-executor")public void readXls(String filePath, String filename) {    try {     // 此代码为简化关键性代码        List<Future<Integer>> futureList = new ArrayList<>();        for (int time = 0; time < times; time++) {            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();            futureList.add(sumFuture);        }    }catch (Exception e){        logger.error("readXlsCacheAsync---插入数据异常:",e);    }}
@Async("async-executor")public Future<Integer> readXlsCacheAsync() {    try {        // 此代码为简化关键性代码        return new AsyncResult<>(sum);    }catch (Exception e){        return new AsyncResult<>(0);    }}

(2)通过Future<Integer>.get()获取返回值

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) {    int[] futureSumArr = new int[futureList.size()];    for (int i = 0;i<futureList.size();i++) {        try {            Future<Integer> future = futureList.get(i);            while (true) {                if (future.isDone() && !future.isCancelled()) {                    Integer futureSum = future.get();                    logger.info("获取Future返回值成功"+"----Future:" + future                            + ",Result:" + futureSum);                    futureSumArr[i] += futureSum;                    break;                } else {                    logger.info("Future正在执行---获取Future返回值中---等待3秒");                    Thread.sleep(3000);                }            }        } catch (Exception e) {            logger.error("获取Future返回值异常: ", e);        }    }        boolean insertFlag = getInsertSum(futureSumArr, excelRow);    logger.info("获取所有异步线程Future的返回值成功,Excel插入结果="+insertFlag);    return insertFlag;}

2、通过CompletableFuture获取异步返回值关键代码

(1)将异步方法的返回值改为 int

@Async("async-executor")public void readXls(String filePath, String filename) { List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();    for (int time = 0; time < times; time++) {     // 此代码为简化关键性代码        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {         @Override         public Integer get() {             return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();         }     }).thenApply((result) -> {// 回调方法         return thenApplyTest2(result);// supplyAsync返回值 * 1     }).thenApply((result) -> {         return thenApplyTest5(result);// thenApply返回值 * 1     }).exceptionally((e) -> { // 如果执行异常:         logger.error("CompletableFuture.supplyAsync----异常:", e);         return null;     });      completableFutureList.add(completableFuture);    }}
@Async("async-executor")public int readXlsCacheAsync() {    try {        // 此代码为简化关键性代码        return sum;    }catch (Exception e){        return -1;    }}

(2)通过completableFuture.get()获取返回值

public static boolean getCompletableFutureResult(List<CompletableFuture<Integer>> list, int excelRow){    logger.info("通过completableFuture.get()获取每个异步线程的插入结果----开始");    int sum = 0;    for (int i = 0; i < list.size(); i++) {        Integer result = list.get(i).get();        sum += result;    }    boolean insertFlag = excelRow == sum;    logger.info("全部执行完毕,excelRow={},入库={}, 数据是否一致={}",excelRow,sum,insertFlag);    return insertFlag;}

3、效率对比

(1)测试环境

  • 12个逻辑处理器的电脑;
  • Excel中包含10万条数据;
  • Future的自定义线程池,核心线程数为24;
  • ForkJoinPool的核心线程数为24;

(2)统计四种情况下10万数据入库时间

  • 不获取异步返回值
  • 通过Future获取异步返回值
  • 通过CompletableFuture获取异步返回值,默认ForkJoinPool线程池的核心线程数为本机逻辑处理器数量,测试电脑为12;
  • 通过CompletableFuture获取异步返回值,修改ForkJoinPool线程池的核心线程数为24。

备注:因为CompletableFuture不阻塞主线程,主线程执行时间只有2秒,表格中统计的是异步线程全部执行完成的时间。7Oa28资讯网——每日最新资讯28at.com

(3)设置核心线程数

将核心线程数CorePoolSize设置成CPU的处理器数量,是不是效率最高的?7Oa28资讯网——每日最新资讯28at.com

// 获取CPU的处理器数量int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;// 测试电脑是24

因为在接口被调用后,开启异步线程,执行入库任务,因为测试机最多同时开启24线程处理任务,故将10万条数据拆分成等量的24份,也就是10万/24 = 4166,那么我设置成4200,是不是效率最佳呢?7Oa28资讯网——每日最新资讯28at.com

测试的过程中发现,好像真的是这样的。7Oa28资讯网——每日最新资讯28at.com

自定义ForkJoinPool线程池
@Autowired@Qualifier("asyncTaskExecutor")private Executor asyncTaskExecutor;@Overridepublic void readXls(String filePath, String filename) {  List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();    for (int time = 0; time < times; time++) {  CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {         @Override         public Integer get() {             try {                 return readExcelDbJdk8Service.readXlsCacheAsync(sheet, row, start, finalEnd, insertBuilder);             } catch (Exception e) {                 logger.error("CompletableFuture----readXlsCacheAsync---异常:", e);                 return -1;             }         };     },asyncTaskExecutor);      completableFutureList.add(completableFuture); } // 不会阻塞主线程    CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {        try {            int insertSum = getCompletableFutureResult(completableFutureList, excelRow);        } catch (Exception ex) {            return;        }    });}
自定义线程池
/** * 自定义异步线程池 */@Bean("asyncTaskExecutor")public AsyncTaskExecutor asyncTaskExecutor() {    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();    //设置线程名称    executor.setThreadNamePrefix("asyncTask-Executor");    //设置最大线程数    executor.setMaxPoolSize(200);    //设置核心线程数    executor.setCorePoolSize(24);    //设置线程空闲时间,默认60    executor.setKeepAliveSeconds(200);    //设置队列容量    executor.setQueueCapacity(50);    /**     * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略     * 通常有以下四种策略:     * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。     * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。     * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)     * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功     */    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    executor.initialize();    return executor;}

7Oa28资讯网——每日最新资讯28at.com

(4)统计分析

效率对比:7Oa28资讯网——每日最新资讯28at.com

③通过CompletableFuture获取异步返回值(12线程) <  ②通过Future获取异步返回值 <  ④通过CompletableFuture获取异步返回值(24线程) <  ①不获取异步返回值7Oa28资讯网——每日最新资讯28at.com

不获取异步返回值时性能最优,这不废话嘛~7Oa28资讯网——每日最新资讯28at.com

核心线程数相同的情况下,CompletableFuture的入库效率要优于Future的入库效率,10万条数据大概要快4秒钟,这还是相当惊人的,优化的价值就在于此。7Oa28资讯网——每日最新资讯28at.com

7Oa28资讯网——每日最新资讯28at.com

四、通过CompletableFuture.allOf解决阻塞主线程问题

1、语法

CompletableFuture.allOf(CompletableFuture的可变数组).whenComplete((r,e) -> {})。7Oa28资讯网——每日最新资讯28at.com

2、代码实例

getCompletableFutureResult方法在 “3.2.2 通过completableFuture.get()获取返回值”。7Oa28资讯网——每日最新资讯28at.com

// 不会阻塞主线程CompletableFuture.allOf(completableFutureList.toArray(new   CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {    logger.info("全部执行完毕,解决主线程阻塞问题~");    try {        int insertSum = getCompletableFutureResult(completableFutureList, excelRow);    } catch (Exception ex) {        logger.error("全部执行完毕,解决主线程阻塞问题,异常:", ex);        return;    }});// 会阻塞主线程//getCompletableFutureResult(completableFutureList, excelRow);logger.info("CompletableFuture----会阻塞主线程吗?");

7Oa28资讯网——每日最新资讯28at.com

五、CompletableFuture中花俏的语法糖

1、runAsync

runAsync 方法不支持返回值。7Oa28资讯网——每日最新资讯28at.com

可以通过runAsync执行没有返回值的异步方法。7Oa28资讯网——每日最新资讯28at.com

不会阻塞主线程。7Oa28资讯网——每日最新资讯28at.com

// 分批异步读取Excel内容并入库int finalEnd = end;CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis();

2、supplyAsync

supplyAsync也可以异步处理任务,传入的对象实现了Supplier接口。将Supplier作为参数并返回CompletableFuture结果值,这意味着它不接受任何输入参数,而是将result作为输出返回。7Oa28资讯网——每日最新资讯28at.com

会阻塞主线程。7Oa28资讯网——每日最新资讯28at.com

supplyAsync()方法关键代码:7Oa28资讯网——每日最新资讯28at.com

int finalEnd = end;CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {    @Override    public Integer get() {        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();    }});
@Overridepublic int readXlsCacheAsyncMybatis() {    // 不为人知的操作    // 返回异步方法执行结果即可 return 100;}

六、顺序执行异步任务

1、thenRun

thenRun()不接受参数,也没有返回值,与runAsync()配套使用,恰到好处。7Oa28资讯网——每日最新资讯28at.com

// JDK8的CompletableFutureCompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis()).thenRun(() -> logger.info("CompletableFuture----.thenRun()方法测试"));

7Oa28资讯网——每日最新资讯28at.com

2、thenAccept

thenAccept()接受参数,没有返回值。7Oa28资讯网——每日最新资讯28at.com

supplyAsync + thenAccept7Oa28资讯网——每日最新资讯28at.com

  • 异步线程顺序执行
  • supplyAsync的异步返回值,可以作为thenAccept的参数使用
  • 不会阻塞主线程
CompletableFuture.supplyAsync(new Supplier<Integer>() {    @Override    public Integer get() {        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();    }}).thenAccept(x -> logger.info(".thenAccept()方法测试:" + x));

7Oa28资讯网——每日最新资讯28at.com

但是,此时无法通过completableFuture.get()获取supplyAsync的返回值了。7Oa28资讯网——每日最新资讯28at.com

3、thenApply

thenApply在thenAccept的基础上,可以再次通过completableFuture.get()获取返回值。7Oa28资讯网——每日最新资讯28at.com

supplyAsync + thenApply,典型的链式编程。7Oa28资讯网——每日最新资讯28at.com

  • 异步线程内方法顺序执行。
  • supplyAsync 的返回值,作为第 1 个thenApply的参数,进行业务处理。
  • 第 1 个thenApply的返回值,作为第 2 个thenApply的参数,进行业务处理。
  • 最后,通过future.get()方法获取最终的返回值。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override    public Integer get() {        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();    }}).thenApply((result) -> {    return thenApplyTest2(result);// supplyAsync返回值 * 2}).thenApply((result) -> {    return thenApplyTest5(result);// thenApply返回值 * 5});logger.info("readXlsCacheAsyncMybatis插入数据 * 2 * 5 = " + completableFuture.get());

7Oa28资讯网——每日最新资讯28at.com

七、CompletableFuture合并任务

  • thenCombine,多个异步任务并行处理,有返回值,最后合并结果返回新的CompletableFuture对象。
  • thenAcceptBoth,多个异步任务并行处理,无返回值。
  • acceptEither,多个异步任务并行处理,无返回值。
  • applyToEither,,多个异步任务并行处理,有返回值。

CompletableFuture合并任务的代码实例,这里就不多赘述了,一些语法糖而已,大家切记陷入低水平勤奋的怪圈。7Oa28资讯网——每日最新资讯28at.com

八、CompletableFuture VS Future总结

本文中以下几个方面对比了CompletableFuture和Future的差异:7Oa28资讯网——每日最新资讯28at.com

  • ForkJoinPool和ThreadPoolExecutor的实现原理,探索了CompletableFuture和Future的差异。
  • 通过代码实例的形式简单介绍了CompletableFuture中花俏的语法糖。
  • 通过CompletableFuture优化了 “通过Future获取异步返回值”。
  • 通过CompletableFuture.allOf解决阻塞主线程问题。

Future提供了异步执行的能力,但Future.get()会通过轮询的方式获取异步返回值,get()方法还会阻塞主线程。7Oa28资讯网——每日最新资讯28at.com

轮询的方式非常消耗CPU资源,阻塞的方式显然与我们的异步初衷背道而驰。7Oa28资讯网——每日最新资讯28at.com

JDK8提供的CompletableFuture实现了Future接口,添加了很多Future不具备的功能,比如链式编程、异常处理回调函数、获取异步结果不阻塞不轮询、合并异步任务等。7Oa28资讯网——每日最新资讯28at.com

获取异步线程结果后,我们可以通过添加事务的方式,实现Excel入库操作的数据一致性。7Oa28资讯网——每日最新资讯28at.com

异步多线程情况下如何实现事务?7Oa28资讯网——每日最新资讯28at.com

有的小伙伴可能会说:7Oa28资讯网——每日最新资讯28at.com

这还不简单?添加@Transactional注解,如果发生异常或入库数据量不符,直接回滚就可以了~7Oa28资讯网——每日最新资讯28at.com

那么,真的是这样吗?我们下期见~7Oa28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-67849-0.html获取双异步返回值时,如何保证主线程不阻塞?

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: Node问题:如何正确安装nvm?Mac和Win双教程!

下一篇: lowcode-cms开源社区源码设计分享

标签:
  • 热门焦点
  • 红魔电竞平板评测:大屏幕硬实力

    前言:三年的疫情因为要上网课的原因激活了平板市场,如今网课的时代已经过去,大家的生活都恢复到了正轨,这也就意味着,真正考验平板电脑生存的环境来了。也就是面对着这种残酷的
  • 5月iOS设备好评榜:iPhone 14仅排第43?

    来到新的一月,安兔兔的各个榜单又重新汇总了数据,像安卓阵营的榜单都有着比较大的变动,不过iOS由于设备的更新换代并没有那么快,所以相对来说变化并不大,特别是iOS好评榜,老款设
  • 三言两语说透设计模式的艺术-简单工厂模式

    一、写在前面工厂模式是最常见的一种创建型设计模式,通常说的工厂模式指的是工厂方法模式,是使用频率最高的工厂模式。简单工厂模式又称为静态工厂方法模式,不属于GoF 23种设计
  • 服务存储设计模式:Cache-Aside模式

    Cache-Aside模式一种常用的缓存方式,通常是把数据从主存储加载到KV缓存中,加速后续的访问。在存在重复度的场景,Cache-Aside可以提升服务性能,降低底层存储的压力,缺点是缓存和底
  • 一文搞定Java NIO,以及各种奇葩流

    大家好,我是哪吒。很多朋友问我,如何才能学好IO流,对各种流的概念,云里雾里的,不求甚解。用到的时候,现百度,功能虽然实现了,但是为什么用这个?不知道。更别说效率问题了~下次再遇到,
  • JVM优化:实战OutOfMemoryError异常

    一、Java堆溢出堆内存中主要存放对象、数组等,只要不断地创建这些对象,并且保证 GC Roots 到对象之间有可达路径来避免垃 圾收集回收机制清除这些对象,当这些对象所占空间超过
  • 四年持续更迭坚持探索行业无人之境,HarmonyOS 4带来五大升级多项创新

    除了华为每年新发布的旗舰手机系列,上亿花粉更加期待鸿蒙系统每次的跨版本大更新。8月4日,HarmonyOS 4于HDC 2023正式发布,这也是该系统历经四年的再
  • AMD的AI芯片转单给三星可能性不大 与台积电已合作至2nm制程

    据 DIGITIMES 消息,英伟达 AI GPU 出货逐季飙升,接下来 AMD MI 300 系列将在第 4 季底量产。而半导体业内人士表示,近日传出 AMD 的 AI 芯片将转单给
  • AI艺术欣赏体验会在上海梅赛德斯奔驰中心音乐俱乐部上演

    光影交错的镜像世界,虚实幻化的视觉奇观,虚拟偶像与真人共同主持,这些场景都出现在2019世界人工智能大会的舞台上。8月29日至31日,“AI艺术欣赏体验会”在上海
Top