任务执行

任务执行

  1. 找到明确的任务边界
  2. 明确任务的执行策略
    通过将任务的提交和执行解耦开来,从而无须太大得困难就可以为某种类型的任务指定和修改执行策略。

1 任务

任务通常是一些抽象的且离散的工作单元。
在理想的情况下,各个任务之间是相互独立的。

2 阻塞与中断

2.1 阻塞

阻塞的原因有很多,如:

  1. 等待IO操作结束
  2. 等待一个锁资源
  3. 等待sleep操作
  4. 等待另一个线程的调用结果

被阻塞的线程必须等待某一个不受它控制的事件发生后才能继续执行。

2.2 中断

如果一个线程被中断,那么它将努力提前结束阻塞状态(通常抛出InterruptedException)。
Thread提供了interrput方法用于中断线程,每个线程都有一个布尔类型的属性,标识中断状态,当中断时设置这个状态。
中断时一个协作机制,当线程A中断线程B时,A仅仅要求B在执行到某个可以暂停的地方停止正在执行的操作,前提是如果线程B愿意停止下来。

2.2.1 中断响应

接受中断后,必须做出响应,不然调用链上层无法对中断采取处理措施。

2.2.1.1 传递Exception

将InterruptedException传递给调用者

2.2.1.2 恢复中断

捕获异常后,调用线程上的interrupt方法重新设置中断状态。

3 任务的执行策略

各种执行策略都是资源管理工具,最佳策略取决于可用的计算资源以及对服务质量的需求。

3.1 单线程串行执行

串行机制无法提高吞吐率和响应速度。
一般使用比较少,如:
GUI中的事件线程

3.2 显示的创建线程

为每个任务创建一个新的线程来提供服务,从而实现更高的响应性。
问题:

  1. 线程生命周期的开销大
  2. 无法控制资源的消耗
  3. 稳定性差(OutOfMemmery)

3.3 线程池

提供了生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。
线程池和工作队列密切相关,其中工作队列中保存了所有等待执行的任务,工作线程从工作队列中获取一个任务,执行任务,然后返回线程池等待下一个任务。

4 线程池

只有任务是同等类型的并且相互独立时,线程池的性能才能达到最佳。

4.1 任务和执行策略间的隐式耦合

4.1.1 依赖性任务

线程饥饿死锁。
如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,便会发生线程饥饿死锁。

4.1.2 使用线程封闭机制的任务

线程封闭要求Executro是单线程(UI线程)

4.1.3 对响应敏感的任务

UI线程,对响应敏感的线程,需要特殊对待。

4.1.4 使用ThreadLocal的任务

只有在线程本地值得生命周期受限于任务的生命周期时,才可以在线程池中使用ThreadLocal。
线程池中的线程不允许使用ThreadLocal在线程间传递值。

4.1.5 运行时间较长的任务

如果任务执行时间很长,那么即使不出现死锁,线程池的响应性也会变得糟糕。

4.2 线程池的大小

线程池的理想大小取决于被提交任务的类型以及所部署系统的特征。
一般只需要避免线程池过大或过小即可。
如果需要执行不同类别的任务,并且他们之间的行为相差很大,那么应该使用多个线程池,并根据各自的负载情况进行调整。

4.2.1 计算密集型

在拥有N个处理核心时,当线程数为N+1时,一般能实现最优的利用率。

4.2.2 IO密集型

对于包含太多IO操作或者其他阻塞操作的任务,由于线程并不会一直运行,因此线程池规模应该更大些。

4.2.3 基准测试

在某个基准负载下,分别设置不同大小的线程池来运行应用程序,观察CPU利用情况,进而选择合适的线程大小。

4.2.4 资源计算

计算每个任务对资源的需求量,然后用该资源的可用总量除以每个任务的需求量,所得到的结果就是线程池大小的上限。
资源通常包括:内存、文件句柄、Socket句柄、数据库连接、Cache连接等。

4.3 ThreadPoolExecutor核心组件

ThreadPoolExecutor为一些Executor提供了基本的实现,他是一个灵活的、稳定的线程池,允许对其进行各种定制。

4.3.1 线程创建与销毁

线程池的核心大小、最大大小、存活时间,共同负责线程的创建与销毁。
核心大小:也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了得情况下才会创建超过这个数量的线程。
最大大小:表示可同时活动的线程数量的上限。
存活时间:某个线程的空闲时间超过存活时间,那么将被标记为可回收的,当线程池的当前大小大于核心大小时,线程将被销毁。
newFixedThreadPool:核心大小和最大大小一致
newCachedThreadPool:最大大小为Integer.MAX_VALUE

4.3.2 任务队列

如果新请求的到达速率超过线程池的处理速率,那么新到达的请求将积累起来,在线程池中,这些请求由一个Runnable的队列中等待,不会像Thread那样竞争CPU资源。
ThreadPoolExecutor提供一个BlockingQueue来保存等待运行的任务。主要有三种:

  1. 无界队列
  2. 有界队列
  3. 交换队列
    newFixedThreadPool、newSingleThreadPool使用一个无界队列。
    newCachedThreadPool使用交换队列。
    通过使用SynchronosQueue可以避免任务排队,直接将任务从生产者移交到消费者。
    PriorityBlockingQueue为优先队列,可以自定义任务的优先顺序。

4.3.3 饱和策略

当有节队列被填充满后,饱和策略开始发挥作用,如果一个任务被提交给一个已经关闭的Executor,也会使用饱和策略。
常见饱和策略:

  1. 中止
    直接抛出未检测的RejectedException
  2. 调用者运行
    将任务回退给调用者,从而降低新任务的流量(可以使服务器在高负载下实现一种平缓的性能降低)
  3. 抛弃
    直接抛弃该任务
  4. 抛弃最旧任务
    抛弃下一个将被执行的任务。
    自定义策略,通过对Executor进行封装,使用信号量(Semaphore)来控制任务提交的速率

4.3.4 线程工厂

当线程池需要一个线程时,将会调用该方法生产线的线程。
自定义线程工厂,从而完成线程的订制。
可订制内容包括:

  1. 线程名称
  2. UncaughtExceptionHandler
  3. 是否为守候线程
  4. 访问权限等

4.3.5 其他定制

调用完ExecutorThreadPool构造函数后,可以使用Setter方法进行订制(核心大小、最大大小、存活时间、线程工厂、饱和策略等)
Executors的unconfigurableExecutorService工厂方法,对一个ExecutorService进行包装,使其只暴露ExecutorService的方法,而不能对其进行配置(newSingleThreadExecutor方法)。

4.3.6 子类扩展

扩展方法:

  1. beforeExecute
    如果beforeExecute抛出一个RuntimeExeption,那么任务将不会被执行,afterExecute也不会执行
  2. afterExecute
    无论任务从run中正常返回,还是抛出一个异常而返回,afterExecute都会执行
  3. terminated
    当线程池完成关闭操作时将会调用terminated,也就是所有的任务已经完成并且所有的工作者线程也都已经关闭。

4.4 API

4.4.1 Executors

4.4.1.1 newFixedThreadPool

创建固定长度的线程池,每当提交一个任务时就创建一个线程,直到到达线程池的最大数量,以后线程池的规模将不再变化。

4.4.1.2 newCachedThreadPool

创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求,则回收线程;如果需求增加,则添加新线程,线程的规模不存在限制。

4.4.1.3 newSingleThreadPool

创建一个单线程的Executor,当线程异常终止后,会创建另一个线程来替代。

4.4.1.4 newScheduledThreadPool

创建一个固定长度的线程池,而且以延时或定时方式来执行任务。

4.4.2 Executor

Executor基于生产者消费者模式,提交任务的操作为生产者,执行任务的操作为消费者。

4.4.3 ExecutorService

扩展自Executor,添加了生命周期管理相关方法。
生命周期:

  1. 运行
  2. 关闭
  3. 终止
4.4.3.1 shutdown

当调用shutdown方法后,不再接受新的任务,同时等待已经提交的任务执行完成(包括那些还没有执行的任务)

4.4.3.2 shutdownNow

shutdownNow将执行粗暴的关闭,它将尝试取消所有正在运行的任务,并不再启动队列中尚未执行的任务。

4.4.3.3 awaitTermination

awaitTermination方法用来等待到达终止状态。

4.4.3.4 isTerminated

isTerminated方法用于查询是否终止。

4.4.3.5 Callable&Future

携带结果的任务

4.4.3.5.1 Callable

call() throws Exception

4.4.3.5.2 Future

Get

Get操作取决于当前的任务状态:

  1. 任务成功完成,返回值
  2. 任务异常完成,返回ExecuteException,包装task的异常
  3. 任务超时,返回TimeoutException
  4. 任务取消,返回CallcellationException

Cancel

取消任务

isCancelled

是否取消

isDone

是否完成。

4.4.3.6 invokeAll

invokeAll按照任务集合中俄迭代顺序将所有的Future添加到返回集合中,从而使调用者能将Future和Callable关联起来。
当所有的任务都执行完成、或被中断、或超时,invokeAll返回。
当超时时,所有未完成的任务都会被取消,客户端代码可以通过get或isDone或isCancelled等方法判断具体的状态。

4.4.4 ScheduledService

扩展自ExecutorService,添加延迟任务和周期任务。

4.4.4.1 Timer

Timer只会创建一个线程,如果某个任务执行时间太长,将会破坏其他TimerTask的定时准确性。
当一个TimerTask抛出异常将终止Timer线程,Timer不会恢复线程的执行,而是任务所有的任务都被取消了。

4.4.4.2 DelayQueue

如果需要构建自己的调度服务,那么可以使用DelayQueue,他实现了BlockingQueue,并为ScheduledThreadExecutorService提供调度功能。
DelayQueue管理一组Delayed对象,每个Delayed对象都有一个相应的延时时间,在DelayQueue中,只用某个元素逾期后,才能从DelayQueue重执行take操作。
从DelayQueue中返回的对象将根据他们延时时间进行排序。

4.4.5 CompletionService

将Executor和BlockingQueue结合在一起,任务完成时,自动将Future放到BlockingQueue中。
CompletionService cService = new ExecutorCompletionService(executor);
cService.submit(task);
whie(true){
Future future = cService.take();
}

多个ExecutorCompletiongService可以共享一个Executor

5 任务取消或关闭

Java没有提供任何机制来安全的终止线程,但它提供了中断,这是一种协作机制,可以使一个线程终止另一个线程的当前工作。
当线程需要终止时,首先会清除当前正在运行的工作,然后结束,因为任务本身的代码比发出取消请求的代码更清除如何执行清除工作。

5.1 线程中断

每个线程都有一个bool类型的中断状态,当中断线程时,这个状态将会设置为true,由线程在下一个合适的时刻中断自己。
interrupt:中断自己
isInterrupted:检测中断状态
interrupted:清除中断状态,并返回之前的值。

5.2 中断策略

任务和线程都应该有自己的中断策略。
最合理的中断策略是某种形式的线程级取消操作或服务级取消操作,尽快退出,并在必要时进行清理,通知某个所有者线程已经退出。
一个中断请求可以有一个或多个接收者,可能同时意味着“取消当前任务”和“关闭工作者线程”。
一般情况下,任务不会在自己拥有的线程中运行,而是在某个服务拥有的线程中执行。对于非线程所有者的代码来说,应该小心保管中断状态,这样拥有线程的代码才能对中断做出响应。

5.2.1 任务中断策略

任务不应该对执行任务的线程的中断策略做任何的假设。
执行取消操作的代码也不应该对线程的中断策略做出假设。

5.2.1.1 传递异常

抛出InterruptedException中断响应,尽快退出执行流程,并中断信息以Exception的方式传递给调用者。

5.2.1.2 恢复中断状态

任务应该保存线程的中断状态,在执行完清理工作后,对中断线程进行恢复(调用interrupt来恢复中断状态)

5.2.2 线程中断策略

线程应该只由它的所有者中断,所有者可以将线程的中断策略信息封装在某个合适的取消机制中。

5.3 任务取消

5.3.1 volatile的状态位

自定义一个状态位,在task循环过程总对其进行检测,并作为退出的信号。

5.3.2 中断状态位

同时兼顾状态位检测和可中断阻塞两种情况.

5.3.3 Future

Future拥有一个cancel方法,如果参数为true则中断运行任务的线程,如果为false则意味着“任务没启动,就不要运行他”。
因此在编写任务时,建议把中断作为取消请求的命令,这样可以通过cancel对其进行取消动作。

5.3.4 不可中断的阻塞

  1. java 同步socket io,InputStream、OutputStream中的read、write方法都不会响应中断,但通过关闭底层Socket,可以抛出SocketException从而退出阻塞
  2. java.io包中的同步io,当中断一个InteruptibleChannel时,将抛出CloseByInterruptException
  3. Selector异步io,调用close或wakeup时会抛出ClosedSelectorException
  4. 锁,Lock类中提供了lockInterruptibly方法,允许方法响应中断请求。
5.3.4.1 改写Thread的interrupt方法

使任务和执行线程绑定,通过重新interrupt方法,主动关闭底层资源,从而退出阻塞。

5.3.4.2 newTaskFor

newTaskFor方法返回一个RunnableFuture接口,通过定制标示任务的Future可以改变Future.cancel的行为。

5.4 停止线程服务

除非拥有某个线程,否则不能对该线程进行操作。
服务应该提供生命周期方法来关闭自己以及自己所拥有的线程。

对于持有线程的服务,只有服务存在的时间大于创建线程方法存在时间,那么就应该提供生命周期方法。

5.4.1 自己维护(单线程)服务

当取消一个生产者-消费者操作时,需要同时取消生产者和消费者。
需要状态控制变量和终端协同操作,使用起来比较麻烦。

5.4.2 毒丸对象

毒丸对象只放到队列中的一个特殊对象,当得到这个对象是,应该立即停止运行。
只有当生产者和消费者的数量已知的情况下才能使用毒丸对象。

5.4.3 只运行一次的服务

一般通过在方法范围内维护一个ExcutorService实现,通过ExecutorService的生命周期方法管理资源

5.4.4 ExecutorService

ExecutorService提供了一套生命周期管理方法,简化线程的管理。

5.4.4.1 shutdown

比较优雅的关闭方案。
首先Executor将会使用拒绝策略拒绝新提交的任务,然后会等待正在运作和任务队列中的任务全部运行完成,最后在优雅的关闭。

5.4.4.2 shutdownNow

比较暴力,直接中断正在运行的任务,返回任务队列中的待运行任务。

5.5 处理非正常终止的线程

导致线程异常退出的主要原因是RuntimeException

5.5.1 Try–Catch

使用try catch finally块主动捕获RuntimeException

5.5.2 UncaughtExceptionHandler

使用Thread的UncaughtExceptionHandler API设置异常处理逻辑。
它能检测某个线程由于未捕获异常而终结的情况。

5.5.3 ExecutorService

5.5.3.1 定制Runnable或Callable

使用try块主动防御

5.5.3.2 定制ThreadFactory

使用UncaughtExceptionHandler API定制线程

execute

提交的任务有效

submit

提交的任务无效。
service会对callable中抛出的异常进行处理,将其设置在Future(ExecuteException)中,在get方法被调用时,重新抛出

5.6 JVM关闭

5.6.1 关闭钩子

在JVM正常退出前,会主动调用已注册的关闭钩子(Shutdown Hook),关闭钩子指通过Runtime.addShutdownHook注册的线程。
JVM并不包装各个钩子的调用顺序,及钩子线程会并发执行(需要合适的同步控制,以避免发生数据不一致或死锁),当所有的钩子线程都执行结束,JVM将运行终结器,然后停止。
如果关闭钩子或终结器没有执行完成,那么正常关闭进行会被挂起。
如果对关闭存在顺序要求,可以将多个服务封装在一个Thread中,从而保持顺序。

5.6.2 终结器

垃圾回收器会对那些定义了finalize方法的对象进行特殊处理,在回收器释放他们前,调用他们的finalize方法,从而保证一些持久化资源被释放。
大多数情况下,鼓励用finally代码和显示的close对资源进行管理。

5.6.3 守护线程

线程分为守护线程和普通线程。
JVM启动时创建的所有线程中,除主线程外,其他的全部为守护线程。
当创建一个线程时,默认情况下,线程会自动继承创建他的线程状态。
当一个线程退出时,JVM会检查其他运行中的线程,如果这些线程全部为守护线程,那么JVM会正常退出。

wenxinzizhu wechat
扫一扫,添加我的微信,一起交流共同成长(备注为技术学习)