1、线程池
优点:将更多的精力放在业务上,不必操心线程的创建和销毁。节省线程创建销毁的时间。
手写简易ThreadPool
1 | public class MyThreadPool { |
线程池重要接口
Executor接口
执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start():
ExecuteService
Executor提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。
可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown() 方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService 以允许回收其资源。
通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法 Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)。
ThreadPoolExecutor
Executors
类提供了用于此包中所提供的执行程序服务的工厂方法。
线程池的使用
ThreadPoolExecutor 参数
Executors.ThreadPoolExecutor的参数不同实现不同的线程池
1)corePoolSize 核心线程池标准数量
2)maximumPoolSize 线程池最大尺寸
3)keepAliceTime 超过标准数量,空闲线程存活时间
4)TimeUnit 时间单位
5)BlockingQueue
6)RejectedExecutionHandler 持有拒绝策略,非必填
7)ThreadFactory
不同参数的不同线程池
newFixedThreadPool
(n,n,0, SECONDS,LinkedBlockingQueue
newSingleThreadExecutor
(1,1,0L,s,LinkedBlockingQueue
newCachedThreadPool
(0,MAX,60,SECONDS,SychronousQueue)
SynchronousQueue 容量为零,60s后线程被销毁,容量可扩张
newSingleThreadExecutor
newScheduledThreadPool
1 | ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); |
ThreadPoolExecutor的子类ScheduledThreadPoolExecutor实现
## ExecutorServeice
submit();提交Callable 返回Future
execute();提交Runnbale 不返回
线程池的扩展
增强接口
- beforeExecute()
- afterExecute()
- terminated()
1 | public static void main(String[] args) { |
## 拒绝策略
缓冲队列不可能无限大,防止任务十分繁重导致的内存溢出和其他错误。
RejectedExecutionHandler接口持有拒绝策略。1
2
3public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
它的实现类主要有以下几种
AbortPolicy
直接拒绝任务,拒绝任务时会抛出RejectedExecutorException
DiscardPolicy
丢弃任务,什么都不做
CallerRunsPolicy
调用线程池的线程完成任务,不再使用线程池中的线程。直接调用请求Runnable的run()方法。
DiscardOldestPolicy
重线程池队列中选取最老的线程丢弃
自定义线程工厂
ThreadFactory接口。构建新的线程。实现者同时也初始化优先级,线程名称,守护状态等。1
2
3public interface ThreadFactory {
Thread newThread(Runnable r);
}
ThreadPoolExecutor
ctl属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
后28为表示线程数量前四位表示线程装填,最高的三位表示不同的状态:
- 111 RUNNING 正在运行
- 000 SHUTDOWN 不再接收任务,队列中任务继续处理
- 001 STOP 不接受新的任务忙不处理队列中的任务,中断正在处理的任务
- 010 TIDYING 整理 所有线程终结,workerCount设置为0, terminated()方法运行的过渡状态
- 011 TERMINATED terminated()方法运行完毕
剩下的29位表示有效线程的数量
execut()
1 | public void execute(Runnable command) { |
Fork/Join 线程池
将大任务分成小任务,最终将结果整理,分治思想。等待子任务完成。每个子任务不一定需要创建新的线程,而是利用线程中的线程。
fork 拆分任务。join等待任务完成。
示例代码
从性能角度看,Fork/Join适合处理子任务比较复杂的情况。否则的话开辟线程的时间开销会比大于分治节省的时间,得不偿失。
1 | public static class CountTask extends RecursiveTask { |
WorkQueue
ForkJoinPool
ctl属性
使用一个long型整数保存五个变量,AC(16位保存活跃线程数减去并行度)、TC(16位保存总共的线程数减去并行度)、ST(1位保存线程池是否终结)、EC(15位保存栈顶端ForkJoinTask的eventCount)、ID(15位记录栈顶端ForkJoinTask的poolIndex属性)
使用一个long型整数表示5个变量并不会节省很多空间,而且大大降低了代码的可读性。仍然这么用的目的是同时保持五个变量的线程安全和并发性能。通过与无锁操作配合提升性能。
scan()
runWorker()
帮助策略
自己从top取 别人从Base取 减少冲突。均匀分配任务。
接口
RecursiveAction 无返回值
RecursiveTask 有返回值