1. 线程池创建
线程池创建有三种:
如果使用spring,可以使用spring自带的线程池
可以使用Executors框架体系(不推介使用,because 它使用的都是无界限队列,容易发生OOM)
1.1 自定义创建线程池
Copy @ Bean
public ThreadPoolExecutor echartExecutor() {
log . info ( "ThreadPoolExecutor echartExecutor init ... " );
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor( 9 , 27 , 30 , TimeUnit . MINUTES , new ArrayBlockingQueue <>( 10000 ) ,
Executors . defaultThreadFactory() , new AbortPolicy()) ;
threadPoolExecutor . prestartAllCoreThreads (); // 预启动所有核心线程
log . info ( "ThreadPoolExecutor echartExecutor init completed !!! " );
return threadPoolExecutor;
}
线程创建工厂也可以自己定义
Copy static class NameTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger( 1 ) ;
@ Override
public Thread newThread ( Runnable r) {
Thread t = new Thread(r , "my-thread-" + mThreadNum . getAndIncrement()) ;
System . out . println ( t . getName () + " has been created" );
return t;
}
}
拒绝执行处理器也可以自己定义
Copy public static class MyIgnorePolicy implements RejectedExecutionHandler {
public void rejectedExecution ( Runnable r , ThreadPoolExecutor e) {
doLog(r , e) ;
}
private void doLog ( Runnable r , ThreadPoolExecutor e) {
// 可做日志记录等
System . err . println ( r . toString () + " rejected" );
// System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
}
}
corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
workQueue:工作队列,和上面示例代码的工作队列同义。
threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
CallerRunsPolicy:提交任务的线程自己去执行该任务。
AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
DiscardPolicy:直接丢弃任务,没有任何异常抛出。
DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时,这意味着如果项目很闲,就会将项目组的成员都撤走。
1.2 spring自己的线程池
Copy @ Bean (name = "threadPoolTaskExecutor" )
public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor() ;
threadPool . setCorePoolSize ( 10 );
threadPool . setMaxPoolSize ( 60 );
threadPool . setKeepAliveSeconds ( 60 );
threadPool . setQueueCapacity ( 10000 );
threadPool . setWaitForTasksToCompleteOnShutdown ( true );
threadPool . setAwaitTerminationSeconds ( 60 );
threadPool . initialize ();
return threadPool;
}
1.3 Executors工具框架体系创建线程池
Copy ExecutorService pool = Executors . newFixedThreadPool (taskSize); //创建固定线程数的线程池
newCachedThreadPool() //创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)
newSingleThreadExecutor() //创建一个单线程化的Executor
newScheduledThreadPool( int corePoolSize) //创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
1.4 使用线程池的注意事项
强烈建议使用有界队列,这也是不推荐使用Executors
的原因(Executors使用的都是无界队列容易发生OOM)
使用有界队列,任务过多时候,容易发生拒绝策略,默认的拒绝策略是:throw RuntimeException. 默认拒绝策略要慎重使用 ,建议自定义自己的拒绝策略;
并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
2. Future & FutureTask
2.1 Future
Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。 下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。
Copy // 提交 Runnable 任务 没有返回结果
Future<?> submit( Runnable task) ;
// 提交 Callable 任务 有返回结果
< T > Future< T > submit( Callable< T > task) ;
// 提交 Runnable 任务及结果引用
< T > Future< T > submit( Runnable task , T result) ;
提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口,Runnable 接口的 run()方法是没有返回值的,
所以 submit(Runnable task) 这个方法返回的 Future仅可以用来断言任务已经结束了,类似于 Thread.join()。
提交 Callable 任务 submit(Callable task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,
所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果。
result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
Copy import java . util . concurrent . ArrayBlockingQueue ;
import java . util . concurrent . Executors ;
import java . util . concurrent . Future ;
import java . util . concurrent . ThreadPoolExecutor ;
import java . util . concurrent . ThreadPoolExecutor . AbortPolicy ;
import java . util . concurrent . TimeUnit ;
public class Main {
public static void main ( String [] args) throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 10 , 5 , TimeUnit . MINUTES ,
new ArrayBlockingQueue <>( 30 ) , Executors . defaultThreadFactory() , new AbortPolicy()) ;
threadPoolExecutor . prestartAllCoreThreads (); // 预启动所有核心线程
Data data = new Main() . new Data() ;
Future < Data > future = threadPoolExecutor . submit ( new Main() . new Task(data) , data);
TimeUnit . SECONDS . sleep ( 2 );
System . out . println ( data . getName ());
System . out . println ( future . get () . getName ()); //future.get() === result
threadPoolExecutor . shutdown ();
}
class Data {
String name;
public String getName () {
return name;
}
public void setName ( String name) {
this . name = name;
}
}
class Task implements Runnable {
Data data;
public Task ( Data result) {
this . data = result;
}
@ Override
public void run () {
System . out . println ( "run ============" );
data . setName ( "222" );
}
}
}
你会发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它们分别是:
判断任务是否已取消的方法 isCancelled()
以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。
通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。 不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。
2.2 FutureTask
下面我们再来介绍 FutureTask 工具类。前面我们提到的 Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数, 它们的参数和前面介绍的 submit() 方法类似,所以这里我就不再赘述了。
Copy FutureTask( Callable< V > callable) ;
FutureTask( Runnable runnable , V result) ;
那如何使用 FutureTask 呢?其实很简单,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。
Copy ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 10 , 5 , TimeUnit . MINUTES ,
new ArrayBlockingQueue <>( 30 ) , Executors . defaultThreadFactory() , new AbortPolicy()) ;
threadPoolExecutor . prestartAllCoreThreads (); // 预启动所有核心线程
FutureTask < String > futureTask = new FutureTask <>(() -> "111" );
threadPoolExecutor . submit (futureTask);
String string = futureTask . get (); //这里主线程会阻塞等待副线程完成
System . out . println (string); //111
threadPoolExecutor . shutdown ();
也可以使用线程去执行
Copy FutureTask < String > futureTask = new FutureTask <>(() -> "111" );
new Thread(futureTask) . start ();
String string = futureTask . get (); //这里主线程会阻塞等待副线程完成
2.3 各个子线程结果相互引用案例
思路:通过构造器把T1线程的FutureTask传给T2线程
Copy import java . util . concurrent . ArrayBlockingQueue ;
import java . util . concurrent . Callable ;
import java . util . concurrent . Executors ;
import java . util . concurrent . FutureTask ;
import java . util . concurrent . ThreadPoolExecutor ;
import java . util . concurrent . ThreadPoolExecutor . AbortPolicy ;
import java . util . concurrent . TimeUnit ;
public class Main {
public static void main ( String [] args) throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 10 , 5 , TimeUnit . MINUTES ,
new ArrayBlockingQueue <>( 30 ) , Executors . defaultThreadFactory() , new AbortPolicy()) ;
threadPoolExecutor . prestartAllCoreThreads (); // 预启动所有核心线程
FutureTask < String > futureTask2 = new FutureTask <>( new Main() . new T2() );
FutureTask < String > futureTask1 = new FutureTask <>( new Main() . new T1(futureTask2) );
threadPoolExecutor . submit (futureTask1);
threadPoolExecutor . submit (futureTask2);
String result = futureTask1 . get ();
System . out . println (result);
threadPoolExecutor . shutdown ();
}
class T1 implements Callable < String > {
FutureTask < String > t2;
public T1 ( FutureTask < String > t2) {
this . t2 = t2;
}
@ Override
public String call () throws Exception {
System . out . println ( "洗水壶1分钟" );
TimeUnit . SECONDS . sleep ( 1 );
System . out . println ( "烧开水15分钟" );
TimeUnit . SECONDS . sleep ( 15 );
String tea = t2 . get (); //获取t2子线程的运行结果
System . out . println ( "开始泡制" + tea);
return "泡茶完成" ;
}
}
class T2 implements Callable < String > {
@ Override
public String call () throws Exception {
System . out . println ( "洗茶壶1分钟" );
TimeUnit . SECONDS . sleep ( 1 );
System . out . println ( "洗茶杯2分钟" );
TimeUnit . SECONDS . sleep ( 2 );
System . out . println ( "拿茶叶1分钟" );
TimeUnit . SECONDS . sleep ( 1 );
return "龙井" ;
}
}
}
3. Future增强CompletableFuture
Copy import java . util . concurrent . ArrayBlockingQueue ;
import java . util . concurrent . CompletableFuture ;
import java . util . concurrent . Executors ;
import java . util . concurrent . ThreadPoolExecutor ;
import java . util . concurrent . ThreadPoolExecutor . AbortPolicy ;
import java . util . concurrent . TimeUnit ;
public class Main {
public static void main ( String [] args) throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 10 , 5 , TimeUnit . MINUTES ,
new ArrayBlockingQueue <>( 30 ) , Executors . defaultThreadFactory() , new AbortPolicy()) ;
threadPoolExecutor . prestartAllCoreThreads (); // 预启动所有核心线程
CompletableFuture < Void > f1 = CompletableFuture . runAsync (() -> {
System . out . println ( "t1--- 洗水壶" );
sleep( 1 , TimeUnit . SECONDS ) ;
System . out . println ( "t1---烧开水" );
sleep( 15 , TimeUnit . SECONDS ) ;
} , threadPoolExecutor);
CompletableFuture < String > f2 = CompletableFuture . supplyAsync (() -> {
System . out . println ( "t2--- 洗茶壶" );
sleep( 1 , TimeUnit . SECONDS ) ;
System . out . println ( "t2---洗茶杯" );
sleep( 2 , TimeUnit . SECONDS ) ;
System . out . println ( "t2---拿茶叶" );
sleep( 1 , TimeUnit . SECONDS ) ;
return "龙井" ;
} , threadPoolExecutor);
CompletableFuture < String > f3 = f1 . thenCombineAsync (f2 , (__ , tf) -> {
System . out . println ( "t3---拿到茶叶" );
System . out . println ( "t3---泡茶" );
return "上茶" + tf;
} , threadPoolExecutor);
f3 . join (); //等待任务3执行结果
threadPoolExecutor . shutdown ();
}
static void sleep ( int t , TimeUnit u) {
try {
u . sleep (t);
} catch ( InterruptedException e) {
// TODO Auto-generated catch block
e . printStackTrace ();
}
}
}
3.1 创建CompletableFuture对象
Copy CompletableFuture . runAsync ( Runnable runnable) //不带返回结果
CompletableFuture . supplyAsync ( Supplier< U > supplier) //带返回结果
//下面两个是上面两个的扩展:使用自定义的线程池执行
CompletableFuture . runAsync ( Runnable runnable , Executor executor)
CompletableFuture . supplyAsync ( Supplier< U > supplier , Executor executor)
默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。
所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法
对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果?
因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。 另外,CompletableFuture 类还实现了 CompletionStage 接口,这个接口内容实在是太丰富了,在 1.8 版本里有 40 个方法,这些方法我们该如何理解呢?
3.2 CompletionStage
串行关系
thenCompose:和thenApply相似,方法会创建一个子流程
Copy ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 10 , 5 , TimeUnit . MINUTES ,
new ArrayBlockingQueue <>( 30 ) , Executors . defaultThreadFactory() , new AbortPolicy()) ;
threadPoolExecutor . prestartAllCoreThreads (); // 预启动所有核心线程
CompletableFuture < String > f2 = CompletableFuture . supplyAsync (() -> "helloworld" , threadPoolExecutor)
. thenApply (s -> s + " hellofuture" )
. thenApply (String :: toUpperCase);
String string = f2 . get ();
System . out . println (string); //HELLOWORLD HELLOFUTURE
threadPoolExecutor . shutdown ();
Copy CompletionStage< R > thenCombine(other , fn) ; //接收参数和返回值
CompletionStage< R > thenCombineAsync(other , fn) ;
CompletionStage< Void > thenAcceptBoth(other , consumer) ; //接收参数,不接受返回值
CompletionStage< Void > thenAcceptBothAsync(other , consumer) ;
CompletionStage< Void > runAfterBoth(other , action) ; //不支持参数和返回值
CompletionStage< Void > runAfterBothAsync(other , action) ;
Copy CompletionStage applyToEither(other , fn) ;
CompletionStage applyToEitherAsync(other , fn) ;
CompletionStage acceptEither(other , consumer) ;
CompletionStage acceptEitherAsync(other , consumer) ;
CompletionStage runAfterEither(other , action) ;
CompletionStage runAfterEitherAsync(other , action) ;
fn:接收参数和返回值 consumer: 接收参数不接受返回值 action: 不接受参数和返回值
Copy CompletableFuture < String > completableFuture1 = CompletableFuture . supplyAsync (() -> {
int nextInt = new Random() . nextInt ( 5 );
sleep(nextInt , TimeUnit . SECONDS ) ;
return String . valueOf (nextInt);
});
CompletableFuture < String > completableFuture2 = CompletableFuture . supplyAsync (() -> {
int nextInt = new Random() . nextInt ( 5 );
sleep(nextInt , TimeUnit . SECONDS ) ;
return String . valueOf (nextInt);
});
CompletableFuture < String > completableFuture = completableFuture1 . applyToEither (completableFuture2 , s -> s);
String string = completableFuture . get ();
System . out . println (string);
虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0 就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?
Copy CompletableFuture < Integer > f0 = CompletableFuture . supplyAsync (() -> ( 7 / 0 )) . thenApply (r -> r * 10 );
System . out . println ( f0 . join ());
Copy CompletionStage exceptionally(fn) ;
CompletionStage< R > whenComplete(consumer) ;
CompletionStage< R > whenCompleteAsync(consumer) ;
CompletionStage< R > handle(fn) ;
CompletionStage< R > handleAsync(fn) ;
下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。
Copy CompletableFuture < Integer > f0 = CompletableFuture . supplyAsync (() -> 7 / 0 ))
. thenApply (r -> r * 10 )
. exceptionally (e -> 0 );
System . out . println ( f0 . join ());
4. 那个任务先完成就先处理哪个CompletionService
CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果的 Future 对象加入到阻塞队列
4.1 创建 CompletionService
ExecutorCompletionService(Executor executor);
ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue)。
这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。 CompletionService 接口提供的 take() 方法获取一个 Future 对象
Copy ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2 , 10 , 5 , TimeUnit . MINUTES ,
new ArrayBlockingQueue <>( 30 ) , Executors . defaultThreadFactory() , new AbortPolicy()) ;
threadPoolExecutor . prestartAllCoreThreads (); // 预启动所有核心线程
ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(threadPoolExecutor);
executorCompletionService . submit (() -> {
return new Random() . nextInt ();
});
executorCompletionService . submit (() -> {
return new Random() . nextInt ();
});
executorCompletionService . submit (() -> {
return new Random() . nextInt ();
});
for ( int i = 0 ; i < 3 ; i ++ ) {
Future < Integer > future = executorCompletionService . take (); //获取从阻塞队列获取一个结果
Integer integer = future . get ();
System . out . println (integer);
}
threadPoolExecutor . shutdown ();
都是和阻塞队列相关的,take()、poll()都是从阻塞队列中获取并移除一个元素;它们的区别在于如果阻塞队列是空的,那么调用 take()方法的线程会被阻塞,而 poll() 方法会返回 null 值。 poll(long timeout, TimeUnit unit)方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit时间,阻塞队列还是空的,那么该方法会返回 null 值。