1 准备知识 介绍线程池之前先简要了解一下Executor,ExecutorService,Future,Callable,Executors是什么,和线程池又有什么关系
1.1 Executor 它是线程池顶级接口。它定义了一个方法**void execute(Runnable)**。 这个方法是用于处理任务的一个服务方法,调用者提供Runnable接口的实现,线程池通过线程执行这个Runnable,该服务方法是无返回值的
1.2 ExecutorService ExecutorService是Executor接口的子接口,它提供了一个新的服务方法submit,是有返回值的,返回值类型为Future类型(关于Future见1.3 ),它提供返回值主要是由Callable的call方法提供返回值(Callable见1.4 ) ,所有的线程池类型都实现这个接口
1.3 Future 顾名思义,Future->未来,代表线程任务执行结束后的结果。 获取线程执行结果的方式是通过get方法获取的,get有两种方式,有参和无参 无参T get()
->阻塞等待线程执行结束,并得到结果。 有参T get(long, TimeUnit)
->阻塞固定时长,等待线程执行结束后的结果,如果在阻塞时长范围内,线程未执行结束,抛出异常。
1.4 Callable Callable类似Runnable接口,它有一个call方法,它的作用和Runnable中的run方法完全一致,但也有区别 Callable的call->有返回值,可以抛出任意异常 Runnable的run-> 无返回值,不能抛出未检查的异常 call方法的返回值就是Future中get方法的返回值
1.5 Executors Executors是一个工具类,类似Collection和Collections的关系,可以更简单的创建若干种线程池,通过Executors可以直接得到想要的线程池
2 线程池 线程池可以自动创建也可以手动创建,自动创建体现在Executors工具类中,常见的可以创建newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor、newScheduledThreadPool;
手动创建体现在可以灵活设置线程池的各个参数,体现在代码中即ThreadPoolExecutor类构造器上各个实参的不同:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static ExecutorService newFixedThreadPool (int var0) { return new ThreadPoolExecutor (var0, var0, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue ()); } public static ExecutorService newSingleThreadExecutor () { return new Executors .FinalizableDelegatedExecutorService(new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue ())); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , 2147483647 , 60L , TimeUnit.SECONDS, new SynchronousQueue ()); } public static ScheduledExecutorService newScheduledThreadPool (int var0) { return new ScheduledThreadPoolExecutor (var0); }
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {……}
线程池中的七大参数如下: (1)corePoolSize:线程池中的常驻核心线程数。
(2)maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值大于等于1。
(3)keepAliveTime:多余的空闲线程存活时间,当空间时间达到keepAliveTime值时,多余的线程会被销毁直到只剩下corePoolSize个线程为止。
(4)unit:keepAliveTime的单位。
(5)workQueue:任务队列,被提交但尚未被执行的任务。
(6)threadFactory:表示生成线程池中工作线程的线程工厂,用户创建新线程,一般用默认即可。
(7)handler:拒绝策略,表示当线程队列满了并且工作线程大于等于线程池的最大显示数(maxnumPoolSize)时如何来拒绝请求执行的runnable的策略。
流程分析
线程池中线程数小于corePoolSize时,新任务将创建一个新线程执行任务,不论此时线程池中存在空闲线程;
线程池中线程数达到corePoolSize时,新任务将被放入workQueue中,等待线程池中任务调度执行;
当workQueue已满,且maximumPoolSize>corePoolSize时,新任务会创建新线程执行任务;
当workQueue已满,且提交任务数超过maximumPoolSize,任务由RejectedExecutionHandler处理;
当线程池中线程数超过corePoolSize,且超过这部分的空闲时间达到keepAliveTime时,回收该线程;
如果设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将回收;
一:corePoolSize 详细描述
(1)在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近视理解为今日当值线程。 (2)当线程池中的线程数目达到corePoolSize后,就会把到达的任务放入到缓存队列当中。
二:最大线程数(maximumPoolSize):该参数定义了一个线程池中最多能容纳多少个线程。当一个任务提交到线程池中时,如果线程数量达到了核心线程数,并且任务队列已满,不能再向任务队列中添加任务时,这时会检查任务是否达到了最大线程数,如果未达到,则创建新线程,执行任务,否则,执行拒绝策略。可以通过源码来看一下。如下:可以看出,当调用submit(Runnable task)方法,将任务提交到线程池中时,会调用execute()方法去执行任务,在该方法内,会进行核心线程数,任务队列的判断,最后决定是执行或者是拒绝。总结起来就是:最大线程数参数,是在已经达到核心线程池参数,并且任务队列已经满的情况下,才去判断该参数。
三:keepAliveTime 详细描述
只有当线程池中的线程数大于corePoolSize时keepAliveTime才会起作用,直到线程中的线程数不大于corepoolSIze。
四:系统默认的拒绝策略有以下几种:
AbortPolicy:为线程池默认的拒绝策略,该策略直接抛异常处理。
DiscardPolicy:直接抛弃不处理。
DiscardOldestPolicy:丢弃队列中最老的任务。
CallerRunsPolicy:将任务分配给当前执行execute方法线程来处理。
线程池状态: Running, ShuttingDown, Termitnaed
Running - 线程池正在执行中。活动状态。
ShuttingDown - 线程池正在关闭过程中。优雅关闭。一旦进入这个状态,线程池不再接收新的任务,处理所有已接收的任务,处理完毕后,关闭线程池。
Terminated - 线程池已经关闭。
2.1 固定容量线程池FixedThreadPool FixedThreadPool是固定容量线程池,创建线程池的时候容量固定,使用的是BlockingQueue作为任务的载体,线程池默认的容量上限是Integer.MAX_VALUE
特点:当任务数量大于线程池容量的时候,没有运行的任务保存在任务队列中,当线程有空闲的,自动从队列中取出任务执行
使用场景: 大多数情况下,使用的线程池,首选推荐FixedThreadPool。OS系统和硬件是有线程支持上限。不能随意的无限制提供线程池。
下面是一个无返回值的小案例: 案例中创建了一个线程池,容量为5,执行6个任务,分析调用shutdown方法后,分析任务的执行情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package com.bernardlowe.concurrent.t08;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class Test_02_FixedThreadPool { public static void main (String[] args) { ExecutorService service = Executors.newFixedThreadPool(5 ); for (int i = 0 ; i < 6 ; i++){ service.execute(new Runnable () { @Override public void run () { try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor" ); } }); } System.out.println("初始状态:" + service); System.out.println("开始调用shutdown方法=====" ); service.shutdown(); System.out.println("是否terminated:" + service.isTerminated()); System.out.println("是否shutdown:" + service.isShutdown()); System.out.println("shutdown后的状态:" + service); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("2秒过后任务全部执行完====" ); System.out.println("是否terminated:" + service.isTerminated()); System.out.println("是否shutdown:" + service.isShutdown()); System.out.println("任务全部执行完过后状态:" + service); } }
结果: 从图中可以分析出以下几个过程 在初始状态:五个执行线程,1个任务在等待队列,0个完成任务 ↓ 调用shutdown方法后:线程池未关闭(terminated为false),调用了shutdown(不再接收新任务),0个完成任务 ↓ 两秒后任务执行完毕:线程池已关闭(terminated为true),调用了shutdown(不再接收新任务),6个完成任务
下面是一个有返回值的小案例: 案例中创建了一个线程池,容量为1,submit方法传了一个Callable,future通过get获取线程的返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.bernardlowe.concurrent.t08;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.FutureTask;import java.util.concurrent.TimeUnit;public class Test_03_Future { public static void main (String[] args) throws InterruptedException, ExecutionException { ExecutorService service = Executors.newFixedThreadPool(1 ); Future<String> future = service.submit(new Callable <String>() { @Override public String call () { try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } return Thread.currentThread().getName() + " - test executor" ; } }); System.out.println("线程是否结束: " + future.isDone()); System.out.println("call方法的返回值: " + future.get()); System.out.println("线程是否结束: " + future.isDone()); service.shutdown(); } }
结果:
2.2 CachedThreadPool 缓存的线程池, 容量不限(Integer.MAX_VALUE),自动扩容 容量管理策略:如果线程池中的线程数量不满足任务执行,创建新的线程。每次有新任务无法即时处理的时候,都会创建新的线程。当线程池中的线程空闲时长达到一定的临界值(默认60秒),自动释放线程,这里通过Executors.newCachedThreadPool()方法得到的线程池无法修改空闲时间,具体原因见下图,但可以通过自定义线程池ThreadPoolExecutor修改,具体方法见2.5,这里就不解释了
应用场景: 内部应用或测试应用。
内部应用,有条件的内部数据瞬间处理时应用,如:电信平台夜间执行数据整理(有把握在短时间内处理完所有工作,且对硬件和软件有足够的信心)。
测试应用,在测试的时候,尝试得到硬件或软件的最高负载量,用于提供FixedThreadPool容量的指导
案例演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.bernardlowe.concurrent.t08;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class Test_05_CachedThreadPool { public static void main (String[] args) { ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for (int i = 0 ; i < 5 ; i++){ service.execute(new Runnable () { @Override public void run () { try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor" ); } }); } System.out.println(service); try { TimeUnit.SECONDS.sleep(65 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(service); } }
2.3 计划任务线程池ScheduledThreadPool ScheduledThreadPool是计划任务线程池,可以根据计划自动执行任务的线程池,底层实现是一个DelayedWorkQueue,它的一个主要方法scheduleAtFixedRate 有以下几个参数:
command - 要执行的任务
initialDelay - 第一次任务执行的间隔。
period - 多次任务执行的间隔。
unit - 多次任务执行间隔的时间单位。
案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package com.bernardlowe.concurrent.t08;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class Test_07_ScheduledThreadPool { public static void main (String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(3 ); System.out.println(service); service.scheduleAtFixedRate(new Runnable () { @Override public void run () { try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } }, 0 , 300 , TimeUnit.MILLISECONDS); } }
2.4 单一容量的线程池SingleThreadExecutor 单一容量的线程池,用法和FixedThreadPool类似,但和newFixedThreadPool不一样的是newSingleThreadExecutor创建的线程池又被一个FinalizableDelegatedExecutorService包装了一下 总结一下SingleThreadExecutor:
单线任务处理的线程池
shutdown方法必然会被调用
不具备ThreadPoolExecutor所有功能的线程池 具体可以看看这篇文章:https://www.jianshu.com/p/2b7d853322bb
2.5 分支合并线程池ForkJoinPool 分支合并线程池(mapduce类似的设计思想),可以递归完成复杂任务,适合用于处理复杂任务 要求可分支合并的任务必须是ForkJoinTask类型的子类型 ForkJoinTask类型提供了两个抽象子类型: RecursiveTask有返回结果的分支合并任务 RecursiveAction无返回结果的分支合并任务
案例: 这个案例做了一个以ForkJoinPool实现的数据累加,当计算数字区间大于MAX_SIZE=50000时,开启新的线程任务的计算,最后合并统计结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 package com.bernardlowe.concurrent.t08;import java.io.IOException;import java.util.Random;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.Future;import java.util.concurrent.RecursiveTask;public class Test_08_ForkJoinPool { final static int [] numbers = new int [1000000 ]; final static int MAX_SIZE = 500000 ; final static Random r = new Random (); static { for (int i = 0 ; i < numbers.length; i++){ numbers[i] = r.nextInt(1000 ); } } static class AddTask extends RecursiveTask <Long>{ int begin, end; public AddTask (int begin, int end) { this .begin = begin; this .end = end; } protected Long compute () { if ((end - begin) < MAX_SIZE){ long sum = 0L ; for (int i = begin; i < end; i++){ sum += numbers[i]; } return sum; }else { int middle = begin + (end - begin)/2 ; AddTask task1 = new AddTask (begin, middle); AddTask task2 = new AddTask (middle, end); task1.fork(); task2.fork(); return task1.join() + task2.join(); } } } public static void main (String[] args) throws InterruptedException, ExecutionException, IOException { long result = 0L ; for (int i = 0 ; i < numbers.length; i++){ result += numbers[i]; } System.out.println(result); ForkJoinPool pool = new ForkJoinPool (); AddTask task = new AddTask (0 , numbers.length); Future<Long> future = pool.submit(task); System.out.println(future.get()); } }
结果:该任务分类四个线程任务进行计算,最后汇总
2.5 ThreadPoolExecutor ThreadPoolExecutor线程池的底层实现,除ForkJoinPool外,其他常用线程池底层都是使用ThreadPoolExecutor实现的,其中有一个构造方法如下:
corePoolSize:核心容量,创建线程池的时候,默认有多少线程。也是线程池保持的最少线程数
maximumPoolSize: 最大容量,线程池最多有多少线程
keepAliveTime: 生命周期,0为永久。当线程空闲多久后,自动回收
unit: 生命周期单位,为生命周期提供单位,如:秒,毫秒
workQueue 任务队列,阻塞队列。注意,泛型必须是Runnable
案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package com.bernardlowe.concurrent.t08;import java.util.ArrayList;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Test_09_ThreadPoolExecutor { public static void main (String[] args) { ExecutorService service = new ThreadPoolExecutor (5 , 5 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); for (int i = 0 ; i < 6 ; i++){ service.execute(new Runnable () { @Override public void run () { try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor" ); } }); } System.out.println(service); service.shutdown(); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } service.shutdown(); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); } }
更多精彩内容:mrxccc