Rubin's Blog

  • 首页
  • 关于作者
  • 隐私政策
享受恬静与美好~~~
分享生活的点点滴滴~~~
  1. 首页
  2. 并发编程
  3. 正文

java并发编程之线程池

2022年 1月 1日 719点热度 0人点赞 0条评论

线程池介绍

实现原理

下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型。

要实现这样一个线程池,有几个问题需要考虑:

  1. 队列设置多长?如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽。如果是有界的,当队列满了之后,调用方如何处理?
  2. 线程池中的线程个数是固定的,还是动态变化的?
  3. 每次提交新任务,是放入队列?还是开新线程?
  4. 当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?

针对问题4,有3种做法:

  1. 不使用阻塞队列,只使用一般的线程安全的队列,也无阻塞/唤醒机制。当队列为空时,线程池中的线程只能睡眠一会儿,然后醒来去看队列中有没有新任务到来,如此不断轮询
  2. 不使用阻塞队列,但在队列外部、线程池内部实现了阻塞/唤醒机制
  3. 使用阻塞队列

很显然,做法3最完善,既避免了线程池内部自己实现阻塞/唤醒机制的麻烦,也避免了做法1的睡眠/轮询带来的资源消耗和延迟。正因为如此,ThreadPoolExector/ScheduledThreadPoolExecutor都是基于阻塞队列来实现的,而不是一般的队列,至此,各式各样的阻塞队列就要派上用场了。

线程池的类继承体系

线程池的类继承体系如下图所示:

在这里,有两个核心的类: ThreadPoolExector 和 ScheduledThreadPoolExecutor ,后者不仅可以执行某个任务,还可以周期性地执行任务。

向线程池中提交的每个任务,都必须实现Runnable接口,通过最上面的Executor接口中的execute(Runnable command)向线程池提交任务。

然后,在ExecutorService中,定义了线程池的关闭接口shutdown(),还定义了可以有返回值的任务,也就是Callable。

ThreadPoolExecutor

核心数据结构

基于线程池的实现原理,下面看一下ThreadPoolExector的核心数据结构:

public class ThreadPoolExecutor extends AbstractExecutorService {
  //...
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  // 存放任务的阻塞队列
  private final BlockingQueue<Runnable> workQueue;
  // 对线程池内部各种变量进行互斥访问控制
  private final ReentrantLock mainLock = new ReentrantLock();
  // 线程集合
  private final HashSet<Worker> workers = new HashSet<Worker>();
  //...
}

每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下:

private final class Worker extends AbstractQueuedSynchronizer implements
 Runnable {
  // ...
  final Thread thread; // Worker封装的线程
  Runnable firstTask; // Worker接收到的第1个任务
  volatile long completedTasks; // Worker执行完毕的任务个数
  // ...
}

由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁。这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。

核心配置参数解释

ThreadPoolExecutor在其构造方法中提供了几个核心配置参数,来配置不同策略的线程池:

上面的各个参数,解释如下:

  1. corePoolSize:在线程池中始终维护的线程个数
  2. maxPoolSize:在corePoolSize已满、队列也满的情况下,扩充线程至此值
  3. keepAliveTime/TimeUnit:maxPoolSize中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize
  4. blockingQueue:线程池所用的队列类型
  5. threadFactory:线程创建工厂,可以自定义,有默认值Executors.defaultThreadFactory()
  6. RejectedExecutionHandler:corePoolSize已满,队列已满,maxPoolSize已满,最后的拒绝策略

下面来看这6个配置参数在任务的提交过程中是怎么运作的。在每次往线程池中提交任务的时候,有如下的处理流程:

步骤一:判断当前线程数是否大于或等于corePoolSize。如果小于,则新建线程执行;如果大于,则进入步骤二。

步骤二:判断队列是否已满。如未满,则放入;如已满,则进入步骤三。

步骤三:判断当前线程数是否大于或等于maxPoolSize。如果小于,则新建线程执行;如果大于,则进入步骤四。

步骤四:根据拒绝策略,拒绝任务。

总结一下:首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize,最后使用拒绝策略。

很显然,基于这种流程,如果队列是无界的,将永远没有机会走到步骤三,也即maxPoolSize没有使用,也一定不会走到步骤四。

线程池的优雅关闭

线程池的关闭,较之线程的关闭更加复杂。当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。

线程池的生命周期

在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在一个字段里面,即ctl变量。如下图所示,最高的3位存储线程池状态,其余29位存储线程个数。而在JDK 6中,这两个变量是分开存储的。

由上面的代码可以看到,ctl变量被拆成两半,最高的3位用来表示线程池的状态,低的29位表示线程的个数。线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。

下面分析状态之间的迁移过程,如图所示:

线程池有两个关闭方法,shutdown()和shutdownNow(),这两个方法会让线程池切换到不同的状态。在队列为空,线程池也为空之后,进入TIDYING状态;最后执行一个钩子方法terminated(),进入TERMINATED状态,线程池才真正关闭。

这里的状态迁移有一个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。例如,当线程池的状态在TIDYING=2时,接下来只可能迁移到TERMINATED=3,不可能迁移回STOP=1或者其他状态。

除terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

正确关闭线程池的步骤

关闭线程池的过程为:在调用shutdown()或者shutdownNow()之后,线程池并不会立即关闭,接下来需要调用 awaitTermination()来等待线程池关闭。关闭线程池的正确步骤如下:

// executor.shutdownNow();
executor.shutdown();
try {
  boolean flag = true;
  do {
    flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS);
   } while (flag);
} catch (InterruptedException e) {
  // ...
}

awaitTermination(…)方法的内部实现很简单,如下所示。不断循环判断线程池是否到达了最终状态TERMINATED,如果是,就返回;如果不是,则通过termination条件变量阻塞一段时间,之后继续判断。

shutdown()与shutdownNow()的区别

  • shutdown()不会清空任务队列,会等所有任务执行完成,shutdownNow()清空任务队列
  • shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程

下面看一下在上面的代码里中断空闲线程和中断所有线程的区别:

shutdown()方法中的interruptIdleWorkers()方法的实现:

关键区别点在tryLock():一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。tryLock()如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。

tryLock()方法:

shutdownNow()调用了interruptWorkers()方法:

在上面的代码中,shutdown()和shutdownNow()都调用了tryTerminate()方法,如下所示:

/**
  * Transitions to TERMINATED state if either (SHUTDOWN and pool
  * and queue empty) or (STOP and pool empty).  If otherwise
  * eligible to terminate but workerCount is nonzero, interrupts an
  * idle worker to ensure that shutdown signals propagate. This
  * method must be called following any action that might make
  * termination possible -- reducing worker count or removing tasks
  * from the queue during shutdown. The method is non-private to
  * allow access from ScheduledThreadPoolExecutor.
  */
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

tryTerminate()不会强行终止线程池,只是做了一下检测:当workerCount为0,workerQueue为空时,先把状态切换到TIDYING,然后调用钩子方法terminated()。当钩子方法执行完成时,把状态从TIDYING改为TERMINATED,接着调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。

所以,TIDYING和TREMINATED的区别是在二者之间执行了一个钩子方法terminated(),目前是一个空实现。

任务的提交过程分析

提交任务的方法如下:

/**
  * Executes the given task sometime in the future.  The task
  * may execute in a new thread or in an existing pooled thread.
  *
  * If the task cannot be submitted for execution, either because this
  * executor has been shutdown or because its capacity has been reached,
  * the task is handled by the current {@code RejectedExecutionHandler}.
  *
  * @param command the task to execute
  * @throws RejectedExecutionException at discretion of
  *         {@code RejectedExecutionHandler}, if the task
  *         cannot be accepted for execution
  * @throws NullPointerException if {@code command} is null
  */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

/**
  * Checks if a new worker can be added with respect to current
  * pool state and the given bound (either core or maximum). If so,
  * the worker count is adjusted accordingly, and, if possible, a
  * new worker is created and started, running firstTask as its
  * first task. This method returns false if the pool is stopped or
  * eligible to shut down. It also returns false if the thread
  * factory fails to create a thread when asked.  If the thread
  * creation fails, either due to the thread factory returning
  * null, or due to an exception (typically OutOfMemoryError in
  * Thread.start()), we roll back cleanly.
  *
  * @param firstTask the task the new thread should run first (or
  * null if none). Workers are created with an initial first task
  * (in method execute()) to bypass queuing when there are fewer
  * than corePoolSize threads (in which case we always start one),
  * or when the queue is full (in which case we must bypass queue).
  * Initially idle threads are usually created via
  * prestartCoreThread or to replace other dying workers.
  *
  * @param core if true use corePoolSize as bound, else
  * maximumPoolSize. (A boolean indicator is used here rather than a
  * value to ensure reads of fresh values after checking other pool
  * state).
  * @return true if successful
  */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

任务的执行过程分析

在上面的任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker。但对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程。

下面来看Woker的run()方法的实现过程:

/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}

/**
  * Main worker run loop.  Repeatedly gets tasks from queue and
  * executes them, while coping with a number of issues:
  *
  * 1. We may start out with an initial task, in which case we
  * don't need to get the first one. Otherwise, as long as pool is
  * running, we get tasks from getTask. If it returns null then the
  * worker exits due to changed pool state or configuration
  * parameters.  Other exits result from exception throws in
  * external code, in which case completedAbruptly holds, which
  * usually leads processWorkerExit to replace this thread.
  *
  * 2. Before running any task, the lock is acquired to prevent
  * other pool interrupts while the task is executing, and then we
  * ensure that unless pool is stopping, this thread does not have
  * its interrupt set.
  *
  * 3. Each task run is preceded by a call to beforeExecute, which
  * might throw an exception, in which case we cause thread to die
  * (breaking loop with completedAbruptly true) without processing
  * the task.
  *
  * 4. Assuming beforeExecute completes normally, we run the task,
  * gathering any of its thrown exceptions to send to afterExecute.
  * We separately handle RuntimeException, Error (both of which the
  * specs guarantee that we trap) and arbitrary Throwables.
  * Because we cannot rethrow Throwables within Runnable.run, we
  * wrap them within Errors on the way out (to the thread's
  * UncaughtExceptionHandler).  Any thrown exception also
  * conservatively causes thread to die.
  *
  * 5. After task.run completes, we call afterExecute, which may
  * also throw an exception, which will also cause thread to
  * die. According to JLS Sec 14.20, this exception is the one that
  * will be in effect even if task.run throws.
  *
  * The net effect of the exception mechanics is that afterExecute
  * and the thread's UncaughtExceptionHandler have as accurate
  * information as we can provide about any problems encountered by
  * user code.
  *
  * @param w the worker
  */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

shutdown()与任务执行过程综合分析

把任务的执行过程和上面的线程池的关闭过程结合起来进行分析,当调用shutdown()的时候,可能出现以下几种场景:

  • 当调用shutdown()的时候,所有线程都处于空闲状态。这意味着任务队列一定是空的。此时,所有线程都会阻塞在 getTask()方法的地方。然后,所有线程都会收到interruptIdleWorkers()发来的中断信号,getTask()返回null,所有Worker都会退出while循环,之后执行processWorkerExit
  • 当调用shutdown()的时候,所有线程都处于忙碌状态。此时,队列可能是空的,也可能是非空的。interruptIdleWorkers()内部的tryLock调用失败,什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,直到队列为空,getTask()才会返回null。之后,就和场景1一样了,退出while循环
  • 当调用shutdown()的时候,部分线程忙碌,部分线程空闲。有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在 getTask()方法的地方。空闲的这些线程会和场景1一样处理,不空闲的线程会和场景2一样处理

下面看一下getTask()方法的内部细节:

/**
  * Performs blocking or timed wait for a task, depending on
  * current configuration settings, or returns null if this worker
  * must exit because of any of:
  * 1. There are more than maximumPoolSize workers (due to
  *    a call to setMaximumPoolSize).
  * 2. The pool is stopped.
  * 3. The pool is shutdown and the queue is empty.
  * 4. This worker timed out waiting for a task, and timed-out
  *    workers are subject to termination (that is,
  *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
  *    both before and after the timed wait, and if the queue is
  *    non-empty, this worker is not the last thread in the pool.
  *
  * @return task, or null if the worker must exit, in which case
  *         workerCount is decremented
  */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

shutdownNow() 与任务执行过程综合分析

和上面的shutdown()类似,只是多了一个环节,即清空任务队列。如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。因此,中断空闲线程和中断所有线程的区别并不是很大,除非线程当前刚好阻塞在某个地方。

当一个Worker最终退出的时候,会执行清理工作:

/**
  * Performs blocking or timed wait for a task, depending on
  * current configuration settings, or returns null if this worker
  * must exit because of any of:
  * 1. There are more than maximumPoolSize workers (due to
  *    a call to setMaximumPoolSize).
  * 2. The pool is stopped.
  * 3. The pool is shutdown and the queue is empty.
  * 4. This worker timed out waiting for a task, and timed-out
  *    workers are subject to termination (that is,
  *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
  *    both before and after the timed wait, and if the queue is
  *    non-empty, this worker is not the last thread in the pool.
  *
  * @return task, or null if the worker must exit, in which case
  *         workerCount is decremented
  */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

线程池的4种拒绝策略

在execute(Runnable command)的最后,调用了reject(command)执行拒绝策略,代码如下所示:

handler就是我们可以设置的拒绝策略管理器:

RejectedExecutionHandler是一个接口,定义了四种实现,分别对应四种不同的拒绝策略,默认是AbortPolicy。

package java.util.concurrent;
public interface RejectedExecutionHandler {
  void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

ThreadPoolExecutor类中默认的实现是:

四种策略的实现代码如下:

策略1:调用者直接在自己的线程里执行,线程池不处理,比如到医院打点滴,医院没地方了,到你家自己操作吧:

策略2:线程池抛异常:

策略3:线程池直接丢掉任务,神不知鬼不觉:

策略4:删除队列中最早的任务,将当前任务入队列:

Executors工具类

concurrent包提供了Executors工具类,利用它可以创建各种不同类型的线程池。

四种对比

单线程的线程池:

固定数目线程的线程池:

每接收一个请求,就创建一个线程来执行:

单线程具有周期调度功能的线程池:

多线程,有调度功能的线程池:

最佳实践

不同类型的线程池,其实都是由前面的几个关键配置参数配置而成的。

在《阿里巴巴Java开发手册》中,明确禁止使用Executors创建线程池,并要求开发者直接使用ThreadPoolExector或ScheduledThreadPoolExecutor进行创建。这样做是为了强制开发者明确线程池的运行策略,使其对线程池的每个配置参数皆做到心中有数,以规避因使用不当而造成资源耗尽的风险。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor实现了按时间调度来执行任务:

延迟执行任务:

周期执行任务:

区别如下:

  • AtFixedRate:按固定频率执行,与任务本身执行时间无关。但有个前提条件,任务执行时间必须小于间隔时间,例如间隔时间是5s,每5s执行一次任务,任务的执行时间必须小于5s
  • WithFixedDelay:按固定间隔执行,与任务本身执行时间有关。例如,任务本身执行时间是10s,间隔2s,则下一次开始执行的时间就是12s

延迟执行和周期性执行的原理

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,这意味着其内部的数据结构和ThreadPoolExecutor是基本一样的,那它是如何实现延迟执行任务和周期性执行任务的呢?

延迟执行任务依靠的是DelayQueue。DelayQueue是BlockingQueue的一种,其实现原理是二叉堆。

而周期性执行任务是执行完一个任务之后,再把该任务扔回到任务队列中,如此就可以对一个任务反复执行。

不过这里并没有使用DelayQueue,而是在ScheduledThreadPoolExecutor内部又实现了一个特定的DelayQueue:

其原理和DelayQueue一样,但针对任务的取消进行了优化。下面主要讲延迟执行和周期性执行的实现过程。

延迟执行

传进去的是一个Runnable,外加延迟时间delay。在内部通过decorateTask(…)方法把Runnable包装成一个ScheduleFutureTask对象,而DelayedWorkQueue中存放的正是这种类型的对象,这种类型的对象一定实现了Delayed接口。

从上面的代码中可以看出,schedule()方法本身很简单,就是把提交的Runnable任务加上delay时间,转换成ScheduledFutureTask对象,放入DelayedWorkerQueue中。任务的执行过程还是复用的ThreadPoolExecutor,延迟的控制是在DelayedWorkerQueue内部完成的。

周期性执行

和schedule(…)方法的框架基本一样,也是包装一个ScheduledFutureTask对象,只是在延迟时间参数之外多了一个周期参数,然后放入DelayedWorkerQueue就结束了。

两个方法的区别在于一个传入的周期是一个负数,另一个传入的周期是一个正数,为什么要这样做呢?

用于生成任务序列号的sequencer,创建ScheduledFutureTask的时候使用:

private class ScheduledFutureTask<V>
 extends FutureTask<V> implements RunnableScheduledFuture<V> {
  private final long sequenceNumber;
  private volatile long time;
  private final long period;
 
  ScheduledFutureTask(Runnable r, V result, long triggerTime,
 long period, long sequenceNumber) {
    super(r, result);
    this.time = triggerTime; // 延迟时间
    this.period = period; // 周期
    this.sequenceNumber = sequenceNumber;
   }
 
  // 实现Delayed接口
  public long getDelay(TimeUnit unit) {
    return unit.convert(time - System.nanoTime(), NANOSECONDS);
   }
 
  // 实现Comparable接口
  public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
      return 0;
    if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      long diff = time - x.time;
      if (diff < 0)
        return -1;
      else if (diff > 0)
        return 1;
      // 延迟时间相等,进一步比较序列号
      else if (sequenceNumber < x.sequenceNumber)
        return -1;
      else
        return 1;
     }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
   }
 
  // 实现Runnable接口
  public void run() {
    if (!canRunInCurrentRunState(this))
      cancel(false);
    // 如果不是周期执行,则执行一次
    else if (!isPeriodic())
      super.run();
       // 如果是周期执行,则重新设置下一次运行的时间,重新入队列
    else if (super.runAndReset()) {
      setNextRunTime();
      reExecutePeriodic(outerTask);
     }
   }
 
  // 下一次执行时间
  private void setNextRunTime() {
    long p = period;
    if (p > 0)
      time += p;
    else
      time = triggerTime(-p);
   }
}

// 下一次触发时间
long triggerTime(long delay) {
  return System.nanoTime() +
   ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

// 放到队列中,等待下一次执行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  if (canRunInCurrentRunState(task)) {
    super.getQueue().add(task);
    if (canRunInCurrentRunState(task) || !remove(task)) {
      ensurePrestart();
      return;
     }
   }
  task.cancel(false);
}

withFixedDelay和atFixedRate的区别就体现在setNextRunTime里面。如果是atFixedRate,period>0,下一次开始执行时间等于上一次开始执行时间+period;如果是withFixedDelay,period < 0,下一次开始执行时间等于triggerTime(-p),为now+(-period),now即上一次执行的结束时间。

以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: 并发编程
最后更新:2022年 6月 9日

RubinChu

一个快乐的小逗比~~~

打赏 点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复
文章目录
  • 线程池介绍
    • 实现原理
    • 线程池的类继承体系
  • ThreadPoolExecutor
    • 核心数据结构
    • 核心配置参数解释
    • 线程池的优雅关闭
      • 线程池的生命周期
      • 正确关闭线程池的步骤
      • shutdown()与shutdownNow()的区别
    • 任务的提交过程分析
    • 任务的执行过程分析
      • shutdown()与任务执行过程综合分析
      • shutdownNow() 与任务执行过程综合分析
    • 线程池的4种拒绝策略
  • Executors工具类
    • 四种对比
    • 最佳实践
  • ScheduledThreadPoolExecutor
    • 延迟执行和周期性执行的原理
    • 延迟执行
    • 周期性执行
最新 热点 随机
最新 热点 随机
问题记录之Chrome设置屏蔽Https禁止调用Http行为 问题记录之Mac设置软链接 问题记录之JDK8连接MySQL数据库失败 面试系列之自我介绍 面试总结 算法思维
JVM类加载机制详解 java面试系列之反射 SpringCloud Netflix之Feign远程调用组件 Netty进阶 Docker之docker-compose RocketMQ之高级特性及原理

COPYRIGHT © 2021 rubinchu.com. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

京ICP备19039146号-1