侧边栏壁纸
博主头像
翻斗

开始一件事最好是昨天,其次是现在

  • 累计撰写 44 篇文章
  • 累计创建 42 个标签
  • 累计收到 2 条评论

Java中的线程池介绍

翻斗
2020-03-07 / 0 评论 / 0 点赞 / 490 阅读 / 9,331 字

Baeldung 翻译系列之Java并发基础:

Java中的concurrent包
Java中的Synchronized关键字
Future介绍
ThreadLocal介绍
Java线程的生命周期
如何杀掉一个Java线程
Java中的线程池介绍
实现Runnable接口还是继承Thread类
Java中的wait和notify方法
Runnable vs Callable
wait和sleep的区别
Thread.join方法介绍
Java中使用锁对象
ThreadPoolTaskExecutor中的corePoolSize和maxPoolSize
Java中的异步编程

1. 概览

这个教程是关于Java中的线程池的探讨。我们将从标准Java库中的不同实现开始,然后看一下Google的Guava库。

2. 线程池的概念

在Java中,线程被映射到系统级线程,这些线程是操作系统的资源。如果我们无节制地创建线程,我们可能会很快耗尽这些资源。

操作系统也会在线程之间进行上下文切换 —— 以模拟并行性。简单来说,我们创建的线程越多,每个线程实际工作的时间就越少。

线程池模式有助于在多线程应用中节省资源,并将并行性限制在一定的预定义范围内。

当我们使用线程池时,我们将并发代码写成并行任务的形式,并将它们提交给一个线程池的实例进行执行。这个实例控制几个重复使用的线程来执行这些任务。

这种模式允许我们控制应用程序创建的线程数量及其生命周期。我们还能够调度任务的执行并将即将到来的任务保留在队列中。

译者:各种xx池的用法,都是差不多的理念,提前准备一些资源,集中管理这些资源,这样会减少各种调度使用时间

3. Java中的线程池

3.1. Executors, Executor 以及 ExecutorService

Executors 辅助类包含了几个用于创建预配置线程池实例的方法。这些类是一个好的起点。如果我们不需要应用任何自定义的微调,我们可以使用它们。

我们使用 ExecutorExecutorService 接口来处理 Java 中的不同线程池实现。通常,我们应该让我们的代码与线程池的实际实现解耦,并在我们的应用程序中使用这些接口。

3.1.1. Executor

译者:ExecutorExecutors就差一个字母,根据我们学英文的基础,这个s表示复数形式,这里也一样,Executors就是代表内置的多个Executor,而Executor就是单独的一个执行器

Executor 接口有一个单独的 execute 方法来提交 Runnable 实例以供执行。

让我们看一个快速的例子,如何使用 Executors API 获取一个由单个线程池支持的 Executor 实例,并使用一个无界队列来顺序执行任务。

在这里,我们运行一个单一的任务,它只是在屏幕上打印 “Hello World”。我们将任务作为一个 lambda(Java 8 的特性)提交,它被推断为 Runnable

Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));

3.1.2. ExecutorService

ExecutorService 接口包含大量的方法来控制任务的进度和管理服务的终止。使用这个接口,我们可以提交任务以供执行,也可以使用返回的 Future 实例来控制它们的执行。

现在我们将创建一个 ExecutorService,提交一个任务,然后使用返回的 Futureget 方法等待提交的任务完成并返回值:

ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "Hello World");
// some operations
String result = future.get();

当然,在实际场景中,我们通常不希望立即调用 future.get(),而是推迟调用它,直到我们实际需要计算的值。

在这里,我们重载了 submit 方法,使其可以接受 RunnableCallable。这两者都是函数式接口,我们可以将它们作为 lambda(从 Java 8 开始)传递。

Runnable 的单个方法不会抛出异常,也不会返回值。Callable 接口可能更方便,因为它允许我们抛出异常并返回值。

最后,为了让编译器推断出 Callable 类型,只需从 lambda 返回一个值。

如果想要查看更多关于使用 ExecutorService 接口和 futures 的例子,可以参考《Java ExecutorService 指南》。

3.2. ThreadPoolExecutor

ThreadPoolExecutor 是一个可扩展的线程池实现,具有许多参数和用于微调的钩子。

我们将在这里讨论的主要配置参数是 corePoolSize``,maximumPoolSizekeepAliveTime

译者:其他几个参数也很重要,特别是 workQueue,理解这个参数的含义会让你更加地理解线程池的底层原理

线程池由一定数量的核心线程组成,这些线程始终保持在内部。它还包含一些可能被创建并在不再需要时终止的额外线程。

corePoolSize 参数是将被实例化并保留在池中的核心线程的数量。当新任务到来时,如果所有核心线程都忙碌并且内部队列已满,池被允许增长到 maximumPoolSize

keepAliveTime 参数是允许超出核心线程(超出 corePoolSize 实例化)在空闲状态存在的时间间隔。默认情况下,ThreadPoolExecutor 只考虑移除非核心线程。为了将相同的移除策略应用于核心线程,我们可以使用 allowCoreThreadTimeOut(true) 方法。

这些参数涵盖了广泛的使用案例,但最典型的配置在 Executors 的静态方法中已经预定义

3.2.1. newFixedThreadPool

让我们看一个例子,newFixedThreadPool 方法创建了一个 ThreadPoolExecutor,其 corePoolSizemaximumPoolSize 参数值相等,keepAliveTime 为零。这意味着这个线程池中的线程数量始终相同

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());

在这里,我们实例化了一个固定线程数为2的ThreadPoolExecutor。这意味着如果同时运行的任务数量始终小于或等于两个,它们会立即执行。否则,这些任务中的一些可能会被放入队列等待轮到他们(译者:这就是前面说的工作队列 workQueue)。

我们创建了三个 Callable 任务,它们通过睡眠1000毫秒来模拟重型工作。前两个任务将立即运行,第三个任务将不得不在队列中等待。我们可以通过在提交任务后立即调用 getPoolSize()getQueue().size() 方法来验证这一点。

3.2.2. Executors.newCachedThreadPool()

我们可以使用 Executors.newCachedThreadPool() 方法创建另一个预配置的 ThreadPoolExecutor。这个方法根本不接收线程数量。我们将 corePoolSize 设置为0,将 maximumPoolSize 设置为 Integer.MAX_VALUE``。最后,keepAliveTime 是60秒:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());

这些参数值意味着缓存线程池可能无限制地增长,以适应任何数量的提交任务。但是当线程不再需要时,它们将在60秒的不活动时间后被处理掉。一个典型的用例是我们的应用程序中有很多短期存在的任务

队列大小将始终为零,因为内部使用了一个 SynchronousQueue 实例。在 SynchronousQueue 中,插入和删除操作总是同时发生。所以,队列实际上从不包含任何东西。

3.2.3. Executors.newSingleThreadExecutor()

Executors.newSingleThreadExecutor() API 创建了另一种典型形式的 ThreadPoolExecutor,其中包含一个单独的线程。单线程执行器非常适合创建事件循环。corePoolSizemaximumPoolSize 参数等于1,keepAliveTime 为0。

在下面的例子中,任务将按顺序运行,所以在任务完成后,标志值将为2:

AtomicInteger counter = new AtomicInteger();

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    counter.set(1);
});
executor.submit(() -> {
    counter.compareAndSet(1, 2);
});

另外,这个 ThreadPoolExecutor 被装饰成了一个不可变的包装器,所以在创建后不能重新配置。注意,这也是我们不能将其转换为 ThreadPoolExecutor 的原因。

3.3. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 扩展了 ThreadPoolExecutor 类,并实现了 ScheduledExecutorService 接口,增加了几个额外的方法:

  • schedule 方法允许我们在指定的延迟后运行一次任务。
  • scheduleAtFixedRate 方法允许我们在指定的初始延迟后运行任务,然后以一定的周期重复运行。period 参数是任务开始时间之间的时间,所以执行率是固定的。
  • scheduleWithFixedDelay 方法类似于 scheduleAtFixedRate,它重复运行给定的任务,但指定的延迟是在上一个任务结束和下一个任务开始之间测量的。执行率可能会根据运行任何给定任务所需的时间而变化。
    我们通常使用 Executors.newScheduledThreadPool() 方法来创建一个具有给定 corePoolSize、无界 maximumPoolSize 和零 keepAliveTimeScheduledThreadPoolExecutor

以下是如何安排一个任务在500毫秒后执行的方法:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
    System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);

以下代码展示了如何在500毫秒的延迟后运行一个任务,然后每100毫秒重复一次。安排任务后,我们使用 CountDownLatch 锁等待它触发三次。然后我们使用 Future.cancel() 方法取消它:

CountDownLatch lock = new CountDownLatch(3);

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
    System.out.println("Hello World");
    lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);

lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);

3.4. ForkJoinPool

ForkJoinPool 是 Java 7 引入的 fork/join 框架的核心部分。它解决了在递归算法中产生多个任务的常见问题。如果使用简单的 ThreadPoolExecutor,我们很快就会耗尽线程,因为每个任务或子任务都需要自己的线程来运行。

译者:递归不可控,如果每次都产生多个任务,那么会制造非常多任务,消耗非常多线程

fork/join 框架中,任何任务都可以产生(fork)一些子任务,并使用 join 方法等待它们的完成。fork/join 框架的好处是它不为每个任务或子任务创建新的线程,而是实现了工作窃取算法。

让我们看一个简单的例子,使用 ForkJoinPool 遍历一个节点树,并计算所有叶子节点值的总和。以下是一个简单的树实现,由一个节点、一个 int 值和一组子节点组成:

static class TreeNode {

    int value;

    Set<TreeNode> children;

    TreeNode(int value, TreeNode... children) {
        this.value = value;
        this.children = Sets.newHashSet(children);
    }
}

如果我们想并行计算树中所有值的总和,我们需要实现 RecursiveTask<Integer> 接口。每个任务接收自己的节点并将其值加到其子节点值的总和中。为了计算子节点值的总和,任务实现会执行以下操作:

  • 流式处理子节点集
  • 在此流上进行映射,为每个元素创建一个新的 CountingTask
  • 使用新的线程(fork)运行每个子任务
  • 通过在每个子任务上调用 join 方法来收集结果
  • 使用 Collectors.summingInt 收集器对结果进行求和
public static class CountingTask extends RecursiveTask<Integer> {

    private final TreeNode node;

    public CountingTask(TreeNode node) {
        this.node = node;
    }

    @Override
    protected Integer compute() {
        return node.value + node.children.stream()
          .map(childNode -> new CountingTask(childNode).fork())
          .collect(Collectors.summingInt(ForkJoinTask::join));
    }
}

实际运行计算的代码非常简单:

TreeNode tree = new TreeNode(5,
  new TreeNode(3), new TreeNode(2,
    new TreeNode(2), new TreeNode(8)));

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));

4. Guava中的线程池实现

Guava 是一个受欢迎的 Google 实用程序库。它有许多有用的并发类,包括几个方便的 ExecutorService 的实现。实现类不能直接实例化或子类化,所以创建它们的实例的唯一入口点是 MoreExecutors 辅助类。

4.1. 使用maven添加Guava依赖

我们在 Maven pom 文件中添加以下依赖,将 Guava 库包含到我们的项目中。在 Maven Central Repository 中找到 Guava 库的最新版本:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.0.1-jre</version>
</dependency>

4.2. Direct Executor 和 Direct Executor Service

有时我们想根据一些条件在当前线程或线程池中运行任务。我们更愿意使用单一的 Executor 接口并只切换实现。尽管实现一个在当前线程中运行任务的 ExecutorExecutorService 并不难,但这仍然需要编写一些样板代码。

幸运的是,Guava 为我们提供了预定义的实例。

以下是一个示例,演示了在同一线程中执行任务。尽管提供的任务需要睡眠500毫秒,但它阻塞了当前线程,结果在 execute 调用完成后立即可用:

Executor executor = MoreExecutors.directExecutor();

AtomicBoolean executed = new AtomicBoolean();

executor.execute(() -> {
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executed.set(true);
});

assertTrue(executed.get());

directExecutor() 方法返回的实例实际上是一个静态单例,所以使用这个方法在对象创建上根本没有任何开销。

我们应该优先选择这个方法而不是 MoreExecutors.newDirectExecutorService(),因为那个 API 在每次调用时都会创建一个全功能的executor service 实现。

4.3. exitingExecutorService

另一个常见问题是在线程池仍在运行其任务时关闭虚拟机。即使有取消机制,也不能保证任务会在执行器服务关闭时停止工作。这可能导致 JVM 无限期地挂起,而任务继续执行

为了解决这个问题,Guava 引入了一系列的exitingExecutorService 。它们基于守护线程,与 JVM 一起终止。

这些服务还通过 Runtime.getRuntime().addShutdownHook() 方法添加了一个关闭钩子,并阻止 VM 在放弃挂起任务之前等待配置的时间。

在下面的例子中,我们提交了一个包含无限循环的任务,但我们使用了一个exitingExecutorService ,配置了在 VM 终止时可以等待任务100毫秒。

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService = 
  MoreExecutors.getExitingExecutorService(executor, 
    100, TimeUnit.MILLISECONDS);

executorService.submit(() -> {
    while (true) {
    }
});

如果没有设置 exitingExecutorService,这个任务将导致虚拟机无限期地挂起。

4.4. Listening Decorators

Listening decorators 允许我们包装 ExecutorService,并在任务提交时接收 ListenableFuture 实例,而不是简单的 Future 实例。ListenableFuture 接口扩展了 Future,增加了一个名为 addListener 的方法。这个方法允许添加一个在 future 完成时被调用的监听器。

我们很少直接使用 ListenableFuture.addListener() 方法。但是,它对 Futures 工具类中的大多数辅助方法都是必要的。

例如,使用 Futures.allAsList() 方法,我们可以将几个 ListenableFuture 实例合并在一个单独的 ListenableFuture 中,该 future 在所有合并的 futures 成功完成时完成:

ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService = 
  MoreExecutors.listeningDecorator(executorService);

ListenableFuture<String> future1 = listeningExecutorService.submit(() -> "Hello");
ListenableFuture<String> future2 = listeningExecutorService.submit(() -> "World");

String greeting = Futures.allAsList(future1, future2).get()
  .stream()
  .collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);

5. 总结

在这篇文章中,我们讨论了线程池模式及其在标准 Java 库和 Google 的 Guava 库中的实现。

文章的源代码在GitHub 或者 Gitee上可以获取。

0

评论区