Baeldung 翻译系列之Java并发基础:
Java中的concurrent包
Java中的Synchronized关键字
Future介绍
ThreadLocal介绍
Java线程的生命周期
如何杀掉一个Java线程
Java中的线程池介绍
实现Runnable接口还是继承Thread类
Java中的wait和notify方法
Runnable vs Callable
wait和sleep的区别
Thread.join方法介绍
Java中使用锁对象
ThreadPoolTaskExecutor中的corePoolSize和maxPoolSize
Java中的异步编程
1.概览
java.util.concurrent
包提供了于创建并发应用程序的工具。
在本文中,我们将对整个包进行概述。
2.主要组件
java.util.concurrent
包包含了太多的功能,无法在一篇文章中进行详细讨论。在本文中,我们将主要关注该包中一些最有用的工具,例如:
- Executor
- ExecutorService
- ScheduledExecutorService
- Future
- CountDownLatch
- CyclicBarrier
- Semaphore
- ThreadFactory
- BlockingQueue
- DelayQueue
- Locks
- Phaser
您还可以在这里找到许多专门介绍单个类的文章。
2.1. Executor
Executor
是一个表示执行提供的任务的对象的接口。这取决于特定实现(从哪里发起调用)是否应该在新线程或当前线程上运行任务。因此,使用这个接口,我们可以将任务执行流程与实际的任务执行机制解耦(译者:就是说你打包好行李,具体是汽车还是火车还是飞机来给你运走,看你心情,这个在打包行李的时候不做捆绑销售)。
需要注意的一点是,Executor
并不严格要求任务执行是异步的。在最简单的情况下,执行器可以立即在调用线程中调用提交的任务。
我们需要创建一个调用者来创建执行器实例:
public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}
那我们现在就可以执行这个任务了
public void execute() {
Executor executor = new Invoker();
executor.execute( () -> {
// task to be performed
});
}
需要注意的是,如果执行器无法接受任务进行执行,它将抛出 RejectedExecutionException
异常。
2.2. ExecutorService
ExecutorService
是一种完整的异步处理解决方案。它管理一个内存中的队列,并根据线程的可用性来调度提交的任务。
要使用 ExecutorService
,我们需要创建一个 Runnable
类。
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}
现在我们可以创建 ExecutorService
实例并分配这个任务。在创建时,我们需要指定线程池的大小。
ExecutorService executor = Executors.newFixedThreadPool(10);
如果我们想要创建一个单线程的 ExecutorService
实例,我们可以使用 newSingleThreadExecutor(ThreadFactory threadFactory)
方法来创建实例。
一旦创建了执行器,我们就可以使用它来提交任务。
public void execute() {
executor.submit(new Task());
}
在提交任务的同时,我们也可以创建 Runnable
实例。
executor.submit(() -> {
new Task();
});
ExecutorService
还提供了两种现成的执行终止方法。第一种是 shutdown()
方法,它会等待所有提交的任务执行完成。另一个方法是 shutdownNow()
,它尝试终止所有正在执行的任务并停止等待任务的处理。
还有另一个方法 awaitTermination(long timeout, TimeUnit unit)
,它会强制阻塞,直到所有任务在触发关闭事件或执行超时后完成执行,或者执行线程本身被中断。
try {
executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
e.printStackTrace();
}
2.3. ScheduledExecutorService
ScheduledExecutorService
是与 ExecutorService
类似的接口,但它可以定期执行任务。
Executor
和 ExecutorService
的方法会立即进行调度,而不会引入任何人为的延迟。零或任何负值表示请求需要立即执行。
我们可以使用 Runnable
和 Callable
接口来定义任务。
public void execute() {
ScheduledExecutorService executorService
= Executors.newSingleThreadScheduledExecutor();
Future<String> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);
executorService.shutdown();
}
ScheduledExecutorService
也可以在给定的固定延迟之后安排任务执行:
executorService.scheduleAtFixedRate(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
在这里,scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
方法创建并执行一个周期性操作,首先在提供的初始延迟之后调用,然后按照给定的周期进行调用`,直到服务实例关闭。
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
方法创建并执行一个周期性操作,首先在提供的初始延迟之后调用,然后在执行一个操作终止和下一个操作调用之间以给定的延迟进行重复调用。
2.4. Future
Future
用于表示异步操作的结果。它提供了一些方法来检查异步操作是否已完成、获取计算结果等(译者:就是一个堆未来的抽象,类似js中的Promise)。
此外,cancel(boolean mayInterruptIfRunning)
方法可以取消操作并释放执行线程。如果 mayInterruptIfRunning
的值为 true
,执行任务的线程将立即终止。
否则,正在进行的任务将被允许完成。
我们可以使用以下代码片段来创建一个 Future
实例:
public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}
我们可以使用以下代码片段来检查 Future
的结果是否已准备好,并在计算完成后获取数据:
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
我们还可以为给定的操作指定超时时间。如果任务花费的时间超过这个时间,将抛出 TimeoutException
异常:
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
2.5. CountDownLatch
CountDownLatch
(JDK 5 中引入)是一个实用类,它可以阻塞一组线程,直到某个操作完成。
CountDownLatch
初始化时使用一个计数器(整数类型),这个计数器会随着依赖线程的执行而递减。但是一旦计数器达到零,其他线程就会被释放。
2.6. CyclicBarrier
CyclicBarrier
和 CountDownLatch
工作方式几乎相同,不同之处在于我们可以重用 CyclicBarrier。与
CountDownLatch
不同,它允许多个线程在调用最终任务之前使用 await()
方法(称为屏障条件)互相等待(译者:这俩经常拿来作比较,后面我也会写一些相关文章)。
我们需要创建一个 Runnable
任务实例来触发屏障条件:
public class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() +
" is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() +
" is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
现在我们可以调用一些线程来竞争屏障条件:
public void start() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});
Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");
if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}
在这里,isBroken()
方法检查是否有任何线程在执行期间被中断。我们应该在执行实际处理之前始终进行这个检查。
2.7. Semaphore
Semaphore
用于阻塞线程级别对某个物理或逻辑资源的访问。一个信号量包含一组许可证;当一个线程尝试进入临界区时,它需要检查信号量是否有可用的许可证。
如果没有许可证可用(通过 tryAcquire()
方法),线程将不被允许进入临界区;然而,如果许可证可用,访问将被授予,并且许可证计数减少。
一旦执行线程释放了临界区,许可证计数再次增加(通过 release()
方法完成)。
我们可以使用 tryAcquire(long timeout, TimeUnit unit)
方法为获取访问权限指定超时时间。
我们还可以检查可用许可证的数量或等待获取信号量的线程数量。
以下代码片段可以用来实现一个信号量:
static Semaphore semaphore = new Semaphore(10);
public void execute() throws InterruptedException {
LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " +
semaphore.getQueueLength());
if (semaphore.tryAcquire()) {
try {
// ...
}
finally {
semaphore.release();
}
}
}
(译者:上大学期间如果学了操作系统原理课程的同学应该会对信号量有印象,它在操作系统作业调度、资源抢占中很常见)
2.8. ThreadFactory
正如其名称所示,ThreadFactory
充当一个线程(不存在的)池,根据需要创建新线程。它消除了为实现高效的线程创建机制而编写大量样板代码的需求。
我们可以定义一个 ThreadFactory
:
public class BaeldungThreadFactory implements ThreadFactory {
private int threadId;
private String name;
public BaeldungThreadFactory(String name) {
threadId = 1;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
LOG.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}
我们可以使用 newThread(Runnable r)
方法在运行时创建一个新线程:
BaeldungThreadFactory factory = new BaeldungThreadFactory(
"BaeldungThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}
2.9. BlockingQueue
在异步编程中,最常见的集成模式之一是生产者-消费者模式。java.util.concurrent
包提供了一个称为 BlockingQueue
的数据结构,在这些异步场景中非常有用。
(译者:这是个数据结构,看名字就知道是一个阻塞队列,他和上面的各种同步机制不一样,只是一个底层结构,后续的内容中会遇到一些跟它有关的代码)
2.10. DelayQueue
DelayQueue
是一个无限大小的阻塞队列,其中的元素只有在其到期时间(即用户定义的延迟)到达后才能被取出。因此,队列的顶部元素(头部)将具有最长的延迟时间,并且它将在最后被轮询出队。
2.11. 锁(Locks)
不出所料,锁是一种用于阻塞其他线程访问某个代码段的工具,除了当前正在执行它的线程。
锁(Lock)
和 synchronized
块的主要区别在于 synchronized
块完全包含在方法中;然而,我们可以将 Lock API
的 lock()
和 unlock()
操作放在单独的方法中。
2.12. Phaser
Phaser
是一个比 CyclicBarrier
和 CountDownLatch
更灵活的解决方案,用于在执行继续之前等待动态数量的线程。我们可以协调多个执行阶段,并为每个程序阶段重用一个 Phaser
实例。
3. 结论
在这篇高层次的概述文章中,我们专注于介绍了 java.util.concurrent
包中提供的不同工具。
如往常一样,完整的源代码可以在 GitHub 上找到。
(译者:为了方便国内读者,我这边fork了一份到gitee上,这篇文章对应的地址是这个 )
评论区