Executor 与线程池

创建线程不像创建对象一样简单,创建对象,仅仅是JVM的堆上分配一块内存而已,而创建一个线程,却需要调用操作系统内核的API,然后OS要为线程分配一系列的资源,这个成本就很高,所以线程是一个重量级的对象,应该避免频繁创建和销毁。

线程池:生产消费模式

目前业界线程池的设计,普遍采用的都是生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。在下面的示例代码中,创建了一个非常简单的线程池 MyThreadPool。

在 MyThreadPool 的内部,维护了一个阻塞队列 workQueue 和一组工作线程,工作 线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。 MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务。

如何使用Java中的线程池

Java 提供的线程池相关的工具类中,最核心的是ThreadPoolExecutor,它强调的是 Executor,而不是一般意义上的池化资源。其构造函数如下

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor{
int corePoolSize;
int maximumPoolSeiz,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
}

可以把线程池类比为一个项目组,而线程就是项目组 的成员。

corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤 了,至少要留 corePoolSize 个人坚守阵地。

maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但 是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤 人了,最多能撤到 corePoolSize 个人。

keepAliveTime & unit:上面提到项目根据忙闲来增减人员,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲, keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize , 那么这个空闲的线程就要被回收了。

workQueue:工作队列,储存任务

threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一 个有意义的名字。

handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙 碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。 ThreadPoolExecutor 已经提供了以下 4 种策略。 CallerRunsPolicy:提交任务的线程自己去执行该任务。

AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。

DiscardPolicy:直接丢弃任务,没有任何异常抛出。

DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃, 然后把新任务加入到工作队列。

注意事项

考虑到 ThreadPoolExecutor 的构造函数实在是有些复杂,所以 Java 并发包里提供了一个 线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂 的编码规范中基本上都不建议使用 Executors 了,

不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无 界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导 致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。

使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任 务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降 级策略配合使用。

使用线程池,还要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止; 不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执 行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕 获所有异常并按需处理,你可以参考下面的示例代码。

1
2
3
4
5
6
7
try{
//业务逻辑
}catch (RuntimeException e){
// 按需处理
}catch (Throwable x){
//按需处理
}

获取任务执行结果

Java通过ThreadPoolExecutor 提供的三个submit()方法和一个FutureTask工具类来支持获得任务执行结果的需求。

1
2
3
4
5
6
//提交Runnable任务
Future<?> submit (Runnable task);
// 提交Callable 任务
<T> Fututre<T> submit(Callable<T> task);
//提交Runnable任务及结果引用
<T> Future<T> submit (Runnable task, T result);

发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它 们分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务 是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit), 其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这 两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

1
2
3
4
5
6
7
8
9
10
11
// 取消任务
boolean cancel(
boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);

三个Sumbit()方法之间的区别在于方法参数不同。

  1. 提交 Runnable 任务submit(Runnable task) :这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()
  2. 提交 Callable 任务submit(Callable<T> task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果。
  3. 提交 Runnable 任务及结果引用 submit(Runnable task, T result):这个方法 很有意思,假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit()方法的参数 result。下面这段示例代码展示了它的经典用法。需要注意的是 Runnable 接口的实现类 Task 声明了一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。result 相当于主线程和子线程之间的桥梁,通过它主子线程可 以共享数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ExecutorService executor =Executors.newFixedThreadPool(1);
// 创建Result 对象 r
Result r =new Result();
r.setAAA(a);
//提交任务
Future<Result> future =executor.submit(new Task(r),r);
Result fr =future.get();
// 下面等式成立
fr ====r
fr.getAAA() ====a
fr.getXXX() ===x

class Task implements Runnable{
Result r;
// 通过构造函数传入result
Task(Result r){
this.r =r;
}
void run(){
// 可以操作result
a =r.getAAA();
r.setXXX(x);
}
}

FutureTask 【可以直接方便获取线程结果】

Future是一个接口,而FutureTask是一个实实在在的工具类,有两个构造函数,和前面的submit()类似

1
2
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable,V result);

使用FutureTask 很简单,FutureTask实现了Runnable和Future接口,由于实现了Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口, 所以也能用来获得任务的执行结果。

1
2
3
4
5
6
7
8
//创建 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()->1+2);
//创建线程池
ExecutorService es =Executors.newCachedThreadPool();
//提交FutureTask
es.submit(futureTask);
// 获取计算结果
Integer result =futureTask.get();

Fork-Join

对于简单的 并行任务,可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无 论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行 任务,则可以通过 CompletionService 来解决。

分治任务模型

分治任务模型可以分为两个阶段,一个阶段是任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果,另一个阶段是结果合并,即逐层合并子任务的执行结果,直至获取最终结果。

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和 子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往 都采用递归算法。

fork-join的使用

Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。Fork/Join 计算框架 主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可 以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。

ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其 中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的 执行结果。ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask,它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法 compute(),不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的 时候,需要你定义子类去扩展。