抽时间补上JVM基础中的最后一块拼图,也是很多同学一直比较迷糊的板块,按传统本文还是围绕基础展开做为一个入门级的指引,梳理下思路,感兴趣的小伙伴可以深入了解其中某个的知识点。
简单点来讲理解线程无非就是要了解:1、核心是保证共享数据的安全;2、其次是线程执行的调度。其它所有的知识点都是围绕上述两点展开的场景适配方法和性能优化策略。PS:本文中部分内容借助了原一同事大神的笔记内容。
一、基础知识
本节简单介绍下线程的基本知识,掌握一下线程执行的基本原理。整体内容如下图所示:
1.1、线程调度
1.1.1、抢占式调度
抢占式调度指的是每条线程执行的时间、线程的切换都由系统控制,系统控制指的是在系统某种运行机制下,可能每条线程都分同样的执行时间片,也可能是某些线程执行的时间片较长,甚至某些线程得不到执行的时间片。在这种机制下,一个线程的堵塞不会导致整个进程堵塞。
1.1.2、协同式调度
协同式调度指某一线程执行完后主动通知系统切换到另一线程上执行,这种模式就像接力赛一样,一个人跑完自己的路程就把接力棒交接给下一个人,下个人继续往下跑。线程的执行时间由线程本身控制,线程切换可以预知,不存在多线程同步问题,但它有一个致命弱点:如果一个线程编写有问题,运行到一半就一直堵塞,那么可能导致整个系统崩溃。
1.1.3、线程让出CPU的情况
- 当前运行线程主动放弃 CPU,JVM 暂时放弃 CPU 操作(基于时间片轮转调度的 JVM 操作系统);
- 统不会让线程永久放弃 CPU,或者说放弃本次时间片的执行权),例如调用 yield()方法;
- 当前运行线程因为某些原因进入阻塞状态,例如阻塞在 I/O 上;
- 当前运行线程结束,即运行完 run()方法里面的任务;
1.2、线程调度算法
1.2.1、优先调度算法
先来先服务调度算法(FCFS)
当在作业调度中采用该算法时,每次调度都是从后备作业队列中选择一个或多个最先进入该队列的作业,将它们调入内存,为它们分配资源、创建进程,然后放入就绪队列。在进程调度中采用 FCFS 算法时,则每次调度是从就绪队列中选择一个最先进入该队列的进程,为之分配处理机, 使之投入运行。该进程一直运行到完成或发生某事件而阻塞后才放弃处理机,特点是:算法比较简单,可以实现基本上的公平。
短作业(进程)优先调度算法
短作业优先(SJF)的调度算法是从后备队列中选择一个或若干个估计运行时间最短的作业,将它们调入内存运行。而短进程优先(SPF)调度算法则是从就绪队列中选出一个估计运行时间最短的进程,将处理机分配给它,使它立即执行并一直执行到完成,或发生某事件而被阻塞放弃处理机时再重新调度,该算法未照顾紧迫型作业,问题是有些线程可能永远无法执行。
1.2.1、高优先权优先调度算法
为了照顾紧迫型作业,使之在进入系统后便获得优先处理,引入了最高优先权优先(FPF)调度算法。当把该算法用于作业调度时,系统将从后备队列中选择若干个优先权最高的作业装入内存。当用于进程调度时,该算法是把处理机分配给就绪队列中优先权最高的进程。
非抢占式优先权算法
在这种方式下,系统一旦把处理机分配给就绪队列中优先权最高的进程后,该进程便一直执行下去,直至完成或因发生某事件使该进程放弃处理机时。这种调度算法主要用于批处理系统中,也可用于某些对实时性要求不严的实时系统中。
抢占式优先权调度算法
在这种方式下,系统同样是把处理机分配给优先权最高的进程,使之执行。但在其执行期间,只要又出现了另一个其优先权更高的进程,进程调度程序就立即停止当前进程(原优先权最高的进程)的执行,重新将处理机分配给新到的优先权最高的进程。显然,这种抢占式的优先权调度算法能更好地满足紧迫作业的要求,故而常用于要求比较严格的实时系统中,以及对性能要求较高的批处理和分时系统中。
高响应比优先调度算法
在批处理系统中,短作业优先算法是一种比较好的算法,其主要的不足之处是长作业的运行得不到保证。如果我们能为每个作业引入前面所述的动态优先权,并使作业的优先级随着等待时间的增加而以速率 a 提高,则长作业在等待一定的时间后,必然有机会分配到处理机。该优先权的变化规律可描述为:
R = (等待时间+要求服务时间)/要求服务时间 = 响应时间/要求服务时间
- 如果作业的等待时间相同,则要求服务的时间愈短,其优先权愈高,因而该算法有利于短作业。
- 当要求服务的时间相同时,作业的优先权决定于其等待时间,等待时间愈长,其优先权愈高,因而它实现的是先来先服务。
- 对于长作业,作业的优先级可以随等待时间的增加而提高,当其等待时间足够长时,其优先级便可升到很高,从而也可获得处理机。简言之,该算法既照顾了短作业,又考虑了作业到达的先后次序,不会使长作业长期得不到服务。因此,该算法实现了一种较好的折衷。当然,在利用该算法时,每要进行调度之前,都须先做响应比的计算,这会增加系统开销。
1.2.3、基于时间片的轮转调度算法
时间片轮转法
在早期的时间片轮转法中,系统将所有的就绪进程按先来先服务的原则排成一个队列,每次调度时,把 CPU 分配给队首进程,并令其执行一个时间片。时间片的大小从几 ms 到几百 ms。当执行的时间片用完时,由一个计时器发出时钟中断请求,调度程序便据此信号来停止该进程的执行,并将它送往就绪队列的末尾;然后,再把处理机分配给就绪队列中新的队首进程,同时也让它执行一个时间片。这样就可以保证就绪队列中的所有进程在一给定的时间内均能获得一时间片的处理机执行时间。
多级反馈队列调度算法
应设置多个就绪队列,并为各个队列赋予不同的优先级。第一个队列的优先级最高,第二个队列次之,其余各队列的优先权逐个降低。该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先权愈高的队列中,为每个进程所规定的执行时间片就愈小。例如,第二个队列的时间片要比第一个队列的时间片长一倍,......,第 i+1 个队列的时间片要比第 i 个队列的时间片长一倍。
当一个新进程进入内存后,首先将它放入第一队列的末尾,按 FCFS 原则排队等待调度。当轮到该进程执行时,如它能在该时间片内完成,便可准备撤离系统;如果它在一个时间片结束时尚未完成,调度程序便将该进程转入第二队列的末尾,再同样地按 FCFS 原则等待调度执行;如果它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,......,如此下去,当一个长作业(进程)从第一队列依次降到第 n 队列后,在第 n 队列便采取按时间片轮转的方式运行。
仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第 1~(i-1)队列均空时,才会调度第 i 队列中的进程运行。如果处理机正在第 i 队列中为某进程服务时,又有新进程进入优先权较高的队列(第 1~(i-1)中的任何一个队列),则此时新进程将抢占正在运行进程的处理机,即由调度程序把正在运行的进程放回到第 i 队列的末尾,把处理机分配给新到的高优先权进程。在多级反馈队列调度算法中,如果规定第一个队列的时间片略大于多数人机交互所需之处理时间时,便能够较好的满足各种类型用户的需要。
1.3、线程上下文切换
巧妙地利用了时间片轮转的方式, CPU 给每个任务都服务一定的时间,然后把当前任务的状态保存下来,在加载下一任务的状态后,继续服务下一任务,,任务的状态保存及再加载, 这段过程就叫做上下文切换。时间片轮转的方式使多个任务在同一颗 CPU 上执行变成了可能。
1.3.1、术语解释
进程
(有时候也称做任务)是指一个程序运行的实例。在 Linux 系统中,线程就是能并行运行并且与他们的父进程(创建他们的进程)共享同一地址空间(一段内存区域)和其他资源的轻量级的进程。
上下文
是指某一时间点 CPU 寄存器和程序计数器的内容。
寄存器
指CPU 内部的数量较少但是速度很快的内存(与之对应的是 CPU 外部相对较慢的 RAM 主内存)。寄存器通过对常用值(通常是运算的中间值)的快速访问来提高计算机程序运行的速度。
程序计数器
一个专用的寄存器,用于表明指令序列中 CPU 正在执行的位置,存的值为正在执行的指令的位置或者下一个将要被执行的指令的位置,具体依赖于特定的系统。
PCB-“切换桢”
上下文切换可以认为是内核(操作系统的核心)在 CPU 上对于进程(包括线程)进行切换,上下文切换过程中的信息是保存在进程控制块(PCB, process control block)中的。PCB 还经常被称作“切换桢”(switchframe)。信息会一直保存到 CPU 的内存中,直到他们被再次使用。
1.3.2、上下文切换的活动
挂起一个进程,将这个进程在 CPU 中的状态(上下文)存储于内存中的某处。在内存中检索下一个进程的上下文并将其在 CPU 的寄存器中恢复。跳转到程序计数器所指向的位置(即跳转到进程被中断时的代码行),以恢复该进程在程序
1.3.3、引起线程上下文切换的原因
- 当前执行任务的时间片用完之后,系统 CPU 正常调度下一个任务;
- 当前执行任务碰到 IO 阻塞,调度器将此任务挂起,继续下一任务;
- 多个任务抢占锁资源,当前任务没有抢到锁资源,被调度器挂起,继续下一任务;
- 用户代码挂起当前任务,让出 CPU 时间;
- 硬件中断;
1.4、与jvm相关的内容
1.4.1、jvm线程调度的方式
java 使用的线程调使用抢占式调度,Java 中线程会按优先级分配 CPU 时间片运行,且优先级越高越优先执行,但优先级高并不代表能独自占用执行时间片,可能是优先级高得到越多的执行时间片,反之,优先级低的分到的执行时间少但不会分配不到执行时间。
1.4.2、jvm的线程监控
java所使用的同步机制是监视器,它支持互斥和协作两类线程,互斥是通过锁来实现的,协作是通过wait和notify来实现的。一个线程的执行一般会为分为5步,如下图:进入监控器、获得监控器、持有监控器、释放监视器、退出监视器。
- 监视器:可以这样理解为一个建筑、里面有一个特别的房间、房间里有一些数据;
- 监视区域:不可再分的代码块,分为方法和代码块;java开发时只需要用synchronized来标识监视区即可,它有三种不同的形态
- 修饰非静态方法时,监视器锁是当前对象;
- 修饰静态方法时,监控器锁是当前类的类对象;
- 修饰代码块时,监视器锁是lock对象,synchronize(lock){};
1.4.2.1、协作
又称为等待并唤醒,当A线程wait()时会释放监视器进入一个等待区,需要注意的是wait()后是进入了另一个区域,并不是当初的等待区。一直等到其它线程执行notify命令,然后它就会继续持有此监视器,直到它主动释放。
如果等待区中线程过多,那么当notify时这些等待线程就会发生竞争,所以如果不知道确定的唤醒线程前不要使用notify而用notifyAll。虽说在等待区中可以用队列等算法,但不同的JVM的实现方式不一样,所以程序员不能依赖特定的这种算法或优先级,否则不同平台的表现有可能不一样。也就失去了平台无关性,使程序变的不可控。
1.4.2.2、互斥
类变量和类实例变量分别存放在方法和堆中,这两个区是共享的。所以当有竞争时,就需要锁的机制。在JVM中每个对象(可以理解为用synchronized修饰的代码块)和类(可以理解为用synchronized修饰的方法)都与一个监视器相关联。对于对象其监控保护的是实例变量、对于类其监视保护的是类变量。同时JVM为每个对象和类都关联了一个锁。类锁是用对象锁实现的,当锁住一个类的时候,实际上锁的是类的java.lang.Class对象。锁的释放:正常结束和异常结束时都会释放锁。
一般来讲同步方法会比同步代码块的性能要好,原因是JVM执行时,如果遇到synchronized方法时,就会自动获得类锁并维护一个锁计数器,但synchronized代码块前后都会有进入和离开代码所以其性能会稍差,但他们锁定的范围不太一样。简单的计算建议锁方法;
1.5、守护进程
守护线程也称“服务线程”,他是后台线程,它有一个特性,即为用户线程提供公共服务,在没有用户线程可服务时会自动离开。它有以下几个特点:
- 优先级:守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务;
- 通过 setDaemon(true)来设置线程为“守护线程”;将一个用户线程设置为守护线程的方式是在线程对象创建之前用线程对象的 setDaemon 方法;
- 在 Daemon 线程中产生的新线程也是 Daemon 的;
- 生命周期:守护进程(Daemon)是运行在后台的一种特殊进程。它独立于控制终端并且周期性地执行某种任务或等待处理某些发生的事件。也就是说守护线程不依赖于终端,但是依赖于系统,与系统“同生共死”。当 JVM 中所有的线程都是守护线程的时候,JVM 就可以退出了;如果还有一个或以上的非守护线程则 JVM 不会退出。
线程则是 JVM 级别的,以 Tomcat 为例,如果你在 Web 应用中启动一个线程,这个线程的生命周期并不会和 Web 应用程序保持同步。也就是说,即使你停止了 Web 应用,这个线程依旧是活跃的;
垃圾回收线程就是一个经典的守护线程,当我们的程序中不再有任何运行的 Thread,程序就不会再产生垃圾,垃圾回收器也就无事可做,所以当垃圾回收线程是 JVM 上仅剩的线程时,垃圾回收线程会自动离开。它始终在低级别的状态中运行,用于实时监控和管理系统中的可回收资源。
二、线程的生命周期
当线程被创建并启动以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态。 在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞 (Blocked)和死亡(Dead)5 种状态。尤其是当线程启动以后,它不可能一直"霸占"着 CPU 独自 运行,所以 CPU 需要在多条线程之间切换,于是线程状态也会多次在运行、阻塞之间切换。
2.1、新建(new)
当程序使用 new 关键字创建了一个线程之后,该线程就处于新建状态,此时仅由 JVM 为其分配 内存,并初始化其成员变量的值
2.2、就绪(runnable)
当线程对象调用了 start()方法之后,该线程处于就绪状态。Java 虚拟机会为其创建方法调用栈和程序计数器,等待调度运行。
2.3、运行(running)
如果处于就绪状态的线程获得了 CPU,开始执行 run()方法的线程执行体,则该线程处于运行状态。
2.4、阻塞(blocked)
阻塞状态是指线程因为某种原因放弃了 cpu 使用权,也即让出了 cpu timeslice,暂时停止运行。 直到线程进入可运行(runnable)状态,才有机会再次获得 cpu timeslice 转到运行(running)状 态。阻塞的情况分三种:
- 等待阻塞(o.wait->等待对列):运行(running)的线程执行 o.wait()方法,JVM 会把该线程放入等待队列(waitting queue) 中。
- 同步阻塞(lock->锁池):运行(running)的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则 JVM 会把该线 程放入锁池(lock pool)中。
- 其他阻塞(sleep/join):运行(running)的线程执行 Thread.sleep(long ms)或 t.join()方法,或者发出了 I/O 请求时, JVM 会把该线程置为阻塞状态。当 sleep()状态超时、join()等待线程终止或者超时、或者 I/O 处理完毕时,线程重新转入可运行(runnable)状态。
2.5、死亡(dead)
线程会以下面三种方式结束,结束后就是死亡状态。
- 正常结束:run()或 call()方法执行完成,线程正常结束。
- 异常结束:线程抛出一个未捕获的 Exception 或 Error。
- 调用 stop:3. 直接调用该线程的 stop()方法来结束该线程—该方法通常容易导致死锁,不推荐使用。
程序中可以直接使用 thread.stop()来强行终止线程,但是 stop 方法是很危险的,就象突然关闭计算机电源,而不是按正常程序关机一样,可能会产生不可预料的结果。
不安全主要是:thread.stop()调用之后,创建子线程的线程就会抛出 ThreadDeatherror 的错误,并且会释放子线程所持有的所有锁。一般任何进行加锁的代码块,都是为了保护数据的一致性,如果在调用thread.stop()后导致了该线程所持有的所有锁的突然释放(不可控制),那么被保护数据就有可能呈现不一致性,其他线程在使用这些被破坏的数据时,有可能导致一些很奇怪的应用程序错误。因此,并不推荐使用 stop 方法来终止线程。
三、线程取消与关闭
本小节也是相当重要的一节知识,一般我们写代码时很少关注线程取消的情况,但有时对于底层系统确实需要这项设计,希望同学们可以学会并掌握其取消技术。java中没有提供任何机制,来安全是强迫线程停止手头的工作,通常来讲要使线程终止通常会有4种方案:
- 正常结束;
- 勾子方法(设置一个标志位,由外部线程来控制,原理是设置一个volatile变量,使线程池不在创建新线程达到平滑关闭的效果;适合一直在运行的长时间任务)
- 中断;
- 阻塞线程用:interrupt()方法会马上抛出异常,捕获到这个线程后break跳出强制关闭;
- 未阻塞线程用:interrupt()方法设置中断标志位,然后在循环时用isInterrupted()来判断中断标志位,其实和自定义标志位一样原理;
- stop或异常强制中止;
以上Thread.stop和Thread.suspend方法存在严重的缺陷不能使用。但每个Thread提供了Interruption中断,一种协作机制来协调线程间的操作和控制。这是推荐的方式。程序不应该立即停止,应该采用中断这种协作机制来处理,正确的做法是:先清除当前进程中的工作,再终止。下面详细介绍下这些方案以及各自适用的场景。
3.1、中断
线程中断是一个协作机制,一个线程给另一个线程发送信号,通知它在方便或可能的情况下停止正在做的工作,去做其他事情 。但实际上,使用中断来处理取消之外的任何事情都是不明智的,中断通常是实现取消最明知的选择,一般在取消方法中设置中断状态。
每个线程都有一个boolean的中断状态,Thread包含一些中断线程的方法:interrupt方法中断目标线程,isInterrupted返回目标线程的中断状态,interrupted用于清除当前线程的中断状态,这是清除中断状态唯一的方法。中断一般不能与可阻塞的函数一起使用。
静态的interrupted方法应该小心使用,它会清除并发线程的中断状态,如果返回了true,必须进行处理,如果想掩盖这个中断,可以抛出 InterruptedException 异常或者再次调用interrupt来保存中断状态。调用interrupt并不意味着必须停止目标线程正在进行的工作,它仅仅传递了请求中断的信息,意味着完成当前任务,保证数据结构的统一,然后在下一周期结束。有一些方法对这样的请求很重视,比如wait,sleep(阻塞方式),当它们接到中断请求时会抛出一个异常,或者进入时中断状态就已经被设置了。所以这两个方法尽量不要用。
下例中在两处用到了检测中断技术,因为put是个阻塞操作,所以在之前检测总比在之后检测性能更好。前提是此时没有消费者线程或是put是个很耗时的操作,像这种对中断状态进行显式的检测会对调用可中断的阻塞方法时很有用外,因为通常我们不能得到期望的响应。
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
//这个例子可以很好的处理阻塞操作的中断问题,通过中断。
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() {
interrupt();
}
}
3.1.2、中断策略
正如需要为任务制定取消策略一样,也应该定制线程中断策略。一个中断策略决定线程如何应对中断请求--当发现中断请求时,它会做什么。
区分任务和线程对中断的反应是很重要的,任务不会在自己拥有的线程中执行,它们借用属于服务的线程,比如线程池,如果代码并不是线程的所有者就应该小心地保存中断状态(如果你给主人打扫房间,主人不在的这段时间你不能把收到的邮件全丢掉,应该收起来待主人回来再处理)。这就是为什么大多数可阻塞的库函数,仅仅抛出InterruptedException做为中断的响应,这也是最合理的策略。
在理中断时应该保存中断状态,也不能简单的是把InterruptedException传递给调用者,应该在它之后恢复中断的状态:Thread.currentThread().interrupt();
当检查到中断请求时,任务不需要放弃所有的事情,可以选择推迟直到更合适的时机。这需要记得它已经被请求过中断了,完成当前正在进行的任务,再抛出中断异常或指明中断,这种技术可以保证数据结构不被破坏。
3.1.3、响应中断
有两种处理InterruptedException的实用策略:传递异常和恢复中断状态,使你的方法也成为可中断的阻塞方法,或者保存中断状态,上层调用者代码能对其进行处理。
如果不想或不能传递 InterruptedException 异常,需要另一种方式保存中断请求,因为大多数代码并不知道它们在哪个线程中运行,并再次调用interrupt来恢复中断状态,而不应该掩盖 InterruptedException 异常,如果你的代码没有相应的处理程序,就不应该在catch中捕获这个异常。过早设置中断可能会引起无限循环。
不可取消的任务在任务退出前保存中断
public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// fall through and retry
}
}
} finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
在中断线程之前,应该了解它的中断策略,且不要在外部线程中安排中断。如果要在一个专门的线程中中断任务,这里用到了jion方法,这个方法有不足之处,它如果传一个超时参数,无法确定是由于异常还是因为超时退出的状态。
public class TimedRun2 {
private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);
public static void timedRun(final Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
class RethrowableTask implements Runnable {
private volatile Throwable t;
public void run() {
try {
r.run();
} catch (Throwable t) {
this.t = t;
}
}
void rethrow() {
if (t != null)
throw launderThrowable(t);
}
}
RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
taskThread.join(unit.toMillis(timeout));
task.rethrow();
}
}
3.2、任务取消
当外部代码能在活动自然完成之前,把它更改为完成状态,被称为取消。取消的原因很多种可能,比如:用户请求、限时活动、应用程序设计如此、错误、关闭。java中没有一种绝对安全停止线程的方法,只能选择相互协作的机制,通过协作,使任务和代码遵循一个统一的协议,用来请求取消。
一个可取消的任务必须有取消策略,这个策略是一套程序,规定了不同任务或机制间的协作,保证数据的统一。下面是一个简单的例子:
public class PrimeGenerator implements Runnable {
private static ExecutorService exec = Executors.newCachedThreadPool();
("this")
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
//让上一个程序在1秒后停止,但这并不是严格的一秒,可能存在误差。
static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
exec.execute(generator);
try {
SECONDS.sleep(1);
} finally {
generator.cancel();
}
return generator.get();//这是一个自定义的非阻塞的方法
}
}
3.2.1、通过Future取消
Future有一个cancel方法,它需要一个boolean参数,它的返回值表示取消尝试是否成功(这仅仅告诉你它是否能接收中断,而不是任务是否检测并处理了中断)。
如果为true并且任务正在一线程中运行,那么这个线程是应该中断的。如果是false并且任务还没启动的话,那这个任务永远不会启动了。除非知道线程的中断策略,否则不应该中断线程,这个例子中cancel何时设置true和false需要考虑。
但任务执行线程是由标准的Executor实现创建的,它实现了一个中断策略,使得任务可以通过中断被取消,这时cancel是安全的。通过Future来中断任务并不影响线程池中其它的线程。在一个专门的线程中中断任务。通过Future来取消任务:
public static void timedRun(Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// 下面任务会被取消
} catch (ExecutionException e) {
// task中抛出的异常,重抛出
throw launderThrowable(e.getCause());
} finally {
// 如果任务已经取消,是无害的
task.cancel(true); // interrupt if running,如果为false表示如果还没有启动的话,不要运行这个任务,用于那些不处理中断的任务。
}
}
3.2.2、处理不可中断阻塞
很多可阻塞的库方法通过提前返回和抛出InterruptedException来实现对中断的响应,这使得构建可以响应取消的任务更加容易。但有些阻塞方法或阻塞机制并不响应中断。但可以通过与中断类似的手段,来确保可以停止这些线程,前提是我们需要清楚地知道线程为什么会阻塞。
下例展现了一项用来封装非标准取消的技术,为了方便终止一个用户的连接或关闭服务器。重写了interrupt方法。
public class ReaderThread extends Thread {
private static final int BUFSZ = 512;
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
public void interrupt() {
try {
socket.close();
} catch (IOException ignored) {
} finally {
super.interrupt();
}
}
public void run() {
try {
byte[] buf = new byte[BUFSZ];
while (true) {
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processBuffer(buf, count);
}
} catch (IOException e) { /* Allow thread to exit */
}
}
public void processBuffer(byte[] buf, int count) {
}
}
3.2.3、用newTaskFor钩子方法封装非标准取消
在上个例子中,可以使用newTaskFor钩子函数来改进用来封装非标准取消的方法。这是java 6 中添加到ThreadPoolExecutor的新特性,当提交一个Callable给ExecutorService时,submit返回一个Future,可以用Future来取消任务。newTaskFor钩子是一个工厂方法,创建Future来代表任务,它返回一个RunnableFuture接口,它扩展了Future和Runnable由FutureTask来实现。
自定义的任务Future可以重写canel方法,实现日志或收集取消的统计信息。也可以通过重写Thread.interrupt()实现上面的非标准取消功能。详细参考:SocketUsingTask.java类。
public interface CancellableTask<T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}
public class CancellingExecutor extends ThreadPoolExecutor{
protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable){
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask();
else
return super.newTaskFor(callable);
}
}
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
("this") private Socket socket;
protected synchronized void setSocket(Socket s) {
socket = s;
}
public synchronized void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException ignored) {
}
}
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}
如果SocketUsingTask通过自身Future被取消,执行线程会被中断,这提高了任务对取消的响应性,这样做在保证响应取消的同时,不仅可以安全地调用可中断方法,还可以调用阻塞中的Socket I/O方法。
3.3、停止基于线程的服务
由于java不提供退出线程惯用的方法,所以需要自行编码结束。实践指出,我们不应该操控某个线程--中断它、改变它的优先级等等,除非你拥有这个线程。 线程通过一个Thread对象表示,线程的所有权也是不能被传递的,但线程可以被自由的共享。
一般的应用程序会有三个部分组成:应用程序拥有服务,服务拥有工作线程,但应用程序并不拥有工作线程,因此应用程序如果想控制线程只能通过服务来处理。就像线程池拥有工作者线程一样。服务比如ExecutorService应该提供生命周期方法来关闭它自己并关闭它拥有的线程。
//这是一个多生产者,单消费者设计,日志信息通过BlockingQueue移交给日志线程
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
("this") private boolean isShutdown;
("this") private int reservations;
public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}
public void start() {
loggerThread.start();
}
public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown)
throw new IllegalStateException(/*...*/);
++reservations;
}
queue.put(msg);
}
另一种更高级的方法。复杂的程序可能把会ExecutorService封装在一个更高层级的服务中,通过增加链接,把所有权链从应用程序扩展到服务,再到线程,每一个链上的成员管理它所拥有的服务或线程的生命周期。
public class LogService1 {
private final ExecutorService exec =
public void start(){}
public void stop() throws InterruptedException{
exec.shutdown();
exec.awaitTermination(timeout, unit);
writer.close();
}//LogService委托给ExecutorService执行,LogService管理自己的生命周期
public void log(String msg){
try{
exec.execute(new WriteTask(msg));
}catch(Exception e){}
}
}
3.3.1、致命药丸
另一种保证生产--消费服务关闭的方式是使用poison pill:一个可识别的对象,置于队列中,意味着“当你得到它时或得到一定数量时,停止一切工作”。这种方式适合在生产--消费数量已知的情况下使用。不过在生产--消费者数量较大时很难处理,致命药丸只在无限队列中工作时,才是可靠的。
//生产者线程
class CrawlerThread extends Thread {
public void run() {
try {
crawl(root);
} catch (InterruptedException e) { /* fall through */
} finally {
while (true) {
try {
queue.put(POISON);
break;
} catch (InterruptedException e1) { /* retry */
}
}
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
queue.put(entry);
}
}
}
}
3.3.2、只执行一次的服务
如果一个方法需要处理一批任务,并在所有任务结束前不会返回,那么可以通过私有的Executor来简化服务的生命周期管理,其中Executor的寿命限定在该方法中(通常会用到invokeAll和invokeAny方法):向每个主机提交任务,在这之后,当所有检查邮件的任务完成后,会关闭Executor,并等待结束。
public boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
//为了从内部Runnable访问hasNewMail标志,它必须是final类型的,才能避免被更改
final AtomicBoolean hasNewMail = new AtomicBoolean(false);
try {
for (final String host : hosts)
exec.execute(new Runnable() {
public void run() {
if (checkMail(host))
hasNewMail.set(true);
}
});
} finally {
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
return hasNewMail.get();
}
3.3.3、TrackingExecutor任务跟踪
但这个方法会强制中断正在运行的任务,也就是无法区分哪些任务正在执行中,哪些执行完了,必须自己通过设置检查点来区分,如果不处理可能会造成数据的不一致性。但可以通过扩展AbstractExecutorService来区分取消和中止的任务。 如TrackingExecutor可以识别那些已经开始,但没有正常结束的任务。任务必须在返回时保存线程的中断状态。TrackingExecutorService例子说明了为后续执行来保存未完成的任务。
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown =
Collections.synchronizedSet(new HashSet<Runnable>());
public TrackingExecutor(ExecutorService exec) {
this.exec = exec;
}
//返回被取消(已经开始,但没有正常结束)的任务清单
public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated())
throw new IllegalStateException(/*...*/);
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}
public void execute(final Runnable runnable) {
exec.execute(new Runnable() {
public void run() {
try {
runnable.run();
} finally {
if (isShutdown()
&& Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);
}
}
});
}
}
3.4、处理反常的线程终止
导致线程dead的主要原因是RuntimeException,因为这种异常错误是不可修复的。下面的例子阐述了如何在线程池内部构建一个工作者线程,如果任务抛出了一个未检查的异常,它将允许线程终结,但是会首先通知框架,线程已经终结。然后,框架可能会用新的线程取代这个工作线程,也可能不这么做,因为线程池也许正在关闭,抑或当前已有足够多的线程,能够满足需要了。ThreadPoolExecutor和Swing使用这项技术来确保那些不能正常运转的任务不会影响到后续任务的执行。
3.4.1、需查异常的处理
典型线程池的工作者线程的构建,实际的例子还没找到如何来用。
public void run(){
Throwable thrown = null;
try{
while(!isInterrupted())
runTask(getTaskFromWorkQueue());
}catch(Throwable e){
thrown = e;
}finally{
threadExited(this, thrown);
}
}
3.4.2、不需查异常的处理
上面讲到了不需查异常的处理,线程的API同样提供了UncaughtExceptionHandler工具,便于监测到线程因不需查异常引起的dead。这两个方案互为补充,可以有效防止线程的泄漏问题。
当一个线程因为不需查异常退出时,JVM会把这个事件报告给应用程序自定义的Handler。如果handler不存在,默认会用System.err打印信息。至于hander如何处理取决于应用程序对服务质量的要求了,一般会直接写入日志。
public class UEHLogger implements Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread t, Throwable e) {
Logger logger = Logger.getAnonymousLogger();
logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
}
}
为了给线程设置UncaughtExceptionHandler需要向ThreadPoolExecutor的构造函数中提供一个ThreadFactory(只有线程的所有者能够改变其UncaughtExceptionHandler)。只有通过execute提交的任务,才能将它抛出的异常送给Handler,而通过submit提交的任务,抛出的任何异常,都会认为是任务返回状态的一部分。如果一个由submit提交的任务以异常作为终结,这个异常会被Future.get重抛出。包装在ExecutionException中。在一个长时间运行的应用程序中,所有的线程都要给未捕获异常设置一个处理器,这个处理器至少要将异常信息记入日志中。
3.5、JVM关闭
JVM即可以正常关闭,也可以通过System.exit或是Crtl-C来强制关闭。但JVM也可以通过Runtime.halt或者“杀死”JVM的操作系统进程被强行关闭。`
3.5.1、关闭钩子
在正常的关闭中,JVM首先启动所有已注册的shutdown hook,shutodwn hook是使用Runtime.addShutdownHook(new Thread())注册的尚未开始的线程,JVM不能保证shutodwn hook的开始顺序。当所有shutodwn hook结束的时候,如果runFinalizerOnExit为true,可以选择运行finalizer,之后停止。JVM不会尝试停止或中断任何关闭时仍在运行中的应用程序线程,它们在JVM最终终止时被强制退出。如果shutdown hook或finalizer没有完成,那么正常的关闭进程会“挂起”并且JVM必须强制关闭,这时JVM只会运行强制关闭程序,其它的线程根本不管,包括shutodwn hook。
shutodwn hook应该是线程安全的。shutodwn hook可以用于服务或应用程序的清理,比如del临时文件。并且shutodwn hook全部是并发执行的。关闭日志文件可能引起其他需要使用日志服务的shutodwn hook麻烦,所以shutodwn hook不应该依赖于可能被应用程序或其他shutodwn hook关闭的服务,所有的服务应使用唯一的shutodwn hook。确保关闭的动作在单线程上顺序发生。也就是说一般的应用程序只会注册一人shutdown Hook。
注册shutodwn hook来停止日志服务
public void start(){
Runtime.getRuntime().addShutdownHook(new Thread(){
public void run(){
LogService.this.stop();
}
});
}
3.5.2、精灵线程
有时需要创建一个线程,执行一些辅助工作,但这不希望这个线程的存在阻碍JVM的关闭,这里就需要用到daemon thread。线程分为:普通线程和精灵线程。JVM启动的时候创建所有的线程,除了主线程外,其他的都是精灵线程(比如GC)。他们的区别仅仅在于退出时会发生什么,当一个普通线程退出时,JVM会检查一个运行中线程的详细清单,如果仅有daemon thread时,它会发起正常的退出。当JVM停止时,所有仍然存在的daemon thread都会被抛弃--不会执行finally块,也不会释放栈--JVM直接退出。daemon thread是好用于“家务管理”的任务,比如一个背景线程可以从内存的缓存中周期性的移除过期的访问。应用程序中,daemon thread不能替代对服务的生命周期恰当、良好的管理。
3.5.3、Finalizer
有些不再需要的资源,GC会自动回收,但有些比如Socket句柄,不需要时,必须显式是归还给操作系统。为了在这方面提供帮助,GC对那些具有特殊finalize方法的对象进行特殊对待,在GC获得它们后,finalize被调用,这样就能保证持久化的资源可以被释放。正确的书写finalizer很困难,所以应该避免使用finalizer,或参考笔者第一章的内容了解finalizer的应用场景。
四、资源共享
本节只简单描述下多线路程下资源共享的几种方式。在多线程学习时一定要区分共享和协同这两个关键概念。共享是针对数据的,协同/互斥是指的是线程间的运行。
4.1、关键字
其实有三种变量能达到共享的效果,volatile、final、static。后面两种比较简单,注意下内存占用机制即可,这里着重说下volatile是一种同步的弱形式(只保证可见性,并不保证操作的原子性),当声明为volatile类型后,编辑器在运行时会监视这个变量,volatile变量不会缓存在寄存器或缓存在对其他处理器隐藏的地方,所以它总是返回最新的值。访问volatile变量不会加锁,所以不会引起线程的阻塞。写入volatile变量就像退出同步块,读取volatile变量就像进入同步块。相对来说它的用处不是很大。一般用于确保它们所引用的对象状态的可见性,或者用于标识重要生命周期事件的发生。一般只有满足下列所有条件时才会使用:
- 写入变量时并不依赖变量的当前值,或者能够确保只有单一的线程修改变量的值;
- 变量不需要与其他的状态变量共同参与不变约束;
- 访问变量时,没有其他的原因需要加锁;
与 sychronized相比, 更轻量级的同步锁在访问 volatile 变量时不会执行加锁操作,因此也就不会使执行线程阻塞,因此 volatile 变量是一 种比 sychronized 关键字更轻量级的同步机制。volatile 适合这种场景:一个变量被多个线程共 享,线程直接给这个变量赋值。当对非 volatile 变量进行读写的时候,每个线程先从内存拷贝变量到 CPU 缓存中。如果计算机有 多个 CPU,每个线程可能在不同的 CPU 上被处理,这意味着每个线程可以拷贝到不同的 CPU
4.2、Atomic变量
除了各种API外,还提供了一系列atomic变量(AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference 等),这里只以 AtomicInteger为例说明,其它的类似区别在与运算对象类型的不同,AtomicInteger一个提供原子操作的 Integer 的类,令人兴奋地,还可以通过 AtomicReference<V>将一个对象的所有操作转化成原子操作。
我们知道,在多线程程序中,诸如++i 或 i++等运算不具有原子性,是不安全的线程操作之一。通常我们会使用 synchronized 将该操作变成一个原子操作,但 JVM 为此类操作特意提供了一些同步类,使得使用更方便,且使程序运行效率变得更高。通过相关资料显示,通常 AtomicInteger的性能是 ReentantLock 的好几倍。
4.3、ThreadLocal
ThreadLocal,很多地方叫做线程本地变量,也有些地方叫做线程本地存储,ThreadLocal 的作用是提供线程内的局部变量,这种变量在线程的生命周期内起作用,减少同一个线程内多个函数或者组件之间一些公共变量的传递的复杂度。
维护线程限制的更规范的方式就是使用ThreadLocal(空间换时间),每个Thread的对象都有一个ThreadLocalMap,当创建一个ThreadLocal的时候,就会将该ThreadLocal对象添加到该Map中,为每个使用它的线程维护一份单独的拷贝,所以get总是返回由当前执行线程通过set设置的最新值。
ThreadLocal变量通常用于防止在基于可变的单体或全局变量的设计中出现不正确的共享。比如:JDBC没有规范Connection一定是线程安全的,所以需要额外的协调,通过ThreadLocal把Connection存储起来。
private ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>(){
public Connection initialValue()
{
try{
return DriverManager.getConnection(DB_URL);
} catch (SQLException e){
throw new RuntimeException("Unable Connection,");
}
};
public Connection getConnection(){
return connectionHolder.get();
}
public static void main(String []args){
new ConnectionDispenser().getConnection();
}
ThreadLocal存储(拷贝)了与Connection相关的值,线程终止后,这些值会被回收 。如果把一个单线程应用迁移到多线程环境中,可以考虑将共享的全局变量都转化为ThreadLocal<T>类型。另一个简单的应用就是J2EE程序中,J2EE容器把一个事务上下文与一个可执行线程关联起来。它利用静态ThreadLocal持有事务上下文;当框架代码需要知道正在运行的哪个事务时,只要从ThreadLocal中获得即可,带来了方便但增加了代码的耦合。
ThreadLocal还可用于一个频繁执行的操作即需要像buffer这样的临时对象或操作代价异常昂贵,同时还需要避免每次都重分配该临时对象。如果为临时缓存这种简单的事物而使用并没有优势。
注意点:大量使用线程本地变量会降低重用性,引入隐晦的类间的耦合。
end,至此线程相关的基础知识基本介绍完了,这些知识是能否掌握多线程编程的关键,当然了笔者所描述的内容也只是一个入门级的教程,还需要多练习。