GYM4-线程池

1、线程池

优点:将更多的精力放在业务上,不必操心线程的创建和销毁。节省线程创建销毁的时间。

手写简易ThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class MyThreadPool {
private static MyThreadPool instance = new MyThreadPool();
private List<Worker> idleThreads;
private int threadCounter;
private boolean isShutDown =false;

private MyThreadPool(){
idleThreads = new Vector<Worker>(10);
threadCounter = 0;
}
public int getCreatedThreadCount() {return threadCounter;}
public static MyThreadPool getInstance(){return instance;}
protected synchronized void repool(Worker repoolingThread){
if(!isShutDown) repoolingThread.shutDown();
else idleThreads.add(repoolingThread);
}
public synchronized void shutDown(){
isShutDown = true;
for(Worker worker : idleThreads) worker.shutDown();

}
public synchronized void start(Runnable target){
Worker thread = null;
if(idleThreads.size()>0){
int lastIndex = idleThreads.size()-1;
thread = (Worker) idleThreads.get(lastIndex);
idleThreads.remove(lastIndex);
thread.setTarget(target);
}
}
public static class Worker extends Thread{
private MyThreadPool pool;
private Runnable target;
private boolean isShutdown = false;
private boolean isIdle = false;
public Worker(Runnable target, String name , MyThreadPool pool){
super(name);
this.pool = pool;
this.target =target;
}
public Runnable getTarget(){return target;}
public void setTarget(Runnable target){this.target = target;}
public boolean isIdle(){return isIdle;}
public void shutDown(){
this.isShutdown = true;
}
@Override
public void run(){
while (!isShutdown){
isIdle =false;
if(target != null) target.run();
isIdle = true;
try {
pool.repool(this);
synchronized (this){
wait();
}
}catch (InterruptedException e){

}
}
}
}
}

线程池重要接口

Executor接口

  执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start():

ExecuteService

  Executor提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。
  可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown() 方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService 以允许回收其资源。
  通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法 Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)。

ThreadPoolExecutor

Executors

  类提供了用于此包中所提供的执行程序服务的工厂方法。

线程池的使用

ThreadPoolExecutor 参数

Executors.ThreadPoolExecutor的参数不同实现不同的线程池
1)corePoolSize 核心线程池标准数量
2)maximumPoolSize 线程池最大尺寸
3)keepAliceTime 超过标准数量,空闲线程存活时间
4)TimeUnit 时间单位
5)BlockingQueue 保存没有执行的线程
6)RejectedExecutionHandler 持有拒绝策略,非必填
7)ThreadFactory

不同参数的不同线程池

newFixedThreadPool

(n,n,0, SECONDS,LinkedBlockingQueue)

newSingleThreadExecutor

(1,1,0L,s,LinkedBlockingQueue)

newCachedThreadPool

(0,MAX,60,SECONDS,SychronousQueue)
SynchronousQueue 容量为零,60s后线程被销毁,容量可扩张

newSingleThreadExecutor

newScheduledThreadPool

1
2
3
4
5
6
7
8
9
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for(int i=0;i<5;i++){
executorService.scheduleWithFixedDelay(
new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName());
}
},2,2, TimeUnit.SECONDS);
}

ThreadPoolExecutor的子类ScheduledThreadPoolExecutor实现

## ExecutorServeice
submit();提交Callable 返回Future
execute();提交Runnbale 不返回

线程池的扩展

增强接口

  • beforeExecute()
  • afterExecute()
  • terminated()
1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
ExecutorService service = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("ready " + ((MyTask) r).name);
}
};
for(int i = 0;i<5;i++){
service.execute(new MyTask(String.valueOf(i)));
}

}

## 拒绝策略
缓冲队列不可能无限大,防止任务十分繁重导致的内存溢出和其他错误。
RejectedExecutionHandler接口持有拒绝策略。

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

它的实现类主要有以下几种

AbortPolicy

直接拒绝任务,拒绝任务时会抛出RejectedExecutorException

DiscardPolicy

丢弃任务,什么都不做

CallerRunsPolicy

调用线程池的线程完成任务,不再使用线程池中的线程。直接调用请求Runnable的run()方法。

DiscardOldestPolicy

重线程池队列中选取最老的线程丢弃

自定义线程工厂

ThreadFactory接口。构建新的线程。实现者同时也初始化优先级,线程名称,守护状态等。

1
2
3
public interface ThreadFactory {
Thread newThread(Runnable r);
}

ThreadPoolExecutor

ctl属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
后28为表示线程数量前四位表示线程装填,最高的三位表示不同的状态:

  • 111 RUNNING 正在运行
  • 000 SHUTDOWN 不再接收任务,队列中任务继续处理
  • 001 STOP 不接受新的任务忙不处理队列中的任务,中断正在处理的任务
  • 010 TIDYING 整理 所有线程终结,workerCount设置为0, terminated()方法运行的过渡状态
  • 011 TERMINATED terminated()方法运行完毕
    剩下的29位表示有效线程的数量

execut()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* 如果正在运行的线程小于corePoolSize 。尝试创建新的线程执行当前任务。 调用addWroker方法时,
会利用原子操作检查运行状态和有效线程数量 ,以防止之不应该添加线程时发生失败警告*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/* 没有使用锁进行同步,加入工作线程和加入队列函数必须进行再次检查。*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//双重检查,如果线程池关闭,则移除加入的任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}//加入队列失败则开启新的线程,不受核心线程数限制,而是最大线程数限制
else if (!addWorker(command, false))
reject(command);
}

Fork/Join 线程池

将大任务分成小任务,最终将结果整理,分治思想。等待子任务完成。每个子任务不一定需要创建新的线程,而是利用线程中的线程。
fork 拆分任务。join等待任务完成。

示例代码

从性能角度看,Fork/Join适合处理子任务比较复杂的情况。否则的话开辟线程的时间开销会比大于分治节省的时间,得不偿失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static class CountTask extends RecursiveTask {
private int start;
private int end;

public CountTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Object compute() {
int sum = 0;
if (end - start <= 10000){
for(int i= start;i<=end;i++) sum += i;
} else {
int mid = (start + end) >> 1;
CountTask left = new CountTask(start,mid);
CountTask right = new CountTask(mid+1,end);
//执行子任务
left.fork();
right.fork();
//等待子任务完成
sum = (int)left.join() + (int)right.join();
}
return sum;
}
}
public static void compare(int n) throws Exception{
Long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(1, n);
Future result = forkJoinPool.submit(task);
System.out.println(result.get());
Long mid = System.currentTimeMillis();
System.out.println(mid - start);
for (int i = 1,sum = 0; i <= n; i++) sum += i;
System.out.println(System.currentTimeMillis() - mid);
}

WorkQueue

ForkJoinPool

ctl属性

  使用一个long型整数保存五个变量,AC(16位保存活跃线程数减去并行度)、TC(16位保存总共的线程数减去并行度)、ST(1位保存线程池是否终结)、EC(15位保存栈顶端ForkJoinTask的eventCount)、ID(15位记录栈顶端ForkJoinTask的poolIndex属性)
  使用一个long型整数表示5个变量并不会节省很多空间,而且大大降低了代码的可读性。仍然这么用的目的是同时保持五个变量的线程安全和并发性能。通过与无锁操作配合提升性能。

scan()

runWorker()

帮助策略

自己从top取 别人从Base取 减少冲突。均匀分配任务。

接口

RecursiveAction 无返回值
RecursiveTask 有返回值