千家信息网

如何进行java并发FutureTask的实现

发表于:2025-11-07 作者:千家信息网编辑
千家信息网最后更新 2025年11月07日,如何进行java并发FutureTask的实现,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。概述在使用java多线程解决问题的时候,
千家信息网最后更新 2025年11月07日如何进行java并发FutureTask的实现

如何进行java并发FutureTask的实现,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

概述

在使用java多线程解决问题的时候,为了提高效率,我们常常会异步处理一些计算任务并在最后异步的获取计算结果,这个过程的实现离不开Future接口及其实现类FutureTask。FutureTask类实现了Runnable, Future接口,接下来我会通过源码对该类的实现进行详解。

使用

我们先看下FutureTask中的主要方法如下,可以看出FutureTask实现了任务及异步结果的集合功能。看到这块的方法,大家肯定会有疑问,Runnable任务的run方法返回空,FutureTask如何依靠该方法获取线程异步执行结果,这个问题,我们在下面给大家介绍。

//以下五个方法实现接口Future中方法public boolean isCancelled(); public boolean isDone(); public boolean cancel();public V get() throws InterruptedException, ExecutionException;public V get(long timeout, TimeUnit unit);//实现接口Runnable中方法public void run();

我们在使用中会构造一个FutureTask对象,然后将FutureTask扔到另一个线程中执行,而主线程继续执行其他业务逻辑,一段时间后主线程调用FutureTask的get方法获取执行结果。下面我们看一个简单的例子:

/*** Created by yuanqiongqiong on 2019/4/9.*/public class FutureTaskTest {private static ExecutorService executorService = Executors.newFixedThreadPool(1);public static void main(String []args) {Callable callable = new AccCallable(1, 2);FutureTask futureTask = new FutureTask(callable);executorService.execute(futureTask);System.out.println("go to do other things in main thread");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("go back in main thread");try {int result = (int) futureTask.get();System.out.println("result is " + result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}static class AccCallable implements Callable {private int a;private int b;public AccCallable(int a, int b) {this.a = a;this.b = b;}@Overridepublic Integer call() throws Exception {System.out.println("acc a and b in threadId = " + Thread.currentThread().getName());return a + b;}}}

输出结果为:

go to do other things in main threadacc a and b in threadId = pool-1-thread-1go back in main threadresult is 3

实现分析

在分析实现前,我们先想下如果让我们实现一个类似FutureTask的功能,我们会如何做?因为需要获取执行结果,需要一个Object对象来存执行结果。任务执行时间不可控性,我们需要一个变量表示执行状态。其他线程会调用get方法获取结果,在没达到超时的时候需要将线程阻塞或挂起。

因此需要一个队列类似的结构存储等待该结果的线程信息,这样在任务执行线程完成后就可以唤醒这些阻塞或挂起的线程,得到结果。FutureTask的实际实现也是类似的逻辑,具体如下。

首先看下FutureTask的主要成员变量如下:

//futureTask执行状态private volatile int state;//具体的执行任务,会在run方法中抵用callable.call()private Callable callable;//执行结果private Object outcome; //获取结果的等待线程节点private volatile WaitNode waiters;

对于执行状态,在源码中已经有了非常清晰的解释,这里我只是贴出源码,不在进行说明,具体如下:

/*** Possible state transitions:* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;

然后我们看下FutureTask的构造函数,如下:

public FutureTask(Callable callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable}public FutureTask(Runnable runnable, V result) {//构造函数传入runnable对象时调用静态工具类Executors的方法转换为一个callable对象this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable}

如前所述,FutureTask的执行线程中会调用其run()方法执行任务,我们看下这块逻辑:

public void run() {//1.如果执行状态不是NEW或者有其他线程执行该任务,直接返回if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable c = callable;//2.如果执行状态是NEW,即任务还没执行,直接调用callable.call()方法获取执行结果if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;//3.发生异常,更新status为EXCEPTIONAL,唤醒挂起线程setException(ex);}//4.如果结果成功返回,调用set方法将设置outcome,更改status执行状态,唤醒挂起线程if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

我们看下set函数的实现,具体看下4中的执行:

protected void set(V v) {//将执行状态变更为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//设置执行结果outcome = v;//设置执行状态为NORMALUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state//执行完成后处理操作,具体就是遍历阻塞链表,删除链表节点,并唤醒每个节点关联的线程finishCompletion();}}

以上就是任务执行线程做的逻辑,以上逻辑也回答了FutureTask如何得到执行结果的疑问。下面我们看下用户调用get方法获取执行结果时的实现逻辑,这个时候FutureTask可能处理各种状态,即可能没有执行,执行中,已完成,发生异常等,具体如下:

public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;//执行状态是NEW或者COMPLETING时执行awaitDone将线程加入等待队列中并挂起线程if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();//根据执行状态status进行结果封装return report(s);}//我理解这块是get的核心逻辑private int awaitDone(boolean timed, long nanos)throws InterruptedException {//如果设置了超时时间,计算还有多长时间超时final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {//如果当前线程被中断,删除等待队列中的节点,并抛出异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;//如果执行状态已经完成或者发生异常,直接跳出自旋返回if (s > COMPLETING) {if (q != null)q.thread = null;return s;}//如果执行状态是正在执行,说明线程已经被加入到等待队列中,放弃cpu进入下次循环(真正的自旋)else if (s == COMPLETING) // cannot time out yetThread.yield();//第一次进入循环,创建节点else if (q == null)q = new WaitNode();//将节点加入到等待队列中,waiters相当于头阶段,不断将头结点更新为新节点else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {//如果设置了超时时间,在进行下次循环前查看是否已经超时,如果超时删除该节点进行返回nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}//挂起当前节点LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}

这里需要说明一点,FutureTask中的阻塞队列新加入的节点都在头结点并且next指向之前的头结点,waitars指针总是指向新加入节点,通过waitars可以遍历整个等待队列,具体截图如下。此外等待队列节点结构很简单成员变量只有线程引用和next指针,这里再列出器接口。

futureTask等待队列

读到这里,相信大家已经对FutureTask的实现细节有了一定的认识。此外,FutureTask没有使用锁而是使用Unsafe的是CAS的原子操作来解决竞争问题,减少了锁带来的上下文切换的开销,提高了效率。

关于如何进行java并发FutureTask的实现问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。

线程 结果 方法 状态 节点 任务 队列 逻辑 问题 接口 对象 阻塞 函数 变量 时候 时间 源码 结点 分析 处理 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 东北财经大学网络安全 软件开发过程中的几个阶段 app数据库服务器运维 8g服务器内存容量怎么看 网络安全ppt课件幼儿园 福州兼职的app软件开发 佳明飞耐时5s无法连接服务器 江苏ios软件开发流程 丽水电脑软件开发需要学什么 Linux服务器查看显卡日志 数据库和服务器的连接方式 武汉在线培训软件开发 海康服务器未授权到期 hp服务器日志收集 网络安全督查工作 服务器linux英特尔 信息系统开发与软件开发 数据库记录当前时间 郑州网络技术哪家好 苏州江苏大容量服务器供货厂 锐捷网络技术支持工程师工资 网络服务器配置与管理期末考试 海南本地软件开发成本价 linux服务器宕机只有重启吗 数据库专家宋晓霞 杭州美创科技网络安全工程师 达州健康行业直销软件开发 珠海溢动网络技术有限公司 数据库主键外键作用 黑客可以入侵到游戏的服务器吗
0