为什么要使用线程池?

(一)提高程序的执行效率

如果程序中有大量短时间任务的线程任务,由于创建和销毁线程需要和底层操作系统交互,大量时间都耗费在创建和销毁线程上,因而比较浪费时间,系统效率很低

而线程池里的每一个线程任务结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下一个对象来使用,因而借助线程池可以提高程序的执行效率

(二)控制线程的数量,防止程序崩溃(线程过多内存溢出)

如果不加限制地创建和启动线程很容易造成程序崩溃,比如高并发1000W个线程,JVM就需要有保存1000W个线程的空间,这样极易出现内存溢出
线程池中线程数量是一定的,可以有效避免出现内存溢出

使用线程池的好处:


  1.降低资源消耗。java中所有的池化技术都有一个好处,就是通过复用池中的对象,降低系统资源消耗。设想一下如果我们有n多个子任务需要执行,如果我们为每个子任务都创建一个执行线程,而创建线程的过程是需要一定的系统消耗的,最后肯定会拖慢整个系统的处理速度。而通过线程池我们可以做到复用线程,任务有多个,但执行任务的线程可以通过线程池来复用,这样减少了创建线程的开销,系统资源利用率得到了提升。

  2.降低管理线程的难度。多线程环境下对线程的管理是最容易出现问题的,而线程池通过框架为我们降低了管理线程的难度。我们不用再去担心何时该销毁线程,如何最大限度的避免多线程的资源竞争。这些事情线程池都帮我们代劳了。

  3.提升任务处理速度。线程池中长期驻留了一定数量的活线程,当任务需要执行时,我们不必先去创建线程,线程池会自己选择利用现有的活线程来处理任务。


很显然,线程池一个很显著的特征就是“长期驻留了一定数量的活线程”,避免了频繁创建线程和销毁线程的开销,那么它是如何做到的呢?我们知道一个线程只要执行完了run()方法内的代码,这个线程的使命就完成了,等待它的就是销毁。既然这是个“活线程”,自然是不能很快就销毁的。为了搞清楚这个“活线程”是如何工作的,下面通过追踪源码来看看能不能解开这个疑问。


分析方法

在分析源码之前先来思考一下要怎么去分析,源码往往是比较复杂的,如果知识储备不够丰厚,很有可能会读不下去,或者读岔了。一般来讲要时刻紧跟着自己的目标来看代码,跟目标关系不大的代码可以不理会它,一些异常的处理也可以暂不理会,先看正常的流程。就我们现在要分析的源码而言,目标就是看看线程是如何被复用的。那么对于线程池的状态的管理以及非正常状态下的处理代码就可以不理会,具体来讲,在ThreadPollExcutor类中,有一个字段 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 是对线程池的运行状态和线程池中有效线程的数量进行控制的, 它包含两部分信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),还有几个对ctl进行计算的方法:

// 获取运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }

// 获取活动线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }

以上两个方法在源码中经常用到,结合我们的目标,对运行状态的一些判断及处理可以不用去管,而对当前活动线程数要加以关注等等。

下面将遵循这些原则来分析源码。

解惑

当我们要向线程池添加一个任务时是调用ThreadPollExcutor对象的execute(Runnable command)方法来完成的,所以先来看看ThreadPollExcutor类中的execute(Runnable command)方法的源码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

按照我们在分析方法中提到的一些原则,去掉一些相关性不强的代码,看看核心代码是怎样的

// 为分析而简化后的代码
public void execute(Runnable command) {

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中,并把任务添加到该线程中
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 如果当前活动线程数大于等于corePoolSize,则尝试将任务放入缓存队列
    if (workQueue.offer(command)) {
        int recheck = ctl.get();
        if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }else {
        // 缓存已满,新建一个线程放入线程池,并把任务添加到该线程中(此时新建的线程相当于非核心线程)
        addWorker(command, false)
    }
}

这样一看,逻辑应该清晰很多了。

 1.如果 当前活动线程数 < 指定的核心线程数,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于核心线程);

 2.如果 当前活动线程数 >= 指定的核心线程数,且缓存队列未满,则将任务添加到缓存队列中;

 3.如果 当前活动线程数 >= 指定的核心线程数,且缓存队列已满,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);

接下来看 addWorker(Runnable firstTask, boolean core)方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

同样,我们也来简化一下:

// 为分析而简化后的代码
private boolean addWorker(Runnable firstTask, boolean core) {

    int wc = workerCountOf(c);
    if (wc >= (core ? corePoolSize : maximumPoolSize))
        // 如果当前活动线程数 >= 指定的核心线程数,不创建核心线程
        // 如果当前活动线程数 >= 指定的最大线程数,不创建非核心线程   
        return false;

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 新建一个Worker,将要执行的任务作为参数传进去
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            workers.add(w);
            workerAdded = true;
            if (workerAdded) {
                // 启动刚刚新建的那个worker持有的线程,等下要看看这个线程做了啥
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

看到这里,我们大概能猜测到,addWorker方法的功能就是新建一个线程并启动这个线程,要执行的任务应该就是在这个线程中执行。为了证实我们的这种猜测需要再来看看Worker这个类。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable{
    // ....
}

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

从上面的Worker类的声明可以看到,它实现了Runnable接口,以及从它的构造方法中可以知道待执行的任务赋值给了它的变量firstTask,并以它自己为参数新建了一个线程赋值给它的变量thread,那么运行这个线程的时候其实就是执行Worker的run()方法,来看一下这个方法:

    public void run() {
        runWorker(this);
    }

    final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

在run()方法中只调了一下 runWorker(this) 方法,再来简化一下这个 runWorker() 方法

// 为分析而简化后的代码
final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    while (task != null || (task = getTask()) != null) {
            try {
                task.run();
            } finally {
                task = null;
            }
        }
}

很明显,runWorker()方法里面执行了我们新建Worker对象时传进去的待执行的任务,到这里为止貌似这个worker的run()方法就执行完了,既然执行完了那么这个线程也就没用了,只有等待虚拟机销毁了。那么回顾一下我们的目标:Java线程池中的核心线程是如何被重复利用的?好像并没有重复利用啊,新建一个线程,执行一个任务,然后就结束了,销毁了。没什么特别的啊,难道有什么地方漏掉了,被忽略了?再仔细看一下runWorker()方法的代码,有一个while循环,当执行完firstTask后task==null了,那么就会执行判断条件 (task = getTask()) != null,我们假设这个条件成立的话,那么这个线程就不止只执行一个任务了,可以执行多个任务了,也就实现了重复利用了。答案呼之欲出了,接着看getTask()方法

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

老规矩,简化一下代码来看:

// 为分析而简化后的代码
private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int wc = workerCountOf(c);

        // timed变量用于判断是否需要进行超时控制。
        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if (timed && timedOut) {
            // 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时,那么尝试将workerCount减1,即当前活动线程数减1,
            // 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();

            // 注意workQueue中的poll()方法与take()方法的区别
            //poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null
            //take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止

            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

从以上代码可以看出,getTask()的作用是

  1.如果当前活动线程数大于核心线程数,当去缓存队列中取任务的时候,如果缓存队列中没任务了,则等待keepAliveTime的时长,此时还没任务就返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数就会这样一个一个地销毁这些多余的线程。

  2.如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。

小结

通过以上的分析,应该算是比较清楚地解答了“线程池中的核心线程是如何被重复利用的”这个问题,同时也对线程池的实现机制有了更进一步的理解:

  1.当有新任务来的时候,先看看当前的线程数有没有超过核心线程数,如果没超过就直接新建一个线程来执行新的任务,如果超过了就看看缓存队列有没有满,没满就将新任务放进缓存队列中,满了就新建一个线程来执行新的任务,如果线程池中的线程数已经达到了指定的最大线程数了,那就根据相应的策略拒绝任务。

  2.当缓存队列中的任务都执行完了的时候,线程池中的线程数如果大于核心线程数,就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了,它们一直处于阻塞状态,等待新的任务到来。


最后上工具类代码

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author: BUG汇总
 * @description: 线程池工具类
 * @className: ThreadHelper
 * @createDate: 2020-06-09 11:34:59
 */
public class ThreadPoolHelper {

    @Retention(RetentionPolicy.SOURCE)
    public @interface Type {
        public static final int FixedThread = 0;
        public static final int CachedThread = 1;
        public static final int SingleThread = 2;
    }

    private ExecutorService exec;
    private ScheduledExecutorService scheduleExec;

    private ThreadPoolHelper() {
        throw new UnsupportedOperationException("u can't instantiate me...");
    }

    /**
     *
     * @param type         线程池类型
     * @param corePoolSize 只对Fixed和Scheduled线程池起效
     */
    public ThreadPoolHelper(@Type final int type, final int corePoolSize) {
        // 构造有定时功能的线程池
        // ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 10L, TimeUnit.MILLISECONDS, new BlockingQueue<Runnable>)
        scheduleExec = Executors.newScheduledThreadPool(corePoolSize);
        switch (type) {
            case Type.FixedThread:
                // 构造一个固定线程数目的线程池
                // ThreadPoolExecutor(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
                exec = Executors.newFixedThreadPool(corePoolSize);
                break;
            case Type.SingleThread:
                // 构造一个只支持一个线程的线程池,相当于newFixedThreadPool(1)
                // ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
                exec = Executors.newSingleThreadExecutor();
                break;
            case Type.CachedThread:
                // 构造一个缓冲功能的线程池
                // ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
                exec = Executors.newCachedThreadPool();
                break;
            default:
                break;
        }
    }

    /**
     * 在未来某个时间执行给定的命令
     * <p>该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。</p>
     *
     * @param command 命令
     */
    public void execute(final Runnable command) {
        exec.execute(command);
    }

    /**
     * 在未来某个时间执行给定的命令链表
     * <p>该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor 实现决定。</p>
     *
     * @param commands 命令链表
     */
    public void execute(final List<Runnable> commands) {
        for (Runnable command : commands) {
            exec.execute(command);
        }
    }

    /**
     * 待以前提交的任务执行完毕后关闭线程池
     * <p>启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
     * 如果已经关闭,则调用没有作用。</p>
     */
    public void shutDown() {
        exec.shutdown();
    }

    /**
     * 试图停止所有正在执行的活动任务
     * <p>试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。</p>
     * <p>无法保证能够停止正在处理的活动执行任务,但是会尽力尝试。</p>
     *
     * @return 等待执行的任务的列表
     */
    public List<Runnable> shutDownNow() {
        return exec.shutdownNow();
    }

    /**
     * 判断线程池是否已关闭
     *
     * @return {@code true}: 是<br>{@code false}: 否
     */
    public boolean isShutDown() {
        return exec.isShutdown();
    }

    /**
     * 关闭线程池后判断所有任务是否都已完成
     * <p>注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。</p>
     *
     * @return {@code true}: 是<br>{@code false}: 否
     */
    public boolean isTerminated() {
        return exec.isTerminated();
    }


    /**
     * 请求关闭、发生超时或者当前线程中断
     * <p>无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。</p>
     *
     * @param timeout 最长等待时间
     * @param unit    时间单位
     * @return {@code true}: 请求成功<br>{@code false}: 请求超时
     * @throws InterruptedException 终端异常
     */
    public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    /**
     * 提交一个Callable任务用于执行
     * <p>如果想立即阻塞任务的等待,则可以使用{@code result = exec.submit(aCallable).get();}形式的构造。</p>
     *
     * @param task 任务
     * @param <T>  泛型
     * @return 表示任务等待完成的Future, 该Future的{@code get}方法在成功完成时将会返回该任务的结果。
     */
    public <T> Future<T> submit(final Callable<T> task) {
        return exec.submit(task);
    }

    /**
     * 提交一个Runnable任务用于执行
     *
     * @param task   任务
     * @param result 返回的结果
     * @param <T>    泛型
     * @return 表示任务等待完成的Future, 该Future的{@code get}方法在成功完成时将会返回该任务的结果。
     */
    public <T> Future<T> submit(final Runnable task, final T result) {
        return exec.submit(task, result);
    }

    /**
     * 提交一个Runnable任务用于执行
     *
     * @param task 任务
     * @return 表示任务等待完成的Future, 该Future的{@code get}方法在成功完成时将会返回null结果。
     */
    public Future<?> submit(final Runnable task) {
        return exec.submit(task);
    }

    /**
     * 执行给定的任务
     * <p>当所有任务完成时,返回保持任务状态和结果的Future列表。
     * 返回列表的所有元素的{@link Future#isDone}为{@code true}。
     * 注意,可以正常地或通过抛出异常来终止已完成任务。
     * 如果正在进行此操作时修改了给定的 collection,则此方法的结果是不确定的。</p>
     *
     * @param tasks 任务集合
     * @param <T>   泛型
     * @return 表示任务的 Future 列表,列表顺序与给定任务列表的迭代器所生成的顺序相同,每个任务都已完成。
     * @throws InterruptedException 如果等待时发生中断,在这种情况下取消尚未完成的任务。
     */
    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return exec.invokeAll(tasks);
    }

    /**
     * 执行给定的任务
     * <p>当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的Future列表。
     * 返回列表的所有元素的{@link Future#isDone}为{@code true}。
     * 一旦返回后,即取消尚未完成的任务。
     * 注意,可以正常地或通过抛出异常来终止已完成任务。
     * 如果此操作正在进行时修改了给定的 collection,则此方法的结果是不确定的。</p>
     *
     * @param tasks   任务集合
     * @param timeout 最长等待时间
     * @param unit    时间单位
     * @param <T>     泛型
     * @return 表示任务的 Future 列表,列表顺序与给定任务列表的迭代器所生成的顺序相同。如果操作未超时,则已完成所有任务。如果确实超时了,则某些任务尚未完成。
     * @throws InterruptedException 如果等待时发生中断,在这种情况下取消尚未完成的任务
     */
    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws
            InterruptedException {
        return exec.invokeAll(tasks, timeout, unit);
    }

    /**
     * 执行给定的任务
     * <p>如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
     * 一旦正常或异常返回后,则取消尚未完成的任务。
     * 如果此操作正在进行时修改了给定的collection,则此方法的结果是不确定的。</p>
     *
     * @param tasks 任务集合
     * @param <T>   泛型
     * @return 某个任务返回的结果
     * @throws InterruptedException 如果等待时发生中断
     * @throws ExecutionException   如果没有任务成功完成
     */
    public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return exec.invokeAny(tasks);
    }

    /**
     * 执行给定的任务
     * <p>如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
     * 一旦正常或异常返回后,则取消尚未完成的任务。
     * 如果此操作正在进行时修改了给定的collection,则此方法的结果是不确定的。</p>
     *
     * @param tasks   任务集合
     * @param timeout 最长等待时间
     * @param unit    时间单位
     * @param <T>     泛型
     * @return 某个任务返回的结果
     * @throws InterruptedException 如果等待时发生中断
     * @throws ExecutionException   如果没有任务成功完成
     * @throws TimeoutException     如果在所有任务成功完成之前给定的超时期满
     */
    public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws
            InterruptedException, ExecutionException, TimeoutException {
        return exec.invokeAny(tasks, timeout, unit);
    }

    /**
     * 延迟执行Runnable命令
     *
     * @param command 命令
     * @param delay   延迟时间
     * @param unit    单位
     * @return 表示挂起任务完成的ScheduledFuture,并且其{@code get()}方法在完成后将返回{@code null}
     */
    public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
        return scheduleExec.schedule(command, delay, unit);
    }

    /**
     * 延迟执行Callable命令
     *
     * @param callable 命令
     * @param delay    延迟时间
     * @param unit     时间单位
     * @param <V>      泛型
     * @return 可用于提取结果或取消的ScheduledFuture
     */
    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
        return scheduleExec.schedule(callable, delay, unit);
    }

    /**
     * 延迟并循环执行命令
     *
     * @param command      命令
     * @param initialDelay 首次执行的延迟时间
     * @param period       连续执行之间的周期
     * @param unit         时间单位
     * @return 表示挂起任务完成的ScheduledFuture,并且其{@code get()}方法在取消后将抛出异常
     */
    public ScheduledFuture<?> scheduleWithFixedRate(final Runnable command, final long initialDelay,
                                                    final long period, final TimeUnit unit) {
        return scheduleExec.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    /**
     * 延迟并以固定休息时间循环执行命令
     *
     * @param command      命令
     * @param initialDelay 首次执行的延迟时间
     * @param delay        每一次执行终止和下一次执行开始之间的延迟
     * @param unit         时间单位
     * @return 表示挂起任务完成的ScheduledFuture,并且其{@code get()}方法在取消后将抛出异常
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay,
                                                     final long delay, final TimeUnit unit) {
        return scheduleExec.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

}