千家信息网

java8异步调用该怎么使用

发表于:2025-11-11 作者:千家信息网编辑
千家信息网最后更新 2025年11月11日,这篇文章跟大家分析一下"java8异步调用该怎么使用"。内容详细易懂,对"java8异步调用该怎么使用"感兴趣的朋友可以跟着小编的思路慢慢深入来阅读一下,希望阅读后能够对大家有所帮助。下面跟着小编一起
千家信息网最后更新 2025年11月11日java8异步调用该怎么使用

这篇文章跟大家分析一下"java8异步调用该怎么使用"。内容详细易懂,对"java8异步调用该怎么使用"感兴趣的朋友可以跟着小编的思路慢慢深入来阅读一下,希望阅读后能够对大家有所帮助。下面跟着小编一起深入学习"java8异步调用该怎么使用"的知识吧。

    一、异步调用方式分析

    今天在写代码的时候,想要调用异步的操作,这里我是用的java8的流式异步调用,但是使用过程中呢,发现这个异步方式有两个方法,如下所示:

    区别是一个 需要指定线程池,一个不需要。

    • 那么指定线程池有哪些好处呢?直观的说有以下两点好处:

      • 可以根据我们的服务器性能,通过池的管理更好的规划我们的线程数。

      • 可以对我们使用的线程自定义名称,这里也是阿里java开发规范所提到的。

    1.1 java8异步调用默认线程池方式

    当然常规使用默认的也没什么问题。我们通过源码分析下使用默认线程池的过程。

       public static CompletableFuture runAsync(Runnable runnable) {        return asyncRunStage(asyncPool, runnable);    }

    看下这个asyncPool是什么?

    如下所示,useCommonPool如果为真,就使用ForkJoinPool.commonPool(),否则创建一个new ThreadPerTaskExecutor()

        private static final Executor asyncPool = useCommonPool ?        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    看看useCommonPool 是什么?

        private static final boolean useCommonPool =        (ForkJoinPool.getCommonPoolParallelism() > 1);
      /**    * 公共池的目标并行度级别    */    public static int getCommonPoolParallelism() {        return commonParallelism;    }

    最终这个并行级别并没有给出默认值

    static final int commonParallelism;

    通过找到这个常量的调用,我们看看是如何进行初始化的,在ForkJoinPool中有一个静态代码块,启动时会对commonParallelism进行初始化,我们只关注最后一句话就好了,:

        // Unsafe mechanics    private static final sun.misc.Unsafe U;    private static final int  ABASE;    private static final int  ASHIFT;    private static final long CTL;    private static final long RUNSTATE;    private static final long STEALCOUNTER;    private static final long PARKBLOCKER;    private static final long QTOP;    private static final long QLOCK;    private static final long QSCANSTATE;    private static final long QPARKER;    private static final long QCURRENTSTEAL;    private static final long QCURRENTJOIN;    static {        // initialize field offsets for CAS etc        try {            U = sun.misc.Unsafe.getUnsafe();            Class k = ForkJoinPool.class;            CTL = U.objectFieldOffset                (k.getDeclaredField("ctl"));            RUNSTATE = U.objectFieldOffset                (k.getDeclaredField("runState"));            STEALCOUNTER = U.objectFieldOffset                (k.getDeclaredField("stealCounter"));            Class tk = Thread.class;            PARKBLOCKER = U.objectFieldOffset                (tk.getDeclaredField("parkBlocker"));            Class wk = WorkQueue.class;            QTOP = U.objectFieldOffset                (wk.getDeclaredField("top"));            QLOCK = U.objectFieldOffset                (wk.getDeclaredField("qlock"));            QSCANSTATE = U.objectFieldOffset                (wk.getDeclaredField("scanState"));            QPARKER = U.objectFieldOffset                (wk.getDeclaredField("parker"));            QCURRENTSTEAL = U.objectFieldOffset                (wk.getDeclaredField("currentSteal"));            QCURRENTJOIN = U.objectFieldOffset                (wk.getDeclaredField("currentJoin"));            Class ak = ForkJoinTask[].class;            ABASE = U.arrayBaseOffset(ak);            int scale = U.arrayIndexScale(ak);            if ((scale & (scale - 1)) != 0)                throw new Error("data type scale not a power of two");            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);        } catch (Exception e) {            throw new Error(e);        }        commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;        defaultForkJoinWorkerThreadFactory =            new DefaultForkJoinWorkerThreadFactory();        modifyThreadPermission = new RuntimePermission("modifyThread");        common = java.security.AccessController.doPrivileged            (new java.security.PrivilegedAction() {                public ForkJoinPool run() { return makeCommonPool(); }});         // 即使线程被禁用也是1,至少是个1        int par = common.config & SMASK;        commonParallelism = par > 0 ? par : 1;    }

    如下所示,默认是7:

    所以接着下面的代码看:

        private static final boolean useCommonPool =        (ForkJoinPool.getCommonPoolParallelism() > 1);

    这里一定是返回true,证明当前是并行的。

        private static final Executor asyncPool = useCommonPool ?        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    上面会返回一个大小是七的的默认线程池

    其实这个默认值是当前cpu的核心数,我的电脑是八核,在代码中默认会将核心数减一,所以显示是七个线程。

            if (parallelism < 0 && //默认是1,小于核心数            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)            parallelism = 1;        if (parallelism > MAX_CAP)            parallelism = MAX_CAP;

    下面我们写个main方法测试一下,10个线程,每个阻塞10秒,看结果:

        public static void main(String[] args) {        // 创建10个任务,每个任务阻塞10秒        for (int i = 0; i < 10; i++) {            CompletableFuture.runAsync(() -> {                try {                    Thread.sleep(10000);                    System.out.println(new Date() + ":" + Thread.currentThread().getName());                } catch (InterruptedException e) {                    e.printStackTrace();                }            });        }        try {            Thread.sleep(30000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }

    结果如下所示,前面七个任务先完成,另外三个任务被阻塞10秒后,才完成:

    Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-5Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-4Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-2Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-7Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-3Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-6Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-1-----------------------------------------------------------  Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-2Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-5Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-4

    结论:当我们使用默认的线程池进行异步调用时,如果异步任务是一个IO密集型,简单说处理时间占用长,将导致其他使用共享线程池的任务阻塞,造成系统性能下降甚至异常。甚至当一部分调用接口时,如果接口超时,那么也会阻塞与超时时长相同的时间;实际在计算密集的场景下使用是能提高性能的。

    二、使用自定义的线程池

    上面说到如果是IO密集型的场景,在异步调用时还是使用自定义线程池比较好。

    • 针对开篇提到的两个显而易见的好处,此处新增一条:

      • 可以根据我们的服务器性能,通过池的管理更好的规划我们的线程数。

      • 可以对我们使用的线程自定义名称,这里也是阿里java开发规范所提到的。

      • 不会因为阻塞导致使用共享线程池的其他线程阻塞甚至异常。

    我们自定义下面的线程池:

    /** * @description: 全局通用线程池 * @author:weirx * @date:2021/9/9 18:09 * @version:3.0 */@Slf4jpublic class GlobalThreadPool {    /**     * 核心线程数     */    public final static int CORE_POOL_SIZE = 10;    /**     * 最大线程数     */    public final static int MAX_NUM_POOL_SIZE = 20;    /**     * 任务队列大小     */    public final static int BLOCKING_QUEUE_SIZE = 30;    /**     * 线程池实例     */    private final static ThreadPoolExecutor instance = getInstance();    /**     * description: 初始化线程池     *     * @return: java.util.concurrent.ThreadPoolExecutor     * @author: weirx     * @time: 2021/9/10 9:49     */    private synchronized static ThreadPoolExecutor getInstance() {        // 生成线程池        ThreadPoolExecutor executor = new ThreadPoolExecutor(                CORE_POOL_SIZE,                MAX_NUM_POOL_SIZE,                60,                TimeUnit.SECONDS,                new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE),                new NamedThreadFactory("Thread-wjbgn-", false));        return executor;    }    private GlobalThreadPool() {    }    public static ThreadPoolExecutor getExecutor() {        return instance;    }}

    调用:

        public static void main(String[] args) {        // 创建10个任务,每个任务阻塞10秒        for (int i = 0; i < 10; i++) {            CompletableFuture.runAsync(() -> {                try {                    Thread.sleep(10000);                    System.out.println(new Date() + ":" + Thread.currentThread().getName());                } catch (InterruptedException e) {                    e.printStackTrace();                }            },GlobalThreadPool.getExecutor());        }        try {            Thread.sleep(30000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }

    输出我们指定线程名称的线程:

    Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-1Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-10Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-2Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-9Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-5Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-6Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-3Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-7Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-8Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-4

    三、题外话,动态线程池

    3.1 什么是动态线程池?

    在我们使用线程池的时候,是否有的时候很纠结,到底设置多大的线程池参数是最合适的呢?如果不够用了怎么办,要改代码重新部署吗?

    其实是不需要的,记得当初看过美团的一篇文章,真的让人茅塞顿开啊,动态线程池。

    ThreadPoolExecutor这个类其实是提供对于线程池的属性进行修改的,支持我们动态修改以下的属性:

    从上至下分别是:

    • 线程工厂(用于指定线程名称)

    • 核心线程数

    • 最大线程数

    • 活跃时间

    • 拒绝策略。

    在美团的文章当中呢,是监控服务器线程的使用率,当达到阈值就进行告警,然后通过配置中心去动态修改这些数值。

    我们也可以这么做,使用@RefreshScope加nacos就可以实现了。

    3.2 实践

    我写了一个定时任务,监控当前服务的线程使用率,小了就扩容,一段时间后占用率下降,就恢复初始值。

    其实还有很多地方需要改进的,请大家多提意见,监控的是文章前面的线程池GlobalThreadPool,下面调度任务的代码:

    /** * @description: 全局线程池守护进程 * @author:weirx * @date:2021/9/10 16:32 * @version:3.0 */@Slf4j@Componentpublic class DaemonThreadTask {    /**     * 服务支持最大线程数     */    public final static int SERVER_MAX_SIZE = 50;    /**     * 最大阈值Maximum threshold,百分比     */    private final static int MAXIMUM_THRESHOLD = 8;    /**     * 每次递增最大线程数     */    private final static int INCREMENTAL_MAX_NUM = 10;    /**     * 每次递增核心线程数     */    private final static int INCREMENTAL_CORE_NUM = 5;    /**     * 当前线程数     */    private static int currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE;    /**     * 当前核心线程数     */    private static int currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE;    @Scheduled(cron = "0 */5 * * * ?")    public static void execute() {        threadMonitor();    }    /**     * description: 动态监控并设置线程参数     *     * @return: void     * @author: weirx     * @time: 2021/9/10 13:20     */    private static void threadMonitor() {        ThreadPoolExecutor instance = GlobalThreadPool.getExecutor();        int activeCount = instance.getActiveCount();        int size = instance.getQueue().size();        log.info("GlobalThreadPool: the active thread count is {}", activeCount);        // 线程数不足,增加线程        if (activeCount > GlobalThreadPool.MAX_NUM_POOL_SIZE % MAXIMUM_THRESHOLD                && size >= GlobalThreadPool.BLOCKING_QUEUE_SIZE) {            currentSize = currentSize + INCREMENTAL_MAX_NUM;            currentCoreSize = currentCoreSize + INCREMENTAL_CORE_NUM;            //当前设置最大线程数小于服务最大支持线程数才可以继续增加线程            if (currentSize <= SERVER_MAX_SIZE) {                instance.setMaximumPoolSize(currentSize);                instance.setCorePoolSize(currentCoreSize);                log.info("this max thread size is {}", currentSize);            } else {                log.info("current size is more than server max size, can not add");            }        }        // 线程数足够,降低线程数,当前活跃数小于默认核心线程数        if (activeCount < GlobalThreadPool.MAX_NUM_POOL_SIZE                && size == 0                && currentSize > GlobalThreadPool.MAX_NUM_POOL_SIZE) {            currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE;            currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE;            instance.setMaximumPoolSize(currentSize);            instance.setCorePoolSize(currentCoreSize);        }    }}

    3.3 动态线程池有什么意义?

    有的朋友其实问过我,我直接把线程池设置大一点不就好了,这种动态线程池有什么意义呢?

    其实这是一个好问题。在以前的传统软件当中,单机部署,硬件部署,确实,我们能使用的线程数取决于服务器的核心线程数,而且基本没有其他服务来争抢这些线程。

    但是现在是容器的时代,云原生的时代。

    多个容器部署在一个宿主机上,那么当高峰期的时候,某个容器就需要占用大量的cpu资源,如果所有的容器都将大部分资源占据,那么这个容器必然面临阻塞甚至瘫痪的风险。

    当高峰期过了,释放这部分资源可以被释放掉,用于其他需要的容器。。

    再结合到目前的云服务器节点扩容,都是需要动态扩缩容的的,和线程相似,在满足高可用的情况下,尽量的节约成本。

    关于java8异步调用该怎么使用就分享到这里啦,希望上述内容能够让大家有所提升。如果想要学习更多知识,请大家多多留意小编的更新。谢谢大家关注一下网站!

    线程 任务 动态 核心 服务 阻塞 最大 代码 容器 服务器 名称 性能 时候 时间 监控 好处 方式 资源 分析 支持 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 多层数据库应用 注册服务 陕西宝鸡网络安全部门 服务器更新后无法登录 邮箱解析带出服务器地址 黄浦区咨询软件开发销售方法 我的世界手机版空岛服务器 喵神网络安全 完美世界的网络安全的工资 服务器维修保养服务商 森土网络技术 php更新数据库返回结果 以色列网络安全研究中心 广东省网络安全等级保护专家 网络安全法 打印 战地一首页一进去就服务器中断 成都发么么互联网科技有限公司 数据库连接包括哪两种类型 小学学校网络安全宣传资料 网络安全跟it有什么区别 生物信息学中的核酸二级数据库 软件开发中产生的文档 ftp 服务器 端口 有趣网络安全宣传标语 先创建数据库在搭建表 网络安全符合国家的什么要求 江阴进口网络技术厂家现货 数据库应用与管理第二版习题答案 软件开发工具能下载吗 数据库与web技术 小米像苹果一样连接服务器
    0