侧边栏壁纸
博主头像
翻斗

开始一件事最好是昨天,其次是现在

  • 累计撰写 44 篇文章
  • 累计创建 42 个标签
  • 累计收到 3 条评论

Future介绍

翻斗
2020-03-05 / 0 评论 / 0 点赞 / 841 阅读 / 6,023 字

Baeldung 翻译系列之Java并发基础:

Java中的concurrent包
Java中的Synchronized关键字
Future介绍
ThreadLocal介绍
Java线程的生命周期
如何杀掉一个Java线程
Java中的线程池介绍
实现Runnable接口还是继承Thread类
Java中的wait和notify方法
Runnable vs Callable
wait和sleep的区别
Thread.join方法介绍
Java中使用锁对象
ThreadPoolTaskExecutor中的corePoolSize和maxPoolSize
Java中的异步编程

1. 概览

在这个教程中,我们将学习关于Future的知识。这是一个自Java 1.5以来就存在的接口,当我们处理异步调用和并发处理时,它可能非常有用。

2. 创建Future

简单来说,Future类代表了异步计算的未来结果。这个结果在处理完成后最终会出现在Future中。

长时间运行的方法是异步处理和Future接口的好候选者,因为我们可以在等待封装在Future中的任务完成的同时执行其他进程。

一些利用Future异步性质的操作示例包括:

  • 计算密集型操作(数学和科学计算)
  • 操作大型数据结构(大数据)
  • 远程方法调用(下载文件,HTML抓取,Web服务)

2.1. 用FutureTask实现Future

对于我们的示例,我们将创建一个非常简单的类,用于计算一个整数的平方。这绝对不符合长时间运行的方法类别,但我们将调用Thread.sleep()使其在完成前持续1秒:

public class SquareCalculator {
    private ExecutorService executor = Executors.newSingleThreadExecutor();

    public Future<Integer> calculate(Integer input) {
        return executor.submit(() -> {
            Thread.sleep(1000);
            return input * input;
        });
    }
}

在这个示例中,我们创建了一个名为SquareCalculator的类,它有一个计算方法,该方法接收一个Integer作为输入,然后返回一个Future<Integer>。计算方法使用ExecutorServicesubmit方法提交一个Callable任务,这个任务在执行时会休眠1秒,然后返回输入的平方。

实际执行计算的代码片段包含在call()方法中,并作为lambda表达式提供。如我们所见,除了前面提到的sleep()调用外,没有什么特别的地方。

当我们将注意力转向CallableExecutorService的使用时,情况就变得更有趣了。

Callable是一个表示返回结果的任务的接口,它有一个单一的call()方法。在这里,我们使用了一个lambda表达式创建了它的一个实例。

创建一个Callable的实例并不能让我们得到任何东西;我们仍然需要将这个实例传递给一个执行器,这个执行器将负责在新的线程中启动任务,并将宝贵的Future对象返回给我们。这就是ExecutorService的作用所在。

我们有几种方式可以访问ExecutorService实例,其中大部分都是由工具类Executors的静态工厂方法提供的。在这个例子中,我们使用了基本的newSingleThreadExecutor(),它为我们提供了一个能够一次处理一个线程的ExecutorService

一旦我们有了一个ExecutorService对象,我们只需要调用submit(),并将我们的Callable作为参数传递。然后submit()将启动任务并返回一个FutureTask对象,这是Future接口的一个实现。

3. 消费Futures

到目前为止,我们已经学习了如何创建Future的实例。

在接下来的部分,我们将通过探索Future API中的所有方法,学习如何操作这个实例。

3.1. 使用 isDone() 和 get() 来获取结果

现在我们需要调用 calculate() 方法,并使用返回的 Future 来获取结果的整数。Future API 的两个方法将帮助我们完成这个任务。

Future.isDone() 告诉我们执行器是否已经完成了任务的处理。如果任务完成,它将返回 true;否则,返回 false

返回实际计算结果的方法是 Future.get()。我们可以看到,这个方法会阻塞执行,直到任务完成。然而,在我们的例子中,这不会是问题,因为我们会通过调用 isDone() 来检查任务是否完成。

通过使用这两个方法,我们可以在等待主任务完成的同时运行其他代码:

Future<Integer> future = new SquareCalculator().calculate(10);

while(!future.isDone()) {
    System.out.println("Calculating...");
    Thread.sleep(300);
}

Integer result = future.get();

在这个例子中,我们将在输出上写一个简单的消息,让用户知道程序正在进行计算。

get() 方法将阻塞执行,直到任务完成。再次强调,这在我们的示例中不会是问题,因为在我们的示例中,只有在确保任务已经完成后,才会调用 get()。所以在这种情况下,future.get() 总是会立即返回。

值得一提的是,get() 还有一个重载版本,它接受一个超时时间和一个 TimeUnit 作为参数:

Integer result = future.get(500, TimeUnit.MILLISECONDS);

get(long, TimeUnit)get() 的区别在于,如果任务在指定的超时期限之前没有返回,前者将抛出一个 TimeoutException

3.2. 使用cancel()方法取消一个Future

假设我们触发了一个任务,但出于某种原因,我们不再关心结果。我们可以使用 Future.cancel(boolean) 来告诉执行器停止操作并中断其底层线程:

Future<Integer> future = new SquareCalculator().calculate(4);

boolean canceled = future.cancel(true);

上述代码中的 Future 实例将永远不会完成其操作。实际上,如果我们在调用 cancel() 之后,尝试从该实例调用 get(),结果将是一个 CancellationExceptionFuture.isCancelled() 将告诉我们一个 Future 是否已经被取消。这可以很有用,以避免得到一个 CancellationException

也有可能调用 cancel() 失败。在这种情况下,返回的值将为 false。需要注意的是,cancel() 接受一个 boolean 值作为参数。这个参数控制是否应该中断执行任务的线程。

4. 更多的多线程与线程池使用

我们当前的 ExecutorService 是单线程的,因为它是通过 Executors.newSingleThreadExecutor 获取的。为了突出这个单线程,让我们同时触发两个计算:

SquareCalculator squareCalculator = new SquareCalculator();

Future<Integer> future1 = squareCalculator.calculate(10);
Future<Integer> future2 = squareCalculator.calculate(100);

while (!(future1.isDone() && future2.isDone())) {
    System.out.println(
      String.format(
        "future1 is %s and future2 is %s", 
        future1.isDone() ? "done" : "not done", 
        future2.isDone() ? "done" : "not done"
      )
    );
    Thread.sleep(300);
}

Integer result1 = future1.get();
Integer result2 = future2.get();

System.out.println(result1 + " and " + result2);

squareCalculator.shutdown();

现在我们来分析下这段代码的输出:

calculating square for: 10
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
calculating square for: 100
future1 is done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
100 and 10000

很明显,这个过程并不是并行的。我们可以看到,第二个任务只有在第一个任务完成后才开始,使整个过程需要大约2秒钟才能完成。

为了使我们的程序真正地多线程,我们应该使用不同类型的 ExecutorService。让我们看看如果我们使用由工厂方法 Executors.newFixedThreadPool() 提供的线程池,我们的示例的行为会如何改变:

public class SquareCalculator {
 
    private ExecutorService executor = Executors.newFixedThreadPool(2);
    
    //...
}

通过在我们的 SquareCalculator 类中进行简单的更改,我们现在有了一个能够使用2个并行线程的执行器。

如果我们再次运行完全相同的客户端代码,我们将得到以下输出:

calculating square for: 10
calculating square for: 100
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
100 and 10000

现在看起来好多了。我们可以看到这两个任务同时开始并完成,整个过程大约需要1秒钟就能完成。

还有其他工厂方法可以用来创建线程池,如 Executors.newCachedThreadPool(),它在之前使用过的线程可用时会重用它们,以及 Executors.newScheduledThreadPool(),它会在给定的延迟后安排命令运行。

关于 ExecutorService 的更多信息,可以阅读我们专门针对这个主题的文章。

5. ForkJoinTask介绍

ForkJoinTask是一个抽象类,实现了Future接口,并且能够在ForkJoinPool中的少量实际线程中运行大量任务。

ForkJoinTask的主要特点是,它通常会在完成主任务所需的工作中生成新的子任务。它通过调用fork()生成新任务,并通过join()收集所有结果,因此得名。

有两个抽象类实现了ForkJoinTask:

  • RecursiveTask,在完成时返回一个值
  • RecursiveAction,它不返回任何值。

正如它们的名称所暗示的那样,这些类用于递归任务,比如文件系统导航或复杂的数学计算。

让我们扩展我们之前的示例,创建一个类,根据给定的整数,计算其阶乘元素的平方和。例如,如果我们将数字4传递给我们的计算器,我们应该得到4² + 3² + 2² + 1²的结果,即30。

首先,我们需要创建一个RecursiveTask的具体实现,并实现其compute()方法。这是我们编写业务逻辑的地方:

public class FactorialSquareCalculator extends RecursiveTask<Integer> {
 
    private Integer n;

    public FactorialSquareCalculator(Integer n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }

        FactorialSquareCalculator calculator = new FactorialSquareCalculator(n - 1);

        calculator.fork();

        return n * n + calculator.join();
    }
}

注意,在compute()方法内部通过创建FactorialSquareCalculator的新实例来实现递归。通过调用fork(),一个非阻塞的方法,我们请求ForkJoinPool启动这个子任务的执行。

join()方法将返回该计算的结果,我们将其与当前访问的数字的平方相加。

现在我们只需要创建一个ForkJoinPool来处理执行和线程管理:

ForkJoinPool forkJoinPool = new ForkJoinPool();

FactorialSquareCalculator calculator = new FactorialSquareCalculator(10);

forkJoinPool.execute(calculator);

6. 总结

在本文中,我们全面探讨了Future接口,并涉及了它的所有方法。我们还学习了如何利用线程池的能力来触发多个并行操作。ForkJoinTask类的主要方法fork()join()也进行了简要介绍。

我们还有许多其他关于Java中并行和异步操作的精彩文章。以下是其中三篇与Future接口密切相关的文章,其中一些已在本文中提到:

  • 《CompletableFuture指南》- 介绍了Java 8中引入的Future的实现,具有许多额外的功能。
  • 《Java中的Fork/Join框架指南》- 更多关于我们在第5节中介绍的ForkJoinTask的内容。
  • 《Java ExecutorService指南》- 专门介绍了ExecutorService接口。

如往常一样,本文中使用的源代码可以在我们的GitHubGitee仓库中找到。

0

评论区