线程池介绍
实现原理
下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型。
要实现这样一个线程池,有几个问题需要考虑:
- 队列设置多长?如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽。如果是有界的,当队列满了之后,调用方如何处理?
- 线程池中的线程个数是固定的,还是动态变化的?
- 每次提交新任务,是放入队列?还是开新线程?
- 当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?
针对问题4,有3种做法:
- 不使用阻塞队列,只使用一般的线程安全的队列,也无阻塞/唤醒机制。当队列为空时,线程池中的线程只能睡眠一会儿,然后醒来去看队列中有没有新任务到来,如此不断轮询
- 不使用阻塞队列,但在队列外部、线程池内部实现了阻塞/唤醒机制
- 使用阻塞队列
很显然,做法3最完善,既避免了线程池内部自己实现阻塞/唤醒机制的麻烦,也避免了做法1的睡眠/轮询带来的资源消耗和延迟。正因为如此,ThreadPoolExector
/ScheduledThreadPoolExecutor
都是基于阻塞队列来实现的,而不是一般的队列,至此,各式各样的阻塞队列就要派上用场了。
线程池的类继承体系
线程池的类继承体系如下图所示:
在这里,有两个核心的类: ThreadPoolExector
和 ScheduledThreadPoolExecutor
,后者不仅可以执行某个任务,还可以周期性地执行任务。
向线程池中提交的每个任务,都必须实现Runnable
接口,通过最上面的Executor
接口中的execute(Runnable command)
向线程池提交任务。
然后,在ExecutorService
中,定义了线程池的关闭接口shutdown()
,还定义了可以有返回值的任务,也就是Callable
。
ThreadPoolExecutor
核心数据结构
基于线程池的实现原理,下面看一下ThreadPoolExector
的核心数据结构:
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 对线程池内部各种变量进行互斥访问控制
private final ReentrantLock mainLock = new ReentrantLock();
// 线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//...
}
每一个线程是一个Worker
对象。Worker
是ThreadPoolExector
的内部类,核心数据结构如下:
private final class Worker extends AbstractQueuedSynchronizer implements
Runnable {
// ...
final Thread thread; // Worker封装的线程
Runnable firstTask; // Worker接收到的第1个任务
volatile long completedTasks; // Worker执行完毕的任务个数
// ...
}
由定义会发现,Worker
继承于AQS,也就是说Worker
本身就是一把锁。这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。
核心配置参数解释
ThreadPoolExecutor
在其构造方法中提供了几个核心配置参数,来配置不同策略的线程池:
上面的各个参数,解释如下:
corePoolSize
:在线程池中始终维护的线程个数maxPoolSize
:在corePoolSize
已满、队列也满的情况下,扩充线程至此值keepAliveTime/TimeUnit
:maxPoolSize
中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize
blockingQueue
:线程池所用的队列类型threadFactory
:线程创建工厂,可以自定义,有默认值Executors.defaultThreadFactory()
RejectedExecutionHandle
r:corePoolSize
已满,队列已满,maxPoolSize
已满,最后的拒绝策略
下面来看这6个配置参数在任务的提交过程中是怎么运作的。在每次往线程池中提交任务的时候,有如下的处理流程:
步骤一:判断当前线程数是否大于或等于corePoolSize
。如果小于,则新建线程执行;如果大于,则进入步骤二。
步骤二:判断队列是否已满。如未满,则放入;如已满,则进入步骤三。
步骤三:判断当前线程数是否大于或等于maxPoolSize
。如果小于,则新建线程执行;如果大于,则进入步骤四。
步骤四:根据拒绝策略,拒绝任务。
总结一下:首先判断corePoolSize
,其次判断blockingQueue
是否已满,接着判断maxPoolSize
,最后使用拒绝策略。
很显然,基于这种流程,如果队列是无界的,将永远没有机会走到步骤三,也即maxPoolSize
没有使用,也一定不会走到步骤四。
线程池的优雅关闭
线程池的关闭,较之线程的关闭更加复杂。当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。
线程池的生命周期
在JDK 7中,把线程数量(workerCount
)和线程池状态(runState
)这两个变量打包存储在一个字段里面,即ctl
变量。如下图所示,最高的3位存储线程池状态,其余29位存储线程个数。而在JDK 6中,这两个变量是分开存储的。
由上面的代码可以看到,ctl
变量被拆成两半,最高的3位用来表示线程池的状态,低的29位表示线程的个数。线程池的状态有五种,分别是RUNNING
、SHUTDOWN
、STOP
、TIDYING
和TERMINATED
。
下面分析状态之间的迁移过程,如图所示:
线程池有两个关闭方法,shutdown()
和shutdownNow()
,这两个方法会让线程池切换到不同的状态。在队列为空,线程池也为空之后,进入TIDYING
状态;最后执行一个钩子方法terminated()
,进入TERMINATED
状态,线程池才真正关闭。
这里的状态迁移有一个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。例如,当线程池的状态在TIDYING=2
时,接下来只可能迁移到TERMINATED=3
,不可能迁移回STOP=1
或者其他状态。
除terminated()
之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
正确关闭线程池的步骤
关闭线程池的过程为:在调用shutdown()
或者shutdownNow()
之后,线程池并不会立即关闭,接下来需要调用 awaitTermination()
来等待线程池关闭。关闭线程池的正确步骤如下:
// executor.shutdownNow();
executor.shutdown();
try {
boolean flag = true;
do {
flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} while (flag);
} catch (InterruptedException e) {
// ...
}
awaitTermination(…)
方法的内部实现很简单,如下所示。不断循环判断线程池是否到达了最终状态TERMINATED
,如果是,就返回;如果不是,则通过termination
条件变量阻塞一段时间,之后继续判断。
shutdown()与shutdownNow()的区别
shutdown()
不会清空任务队列,会等所有任务执行完成,shutdownNow()
清空任务队列shutdown()
只会中断空闲的线程,shutdownNow()
会中断所有线程
下面看一下在上面的代码里中断空闲线程和中断所有线程的区别:
shutdown()
方法中的interruptIdleWorkers()
方法的实现:
关键区别点在tryLock()
:一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。tryLock()
如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。
tryLock()
方法:
shutdownNow()
调用了interruptWorkers()
方法:
在上面的代码中,shutdown()
和shutdownNow()
都调用了tryTer
minate()方法,如下所示:
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
tryTerminate()
不会强行终止线程池,只是做了一下检测:当workerCount
为0,workerQueue
为空时,先把状态切换到TIDYING
,然后调用钩子方法terminated()
。当钩子方法执行完成时,把状态从TIDYING
改为TERMINATED
,接着调用termination.sinaglAll()
,通知前面阻塞在awaitTermination
的所有调用者线程。
所以,TIDYING
和TREMINATED
的区别是在二者之间执行了一个钩子方法terminated()
,目前是一个空实现。
任务的提交过程分析
提交任务的方法如下:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
任务的执行过程分析
在上面的任务提交过程中,可能会开启一个新的Worker
,并把任务本身作为firstTask
赋给该Worker
。但对于一个Worker
来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程。
下面来看Woker
的run()
方法的实现过程:
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
shutdown()与任务执行过程综合分析
把任务的执行过程和上面的线程池的关闭过程结合起来进行分析,当调用shutdown()
的时候,可能出现以下几种场景:
- 当调用
shutdown()
的时候,所有线程都处于空闲状态。这意味着任务队列一定是空的。此时,所有线程都会阻塞在getTask()
方法的地方。然后,所有线程都会收到interruptIdleWorkers()
发来的中断信号,getTask()
返回null
,所有Worker
都会退出while
循环,之后执行processWorkerExit
- 当调用
shutdown()
的时候,所有线程都处于忙碌状态。此时,队列可能是空的,也可能是非空的。interruptIdleWorkers()
内部的tryLock
调用失败,什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,直到队列为空,getTask()
才会返回null
。之后,就和场景1一样了,退出while
循环 - 当调用
shutdown()
的时候,部分线程忙碌,部分线程空闲。有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在getTask()
方法的地方。空闲的这些线程会和场景1一样处理,不空闲的线程会和场景2一样处理
下面看一下getTask()
方法的内部细节:
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
shutdownNow() 与任务执行过程综合分析
和上面的shutdown()
类似,只是多了一个环节,即清空任务队列。如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。因此,中断空闲线程和中断所有线程的区别并不是很大,除非线程当前刚好阻塞在某个地方。
当一个Worker
最终退出的时候,会执行清理工作:
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程池的4种拒绝策略
在execute(Runnable command)
的最后,调用了reject(command)
执行拒绝策略,代码如下所示:
handler
就是我们可以设置的拒绝策略管理器:
RejectedExecutionHandler
是一个接口,定义了四种实现,分别对应四种不同的拒绝策略,默认是AbortPolicy
。
package java.util.concurrent;
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
ThreadPoolExecutor
类中默认的实现是:
四种策略的实现代码如下:
策略1:调用者直接在自己的线程里执行,线程池不处理,比如到医院打点滴,医院没地方了,到你家自己操作吧:
策略2:线程池抛异常:
策略3:线程池直接丢掉任务,神不知鬼不觉:
策略4:删除队列中最早的任务,将当前任务入队列:
Executors工具类
concurrent包提供了Executors
工具类,利用它可以创建各种不同类型的线程池。
四种对比
单线程的线程池:
固定数目线程的线程池:
每接收一个请求,就创建一个线程来执行:
单线程具有周期调度功能的线程池:
多线程,有调度功能的线程池:
最佳实践
不同类型的线程池,其实都是由前面的几个关键配置参数配置而成的。
在《阿里巴巴Java开发手册》中,明确禁止使用Executors
创建线程池,并要求开发者直接使用ThreadPoolExector
或ScheduledThreadPoolExecutor
进行创建。这样做是为了强制开发者明确线程池的运行策略,使其对线程池的每个配置参数皆做到心中有数,以规避因使用不当而造成资源耗尽的风险。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
实现了按时间调度来执行任务:
延迟执行任务:
周期执行任务:
区别如下:
AtFixedRate
:按固定频率执行,与任务本身执行时间无关。但有个前提条件,任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5sWithFixedDelay
:按固定间隔执行,与任务本身执行时间有关。例如,任务本身执行时间是10s,间隔2s,则下一次开始执行的时间就是12s
延迟执行和周期性执行的原理
ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor
,这意味着其内部的数据结构和ThreadPoolExecutor
是基本一样的,那它是如何实现延迟执行任务和周期性执行任务的呢?
延迟执行任务依靠的是DelayQueue
。DelayQueue
是BlockingQueue
的一种,其实现原理是二叉堆。
而周期性执行任务是执行完一个任务之后,再把该任务扔回到任务队列中,如此就可以对一个任务反复执行。
不过这里并没有使用DelayQueue
,而是在ScheduledThreadPoolExecutor
内部又实现了一个特定的DelayQueue
:
其原理和DelayQueue
一样,但针对任务的取消进行了优化。下面主要讲延迟执行和周期性执行的实现过程。
延迟执行
传进去的是一个Runnable
,外加延迟时间delay
。在内部通过decorateTask(…)
方法把Runnable
包装成一个ScheduleFutureTask
对象,而DelayedWorkQueue
中存放的正是这种类型的对象,这种类型的对象一定实现了Delayed
接口。
从上面的代码中可以看出,schedule()
方法本身很简单,就是把提交的Runnable
任务加上delay
时间,转换成ScheduledFutureTask
对象,放入DelayedWorkerQueue
中。任务的执行过程还是复用的ThreadPoolExecutor
,延迟的控制是在DelayedWorkerQueue
内部完成的。
周期性执行
和schedule(…)
方法的框架基本一样,也是包装一个ScheduledFutureTask
对象,只是在延迟时间参数之外多了一个周期参数,然后放入DelayedWorkerQueue
就结束了。
两个方法的区别在于一个传入的周期是一个负数,另一个传入的周期是一个正数,为什么要这样做呢?
用于生成任务序列号的sequencer
,创建ScheduledFutureTask
的时候使用:
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber;
private volatile long time;
private final long period;
ScheduledFutureTask(Runnable r, V result, long triggerTime,
long period, long sequenceNumber) {
super(r, result);
this.time = triggerTime; // 延迟时间
this.period = period; // 周期
this.sequenceNumber = sequenceNumber;
}
// 实现Delayed接口
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), NANOSECONDS);
}
// 实现Comparable接口
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
// 延迟时间相等,进一步比较序列号
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
// 实现Runnable接口
public void run() {
if (!canRunInCurrentRunState(this))
cancel(false);
// 如果不是周期执行,则执行一次
else if (!isPeriodic())
super.run();
// 如果是周期执行,则重新设置下一次运行的时间,重新入队列
else if (super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
// 下一次执行时间
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
}
// 下一次触发时间
long triggerTime(long delay) {
return System.nanoTime() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// 放到队列中,等待下一次执行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(task)) {
super.getQueue().add(task);
if (canRunInCurrentRunState(task) || !remove(task)) {
ensurePrestart();
return;
}
}
task.cancel(false);
}
withFixedDelay
和atFixedRate
的区别就体现在setNextRunTime
里面。如果是atFixedRate
,period>0
,下一次开始执行时间等于上一次开始执行时间+period
;如果是withFixedDelay
,period < 0
,下一次开始执行时间等于triggerTime(-p)
,为now+(-period)
,now
即上一次执行的结束时间。
以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~
文章评论