Executor框架

Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括RunnableCallable,而执行机制由Executor框架提供。

Executor框架

Executor框架的两级调度模型

对于Sun JDK来说,它的Windows版和Linux版都是使用一对一的线程模型实现的,一条Java线程就映射到一条轻量级进程(LWP)中。

参见:Java与线程

Executor框架的两级调度模型可以描述为:在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。

Executor框架的结构

Executor框架主要由3部分组成:

  1. 任务

    被执行的任务需要实现的接口: Runnable或者Callable

  2. 任务的执行

    包括任务执行机制的核心接口Executor,以及继承自ExecutorExecutorService接口。

    Executor框架有两个关键类实现了ExecutorService接口:ThreadPoolExecutorScheduledThreadPoolExecutor

  3. 异步计算的结果

    包括接口Future和其实现类FutureTask

Executor框架的类与接口

Executor框架的使用示意图

ThreadPoolExecutor

ThreadPoolExecutor通常使用Executors工厂类来创建,可以创建以下3种类型:

  1. FixedThreadPool

    Executors.newFixedThreadPool(int)适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,适用于负载比较重的服务器。

    工作队列workQueue使用的是LinkedBlockingQueue,容量为Integer.MAX_VALUE

  2. SingleThreadExecutor

    Executors.newSingleThreadExecutor()适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。

    工作队列workQueue使用的是LinkedBlockingQueue。容量为Integer.MAX_VALUE

  3. CachedThreadPool

    Executors.newCachedThreadPool()适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。corePoolSize为0,maximumPoolSize被设置为Integer.MAX_VALUE,即无界的。

    作队列workQueue使用的是SynchronousQueue

FixedThreadPool和SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但是CachedThreadPool的maximumPoolSize是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中处理任务的速度时,CachedThreadPool会不断创建线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor可以在给定的延迟后运行命令或者定期执行命令,使用Executors工厂类来创建,由2种类型:

  1. ScheduledThreadPoolExecutor

    包含若干个线程的ScheduledThreadPoolExecutor

  2. SingleScheduledThreadPoolExecutor

    只包含一个线程的ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor会把调度任务(ScheduledFutureTask)放到DelayQueue中。

FutureTask

Future接口和其实现类FutureTask代表了异步计算的结果。

FutureTask实现了两个接口,Runnable和Future,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

1
2
3
4
5
6
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

FutureTask除了实现了Future接口外,还实现了Runnable接口,因此FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

FutureTask的状态

FutureTask可以处于以下3种状态:

  1. 未启动

    FutureTask.run()被创建但是还没有被执行之前,FutureTask处于未启动状态。

  2. 已启动

    FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。

  3. 完成

    FutureTask.run()执行完后正常结束,或被取消(Future.cancel(...)),或执行时抛出异常而结束,FutureTask处于完成状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
Task task = new Task();
Future<String> future = es.submit(task);
FutureTask<String> futureTask = new FutureTask<String>(task);
// 将FutureTask交给线程池执行
es.submit(futureTask);
// FutureTask直接交给Thread执行
// new Thread(futureTask).start();
System.out.println(future.get());
System.out.println(futureTask.get());
es.shutdown();
}
static class Task implements Callable<String>{
@Override
public String call() throws Exception {
Thread.sleep(2000);
int sum = 0;
for (int i = 0; i < 1000; i++){
sum += i;
}
return "sum="+sum;
}
}
}