Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序
都可以使用线程池。

使用线程池的好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,
还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

线程池的实现原理

线程池的处理流程

1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作
线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这
个工作队列里。如果工作队列满了,则进入下个流程。
3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程
来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

ThreadPoolExecutor执行Executor方法分下面4种情况

1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤
需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能
地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

工作线程 线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。

线程池的使用

Java中的阻塞队列

ArrayListBlockingQueue: 一个由数组结构组成的有界阻塞队列

LinkedBlockingQueue: 一个由链表结构组成的无界阻塞队列

PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列

DelayQueue: 一个使用优先级队列实现的无界阻塞队列

SynchronousQueue 一个不存储元素的阻塞队列

LinkedtransferQueue 一个由链表结构组成的无界阻塞队列

LinkedBlockingDeque 一个由链表结构组成的双向阻塞对列

创建线程池

通过ThreadPoolExecutor 来创建一个线程池

1
new ThreadPoolExecutor(corepoolsize,maxiumPoolsize,keepalivetime,millseconds,runnabelTaskQueue,handler);

一个线程池要实现ExecutorService接口,ExecutorService 继承自Executor.

1
ExecutorService es = Executors.newFixedThreadPool(5);

向线程池提交任务

两个方法向线程池提交任务.分别为execute()和submit()

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功.

1
2
3
4
5
6
7
threadpool.execute(new Runnable()
{
public void run()
{
//TODO
}
});

submit方法用于提交需要返回值的任务,线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取值,get方法会阻塞当前线程直到任务完成,而使用get(long timeout, TimeUnit unit) 方法则会阻塞当前线程一段时间后立即返回,这时候可能任务还没执行完

1
2
3
4
5
6
7
8
9
10
11
Future<Object> future  =executor.submit(harReturnValuetask);
try
{
Object s= future.get();
}catch(InterruptedException e)
{

}finally
{
executor.shutdown();
}

关闭线程池

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务
都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

也就是或,shutdown 不会关闭还在运行的线程,shutdownnow会尝试中断还在运行的线程.

核心线程池的内部实现

对于核心的几个线程池,无论是newFixedThreadPool()方法,newSingleThreadExecutor()还是newCachedThreadPool()方法,虽然看起来创建的线程有着完全不同的特点,但是内部实现均使用了ThreadPoolExecutor实现

都是return了一个 new ThreadPoolExecutor().核心的线程池都是ThreadPoolExecutor类的封装.

通过ThreadPoolExecutor来创建一个线程池(自定义线程池)

1
2
new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,
milliseconds,runnableTaskQueue,handler,unit);

1)corePoolSize(线程池的基本大小)

2)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。

1
2
3
4
5
6
7
ArrayBlockingQueue 是一个基于数组结构的有界阻塞队列,此队列按照FIFO原则对元素进行排序。
--------
LinkedBlockingQueue 是一个基于链表结构的阻塞队列,此队列按照FIFO排列元素,吞吐量通常高于ArrayBlockingQueue, 静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
---------
SynchronousQueue 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue, 静态工厂方法Executors.newCachedThreadPool 使用了这个队列。
---------
PriorityBlockingQueue一个具有优先级的无界阻塞队列。

3)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。

4)ThreadFactory:线程工厂,用于创建线程.一般用默认的即可.

5)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。
·AbortPolicy:直接抛出异常。
·CallerRunsPolicy:只用调用者所在线程来运行任务。
·DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
·DiscardPolicy:不处理,丢弃掉。

6) keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。

7) unit keepAlivetime的单位.其中,Queue 是指提交但是未执行的任务队列,是一个BlockingQueue接口的对象,仅用于存放Runnable对象.

几种阻塞队列:

  • 直接提交的队列:由SynchronousQueue对象提供,是一个特殊的blockingqueue.它没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作.如果使用此队列,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,若没有空闲线程,则尝试新的进程,如果进程数量已经达到最大值,则执行拒绝策略.
  • 有界的任务队列 : ArrayBlockingQueue 实现.它的构造函数中必须带有一个容量参数,表示该队列的最大容量.
1
public ArrayBlockingQueue(int capacity)

若有新任务,且实际线程小于corePoolSize,优先创建新的线程,若大于corePoolSize,将新任务加入等待队列,若队列满,则无法加入,在总线程不大于maximumPoolsize的基础上,创建新的线程,若大于maximumPoolSize,执行拒绝策略.

  • 优先任务队列:带有执行优先级的队列,通过PriorityBlockingQueue实现,可控制任务的执行先后顺序,是一个特殊的无界队列,

newFixedThreadPool 返回了一个corePoolSize和maximumPoolSize大小一样的,并且使用了LinkedBlockingQueue任务队列的线程池,

newSingleThreadExecutor()返回了一个单线程的线程池.

newCachedThreadPool()返回corePoolSize为0, maximumPoolSize无穷大的线程池, 并使用SynchronousQueue队列.

几种拒绝策略

拒绝策略是系统超负荷运行时的补充措施,同时,等待队列中也已经满了,塞不下新任务了.JDK提供四种拒绝策略

  • AbortPolicy 直接抛出异常,阻止系统正常工作
  • CallerRunsPolicy 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,所以这种策略不会真的丢弃任务,但是任务提交线程的性能极有可能急剧下降.
  • DiscardOledestPolicy 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务.
  • DiscardPolicy 默默丢弃无法处理的任务,不予以任何处理.
1
2
3
4
5
6
7
8
9
//自定义一个线程池
ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(10),Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString()+"is discard");
}
});

自定义线程创建 ThreadFactory

线程池中的线程从哪里来,答案就是ThreadFactory

ThreadFactory是一个接口,它只有一个方法,用来创建线程,

1
Thread newThread(Runnable r);

当线程池需要新建线程时,就会调用这个方法