BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。
BlockingQueue主要的方法
添加 put(o) offer(o) offer(o,timeout,timeunit)
移除 take() poll() poll(timeout,timeunit)
BlockingQueue 的实现
| 队列类 | 数据结构 | 最大容量 | 支持容量自增 | 队列形式 | 锁个数 |
|---|---|---|---|---|---|
| ArrayBlockingQueue | 数组 | Integer.MAX_VALU | 不支持,容量必须在构造函数中指定,创建队列后不能变更容量大小。 | FIFO(先进先出) | 1把ReentrantLock锁,插入与移除互斥执行。 |
| LinkedBlockingQueue | 单向链表 | Integer.MAX_VALU | 不涉及 | FIFO(先进先出) | 2把ReentrantLock锁,插入与移除各一把锁,插入与移除可并发执行。 |
| LinkedBlockingDeque | 双向链表 | Integer.MAX_VALU | 不涉及 | 提供多种API,可以在队头和队尾进行插入和移除操作 | 1把ReentrantLock锁,插入与移除互斥执行。 |
| DelayQueue | 数组(内部为优先队列) | Integer.MAX_VALUE 支持(默认值为11) | 按元素的大小排序,元素必须实现 Delayed 接口(Delayed 继承 Comparable 接口)。最小先出队列,最大最后出队列 | 元素出队之前调用元素的 getDelay 接口返回延时时间,延迟这段时间后再出队列。 | 1把ReentrantLock 锁,插入与移除互斥执行。 |
| PriorityBlockingQueue | 数组 | Integer.MAX_VALUE - 8 支持(默认值为11) | 排序从小到大输出 | 元素实现 Comparable 接口,或者构造函数中传入 Comparator 对象,以便元素排序。 | 1把ReentrantLock锁,插入与移除互斥执行。 |
| SynchronousQueue | 无缓存结构 | 1 | 不涉及 | put | 操作会阻塞至 take 操作的到来。take 操作也会阻塞至 put 操作的到来。 |
ThreadPoolExecutor 是线程池的最主要的功能实现类。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
| 参数名称 | 说明 |
|---|---|
| corePoolSize | 核心线程池数量,一个任务到来时,若当前没有空闲线程且线程数量小于orePoolSize,则创建一个新的线程执行这个任务。 |
| workQueue | 负责任务缓冲的阻塞队列,若当前没有空闲线程且线程数量已经等于corePoolSize,则尝试将任务放入这个阻塞队列,若成功则等待有空闲线程取出执行。 |
| maximumPoolSize | 若阻塞队列已满,无法继续将任务放入。如果当前线程数量小于maximumPoolSize则创建一个新的线程执行当前任务。 |
| handler | 若当前阻塞队列满,且当前线程数量已经到达 maximumPoolSize,则调用 handler 的 |
| rejectedExecution | 方法处理这个任务。keepAliveTime,unit 线程空闲时间,如果当前线程数量已经大于 corePoolSize 或者调用了 allowsCoreThreadTimeOut 将线程超时关闭开个设置为了true,则空闲这段时间后会关闭线程。 |
线程池创建线程流程(默认|下箭头)
execute or schedule
|
|
活动线程数量小于corePoolSize ------NO---> workQueue阻塞队列是否满 --No--> 将任务插入阻塞队列
|yes |
| |yes
创建一个线程用来执行任务 <------yes------- 活动线程数量小于 maximumPoolSize
| |no
| |
任务结束 使用拒绝策略 handler
线程执行流程
线程池中线程开始运行
|
取出初始执任务行
|
|-->活动线程数量大于maximumPoolSize --yes--> 当前线程销毁
| |no | 上箭头 超时( keepAliveTime Unit )
| 活动线程数量大于corePoolSize --yes--> 超时(poll)方式获取任务
| |no |
| 取出(阻塞take)任务执行 执行任务
----| <-------------------------------------------------------|
提供一系列静态方法,创建不同运行机制的线程池。
| 关键方法 | 说明 |
|---|---|
| static ExecutorService newSingleThreadExecutor() | 创建一个线程池,池中只有一个线程,所有任务由这个线程顺序执行。 |
| static ExecutorService newFixedThreadPool(int core) | 创建一个线程池,池中最多有core个线程。 |
| static ExecutorService newCachedThreadPool() | 创建一个线程池,池中是一个缓存线程池,每次来一个任务若没有空闲线程则创建一个新的线程;若线程空闲60秒,则关闭空闲线程。 |
| static ScheduledExecutorService core)newScheduledThreadPool(int 创建一个具有延迟性质线程池,池中最多有core个线程,任务可以延迟执行或者周期执行。 |
// 创建单线程线程池
ThreadPoolExecutor singlePool = (ThreadPoolExecutor)Executors.newSingleThreadExecutor();
singlePool.execute(runnable);
//创建缓存线程池
ThreadPoolExecutor cachePool = (ThreadPoolExecutor)Executors.newCachedThreadPool();
cachePool.execute(runnable);
//创建固定数量线程的线程池
ThreadPoolExecutor fixPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(10);
fixPool.execute(runnable);
//创建延迟性质的线程池
ScheduledExecutorService schedulePool = Executors.newScheduledThreadPool(10);
schedulePool.schedule(runnable,5,TimeUnit.SECONDS); //延迟5秒执行
schedulePool.scheduleAtFixedRate(runnable,5,10,TimeUnit.SECONDS); //延迟5秒,周期为10秒执行
ThreadPoolExecutor相关策略
void shutdown() 线程池处于 SHUTDOWN 状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕。
List<Runnable> shutdownNow() 线程池处于 STOP 状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务,执行线程的 interrupt 方法, 并返回还未执行的任务。
void execute(Runnable command) 执行一个任务,任务无返回值。
Future submit(Callable task) 执行一个任务有返回值,可以通过Future获取执行结果,任务还没有执行前还可以取消任务。
Executors.newSingleThreadExecutor 和 Executors.newFixedThreadPool 创建的线程池使用链表阻塞队列,最大能缓存Integer.MAX_VALUE,也就是说线程池中的线程数量 99.9% 不会超过核心线程池数量。
Executors.newCachedThreadPool 使用这个同步阻塞队列缓存任务,它只能存储一个任务,意味着每来一个任务都需要创建一个线程处理,任务很多时会创建很多线程造成很大的消耗,但是CacheThreadPool会清除空闲时间超过60秒的线程。
Executors.newScheduledThreadPool 使用延迟的阻塞队列,最大能缓存 Integer.MAX_VALUE 个任务,所以核心线程池数量 99.9% 不会超过 ScheduledThreadPool 构造函数中指定的核心线程数量。

