java获取双异步返回值时,如何保证主线程不阻塞?

一、前情提要

在上一篇文章中,使用双异步后,如何保证数据一致性?,通过Future获取异步返回值,轮询判断Future状态,如果执行完毕或已取消,则通过get()获取返回值,get()是阻塞的方法,因此会阻塞当前线程,如果通过new Runnable()执行get()方法,那么还是需要返回AsyncResult,然后再通过主线程去get()获取异步线程返回结果。

写法很繁琐,还会阻塞主线程。

下面是FutureTask异步执行流程图:

在这里插入图片描述

二、JDK8的CompletableFuture

1、ForkJoinPool

Java8中引入了CompletableFuture,它实现了对Future的全面升级,可以通过回调的方式,获取异步线程返回值。

CompletableFuture的异步执行通过ForkJoinPool实现, 它使用守护线程去执行任务。

ForkJoinPool在于可以充分利用多核CPU的优势,把一个任务拆分成多个小任务,把多个小任务放到多个CPU上并行执行,当多个小任务执行完毕后,再将其执行结果合并起来。

Future的异步执行是通过ThreadPoolExecutor实现的。

在这里插入图片描述

2、从ForkJoinPool和ThreadPoolExecutor探索CompletableFuture和Future的区别

  1. ForkJoinPool中的每个线程都会有一个队列,而ThreadPoolExecutor只有一个队列,并根据queue类型不同,细分出各种线程池;
  2. ForkJoinPool在使用过程中,会创建大量的子任务,会进行大量的gc,但是ThreadPoolExecutor不需要,因为ThreadPoolExecutor是任务分配平均的;
  3. ThreadPoolExecutor中每个异步线程之间是相互独立的,当执行速度快的线程执行完毕后,它就会一直处于空闲的状态,等待其它线程执行完毕;
  4. ForkJoinPool中每个异步线程之间并不是绝对独立的,在ForkJoinPool线程池中会维护一个队列来存放需要执行的任务,当线程自身任务执行完毕后,它会从其它线程中获取未执行的任务并帮助它执行,直至所有线程执行完毕。

因此,在多线程任务分配不均时,ForkJoinPool的执行效率更高。但是,如果任务分配均匀,ThreadPoolExecutor的执行效率更高,因为ForkJoinPool会创建大量子任务,并对其进行大量的GC,比较耗时。

三、通过CompletableFuture优化 “通过Future获取异步返回值”

1、通过Future获取异步返回值关键代码

(1)将异步方法的返回值改为Future<Integer>,将返回值放到new AsyncResult<>();中;
@Async("async-executor")
public void readXls(String filePath, String filename) {
    try {
    	// 此代码为简化关键性代码
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
            futureList.add(sumFuture);
        }
    }catch (Exception e){
        logger.error("readXlsCacheAsync---插入数据异常:",e);
    }
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
    try {
        // 此代码为简化关键性代码
        return new AsyncResult<>(sum);
    }catch (Exception e){
        return new AsyncResult<>(0);
    }
}
(2)通过Future<Integer>.get()获取返回值:
public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) {
    int[] futureSumArr = new int[futureList.size()];
    for (int i = 0;i<futureList.size();i++) {
        try {
            Future<Integer> future = futureList.get(i);
            while (true) {
                if (future.isDone() && !future.isCancelled()) {
                    Integer futureSum = future.get();
                    logger.info("获取Future返回值成功"+"----Future:" + future
                            + ",Result:" + futureSum);
                    futureSumArr[i] += futureSum;
                    break;
                } else {
                    logger.info("Future正在执行---获取Future返回值中---等待3秒");
                    Thread.sleep(3000);
                }
            }
        } catch (Exception e) {
            logger.error("获取Future返回值异常: ", e);
        }
    }
    
    boolean insertFlag = getInsertSum(futureSumArr, excelRow);
    logger.info("获取所有异步线程Future的返回值成功,Excel插入结果="+insertFlag);
    return insertFlag;
}

2、通过CompletableFuture获取异步返回值关键代码

(1)将异步方法的返回值改为 int
@Async("async-executor")
public void readXls(String filePath, String filename) {
	List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();
   	for (int time = 0; time < times; time++) {
    	// 此代码为简化关键性代码
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
	        @Override
	        public Integer get() {
	            return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
	        }
	    }).thenApply((result) -> {// 回调方法
	        return thenApplyTest2(result);// supplyAsync返回值 * 1
	    }).thenApply((result) -> {
	        return thenApplyTest5(result);// thenApply返回值 * 1
	    }).exceptionally((e) -> { // 如果执行异常:
	        logger.error("CompletableFuture.supplyAsync----异常:", e);
	        return null;
	    });
	
	    completableFutureList.add(completableFuture);
    }
}
@Async("async-executor")
public int readXlsCacheAsync() {
    try {
        // 此代码为简化关键性代码
        return sum;
    }catch (Exception e){
        return -1;
    }
}
(2)通过completableFuture.get()获取返回值
public static boolean getCompletableFutureResult(List<CompletableFuture<Integer>> list, int excelRow){
    logger.info("通过completableFuture.get()获取每个异步线程的插入结果----开始");

    int sum = 0;
    for (int i = 0; i < list.size(); i++) {
        Integer result = list.get(i).get();
        sum += result;
    }

    boolean insertFlag = excelRow == sum;
    logger.info("全部执行完毕,excelRow={},入库={}, 数据是否一致={}",excelRow,sum,insertFlag);
    return insertFlag;
}

3、效率对比

(1)测试环境
  1. 12个逻辑处理器的电脑;
  2. Excel中包含10万条数据;
  3. Future的自定义线程池,核心线程数为24;
  4. ForkJoinPool的核心线程数为24;
(2)统计四种情况下10万数据入库时间
  1. 不获取异步返回值
  2. 通过Future获取异步返回值
  3. 通过CompletableFuture获取异步返回值,默认ForkJoinPool线程池的核心线程数为本机逻辑处理器数量,测试电脑为12;
  4. 通过CompletableFuture获取异步返回值,修改ForkJoinPool线程池的核心线程数为24。

备注:因为CompletableFuture不阻塞主线程,主线程执行时间只有2秒,表格中统计的是异步线程全部执行完成的时间。

(3)设置核心线程数

将核心线程数CorePoolSize设置成CPU的处理器数量,是不是效率最高的?

// 获取CPU的处理器数量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;// 测试电脑是24

因为在接口被调用后,开启异步线程,执行入库任务,因为测试机最多同时开启24线程处理任务,故将10万条数据拆分成等量的24份,也就是10万/24 = 4166,那么我设置成4200,是不是效率最佳呢?

测试的过程中发现,好像真的是这样的。

自定义ForkJoinPool线程池
@Autowired
@Qualifier("asyncTaskExecutor")
private Executor asyncTaskExecutor;

@Override
public void readXls(String filePath, String filename) {
 	List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();
    for (int time = 0; time < times; time++) {
		CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
	        @Override
	        public Integer get() {
	            try {
	                return readExcelDbJdk8Service.readXlsCacheAsync(sheet, row, start, finalEnd, insertBuilder);
	            } catch (Exception e) {
	                logger.error("CompletableFuture----readXlsCacheAsync---异常:", e);
	                return -1;
	            }
	        };
	    },asyncTaskExecutor);
	
	    completableFutureList.add(completableFuture);
	}

	// 不会阻塞主线程
    CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {
        try {
            int insertSum = getCompletableFutureResult(completableFutureList, excelRow);
        } catch (Exception ex) {
            return;
        }
    });
}
自定义线程池
/**
 * 自定义异步线程池
 */
@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    //设置线程名称
    executor.setThreadNamePrefix("asyncTask-Executor");
    //设置最大线程数
    executor.setMaxPoolSize(200);
    //设置核心线程数
    executor.setCorePoolSize(24);
    //设置线程空闲时间,默认60
    executor.setKeepAliveSeconds(200);
    //设置队列容量
    executor.setQueueCapacity(50);
    /**
     * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
     * 通常有以下四种策略:
     * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
     * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
     * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
     * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
     */
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}

在这里插入图片描述

(4)统计分析

效率对比:

③通过CompletableFuture获取异步返回值(12线程) < ②通过Future获取异步返回值 < ④通过CompletableFuture获取异步返回值(24线程) < ①不获取异步返回值

不获取异步返回值时性能最优,这不废话嘛~

核心线程数相同的情况下,CompletableFuture的入库效率要优于Future的入库效率,10万条数据大概要快4秒钟,这还是相当惊人的,优化的价值就在于此。

在这里插入图片描述

四、通过CompletableFuture.allOf解决阻塞主线程问题

1、语法

CompletableFuture.allOf(CompletableFuture的可变数组).whenComplete((r,e) -> {})

2、代码实例

getCompletableFutureResult方法在 “3.2.2 通过completableFuture.get()获取返回值”。

// 不会阻塞主线程
CompletableFuture.allOf(completableFutureList.toArray(new 		CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {
    logger.info("全部执行完毕,解决主线程阻塞问题~");
    try {
        int insertSum = getCompletableFutureResult(completableFutureList, excelRow);
    } catch (Exception ex) {
        logger.error("全部执行完毕,解决主线程阻塞问题,异常:", ex);
        return;
    }
});

// 会阻塞主线程
//getCompletableFutureResult(completableFutureList, excelRow);

logger.info("CompletableFuture----会阻塞主线程吗?");

在这里插入图片描述

五、CompletableFuture中花俏的语法糖

1、runAsync

runAsync 方法不支持返回值。

可以通过runAsync执行没有返回值的异步方法。

不会阻塞主线程。

// 分批异步读取Excel内容并入库
int finalEnd = end;
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis();

2、supplyAsync

supplyAsync也可以异步处理任务,传入的对象实现了Supplier接口。将Supplier作为参数并返回CompletableFuture结果值,这意味着它不接受任何输入参数,而是将result作为输出返回。

会阻塞主线程。

supplyAsync()方法关键代码:

int finalEnd = end;
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
});
@Override
public int readXlsCacheAsyncMybatis() {
    // 不为人知的操作
   	// 返回异步方法执行结果即可
	return 100;
}

六、顺序执行异步任务

1、thenRun

thenRun()不接受参数,也没有返回值,与runAsync()配套使用,恰到好处。

// JDK8的CompletableFuture
CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis())
.thenRun(() -> logger.info("CompletableFuture----.thenRun()方法测试"));

在这里插入图片描述

2、thenAccept

thenAccept()接受参数,没有返回值。

supplyAsync + thenAccept

  1. 异步线程顺序执行
  2. supplyAsync的异步返回值,可以作为thenAccept的参数使用
  3. 不会阻塞主线程
CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
}).thenAccept(x -> logger.info(".thenAccept()方法测试:" + x));

在这里插入图片描述

但是,此时无法通过completableFuture.get()获取supplyAsync的返回值了。

3、thenApply

thenApply在thenAccept的基础上,可以再次通过completableFuture.get()获取返回值。

supplyAsync + thenApply,典型的链式编程。

  1. 异步线程内方法顺序执行
  2. supplyAsync 的返回值,作为第 1 个thenApply的参数,进行业务处理
  3. 第 1 个thenApply的返回值,作为第 2 个thenApply的参数,进行业务处理
  4. 最后,通过future.get()方法获取最终的返回值
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
	@Override
    public Integer get() {
        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();
    }
}).thenApply((result) -> {
    return thenApplyTest2(result);// supplyAsync返回值 * 2
}).thenApply((result) -> {
    return thenApplyTest5(result);// thenApply返回值 * 5
});

logger.info("readXlsCacheAsyncMybatis插入数据 * 2 * 5 = " + completableFuture.get());

在这里插入图片描述

七、CompletableFuture合并任务

  1. thenCombine,多个异步任务并行处理,有返回值,最后合并结果返回新的CompletableFuture对象;
  2. thenAcceptBoth,多个异步任务并行处理,无返回值;
  3. acceptEither,多个异步任务并行处理,无返回值;
  4. applyToEither,,多个异步任务并行处理,有返回值;

CompletableFuture合并任务的代码实例,这里就不多赘述了,一些语法糖而已,大家切记陷入低水平勤奋的怪圈。

八、CompletableFuture VS Future总结

本文中以下几个方面对比了CompletableFuture和Future的差异:

  1. ForkJoinPool和ThreadPoolExecutor的实现原理,探索了CompletableFuture和Future的差异;
  2. 通过代码实例的形式简单介绍了CompletableFuture中花俏的语法糖;
  3. 通过CompletableFuture优化了 “通过Future获取异步返回值”;
  4. 通过CompletableFuture.allOf解决阻塞主线程问题。

Future提供了异步执行的能力,但Future.get()会通过轮询的方式获取异步返回值,get()方法还会阻塞主线程。

轮询的方式非常消耗CPU资源,阻塞的方式显然与我们的异步初衷背道而驰。

JDK8提供的CompletableFuture实现了Future接口,添加了很多Future不具备的功能,比如链式编程、异常处理回调函数、获取异步结果不阻塞不轮询、合并异步任务等。

获取异步线程结果后,我们可以通过添加事务的方式,实现Excel入库操作的数据一致性。


相关文章

  • SpringBoot底层原理

    springboot3.x中,META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports文件中,配置了需要创建Bean对象的全类名。当Springboot项目启动后,Springboot中的一些配置类,bean对象就会自动存入到IOC容器中,不需要我们手动去申明。,它负责管理Spring应用程序中所有的bean,同时提供了一些方法来获取Bean,注册Bean,是整个Spring应用的核心。

  • SQL实现模糊查询的四种方法总结

    SQL实现模糊查询的四种方法总结

  • Spring两大核心思想:IOC和AOP

    1.先自定义一个注解@Target(ElementType.METHOD) //作用范围 此处是方法@Retention(RetentionPolicy.RUNTIME) //生命周期 此处是运行时2.在切点表达式中加入该注解@Aspect@Component@Slf4j@Around(&quot;@annotation(com.example.demo.aspect.TestAnnotation)&quot;) //此处加自定义注解log.info(&quot;around继续&quot;);try {

  • 十分钟带你彻底清楚Sora模型原理

    OpenAI Sora文生视频(图像看作单帧视频)一放出就炸翻整个AI 圈,也是ChatGPT掀起GenAI热潮时隔一年后,OpenAI再次史诗级的更新。OpenAI 随后公布的技术综述[文献1],难掩其勃勃雄心:视频生成模型作为世界模拟器。笔者春节前原计划整理一下对Google Lumiere 文生视频的认知,多个因素遗憾推迟。对比看两者大的技术方向均选择了扩散模型,却也有许多关键细节不同。恰好可以借着 OpenAI 技术综述来提纲挈领,一起梳理一下,为什么笔者觉得这是又一史诗级的更新。

  • SpringCloud-搭建Nacos配置中心

    本文详细介绍了如何在Spring Cloud项目中使用Nacos实现配置管理。首先,通过简洁的步骤指导了Nacos的安装和配置。然后,通过在项目中引入Nacos的依赖和配置,实现了与Nacos配置中心的连接。在Nacos控制台上演示了如何新建配置,并通过Spring Cloud项目实现了动态读取配置的操作。这种灵活的配置管理方案为微服务架构提供了高度可维护性和实时性的优势,使得项目能够在运行时动态调整配置,而无需重启服务

  • C#使用重载方法实现不同类型数据的计算

    为了避免异常,可以先使用Decimal.Parse(string)方法将字符串转换为小数,然后再使用Convert.ToInt32(decimal)方法将小数转换为整数。如果一个类中存在两个以上的同名方法,并且方法的参数类型、个数或者顺序不同,当调用这样的方法时,编译器会根据传入的参数自动进行判断,决定调用哪个方法。例如,字符串是&quot;123.456&quot;,包含非数字字符&quot;.&quot;。重载方法就是方法名称相同,但是每个方法中参数的数据类型、个数或顺序不同的方法。如果字符串包含非数字字符,例如小数点,该方法将引发异常。

  • 小程序 自定义组件和生命周期

    类似vue或者react中的自定义组件⼩程序允许我们使⽤⾃定义组件的⽅式来构建⻚⾯。类似于页面,一个自定义组件由 json wxml wxss js 4个文件组成可以在微信开发者⼯具中快速创建组件的⽂件结构在⽂件夹内 components/myHeader ,创建组件 名为 myHeader⾸先要在⻚⾯的 json ⽂件中进⾏引⽤声明。还要提供对应的组件名和组件路径index.wxml// 引用声明// 要使用的组件的名称 // 组件的路径&lt; view &gt; _微信小程序 自定义组件生命周期

  • DockerUI如何部署结合内网穿透实现公网环境管理本地docker容器

    DockerUI是一个docker容器镜像的可视化图形化管理工具。DockerUI可以用来轻松构建、管理和维护docker环境。它是完全开源且免费的。基于容器安装方式,部署方便高效,浏览和维护docker单节点或集群节点worker和manager。DockerUI具有易于使用的界面。它不需要记住 docker 指令。只需下载镜像即可立即加入并完成部署。使用DockerUI并结合cpolar内网穿透可以更加轻松的管理docker和swarm,实现后台公网访问并管理,视觉性更加直观,后台开发更加便利。

  • Java把列表数据导出为PDF文件,同时加上PDF水印

    可以看到字体文件在jar目录下面是有的,但是发现classes后面多了个叹号。这是引入外部字体方式不对,后改用问题2参考文章的第三种写法就没问题了。网上都是说jar包的版本不对,导致的字体兼容性问题。换了jar包版本发现没效果,后来索性直接把字体下载到本地直接引入。字体文件资源自己百度,直接搜。

  • HarmonyOS 鸿蒙应用开发( 六、实现自定义弹窗CustomDialog)

    自定义弹窗(CustomDialog)可用于广告、中奖、警告、软件更新等与用户交互响应操作。开发者可以通过CustomDialogController类显示自定义弹窗。具体用法请参考自定义弹窗。

  • Vue3前端开发,如何获取组件内dom对象以及子组件的属性和方法

    传统的vue2里面,我们访问dom时的代码,还是的借助于this对象的【this.$refs.userName】。毕竟,父子组件之间,各司其职。下面展示的是,借助于ref来访问子组件的实例对象。(声明:默认情况下,子组件内部的属性和方法,不会主动对外暴漏的。Vue3前端开发,借助Ref来获取组件内dom对象,借助defineExpose编译宏可以获取到子组件的属性和方法。如图,确实是可以拿到子组件的属性和方法了,如果不使用编译宏,是访问不到的。这个是子组件里面的内容,我们定义了一个常量,一个方法。

  • 掌握Spring MVC拦截器整合技巧,实现灵活的请求处理与权限控制!

    在这个方法中可以通过返回值来决定是否要进行放行,我们可以把业务逻辑放在该方法中,如果满足业务则返回true放行,不满足则返回false拦截。方法,如果返回true,则代表放行,会执行原始Controller类中要请求的方法,如果返回false,则代表拦截,后面的就不会再执行了。(7)如果满足规则,则进行处理,找到其对应的controller类中的方法进行执行,完成后返回结果。配置多个后,执行顺序是什么?(5)在找到具体的方法之前,我们可以去配置过滤器(可以配置多个),按照顺序进行执行。

  • 如何把自有数据接入GPT大模型?

    ChatGPT引发了AI革命,大家都想探究如何让它发挥更大价值。以它为代表的大模型并未完全掌握所有专业知识,这也正是我们创业的契机。_docsgpt如何使用自己的数据

  • Java编程模型:VO,BO,PO,DO,DTO

    Java编程模型中的VO,BO,PO,DO,DTO提供了一种结构化和组织代码的方法。通过合理运用这些概念,可以使代码更具可读性、可维护性和可扩展性。在实际项目中,根据需求和架构设计,合理选择和运用这些概念将有助于构建清晰、高效的Java应用程序。