本章内容比较简单,主要是对Thread的实现方式做下初步介绍,稍带着描述下相关方法的原理和使用场景。此节的内容也是开发同学能不能写出高质量线程程序的一个基础,同样也非常重要。
一、基础知识
还是按之前文档描述的习惯,用一张图来概念下本章的内容,如下图所示红框内描述所示:(理论基础请看上一章内容,线程协同主要是讲锁相关的知识的后续会做为第三节补充上):
二、Thread基本方法
线程的基本方法基本就是Object和Thread这两个API提供的,它们控制着线程的生命周期和状态的流转,如下图所示:
2.1、Object基本方法
2.1.1、线程等待(wait)
调用该方法的线程进入 WAITING 状态,只有等待另外线程的通知或被中断才会返回,需要注意的是调用 wait()方法后,会释放对象的锁。因此,wait 方法一般用在同步方法或同步代码块中。
2.1.2、线程让步(yield)
yield 会使当前线程让出 CPU 执行时间片,与其他线程一起重新竞争 CPU 时间片。一般情况下,优先级高的线程有更大的可能性成功竞争得到 CPU 时间片,但这又不是绝对的,有的操作系统对线程优先级并不敏感。
2.1.3、等待其他线程终止(join )
join() 方法,等待其他线程终止,在当前线程中调用一个线程的 join() 方法,则当前线程转为阻塞状态,回到另一个线程结束,当前线程再由阻塞状态变为就绪状态,等待 cpu 的宠幸。
在主线程生成并启动了子线程,需要用到子线程返回的结果,也就是需要主线程需要在子线程结束后再结束,这时候就要用到 join() 方法。
System.out.println(Thread.currentThread().getName() + "线程运行开始!");
Thread6 thread1 = new Thread6();
thread1.setName("线程 B");
thread1.join();
System.out.println("这时 thread1 执行完毕之后才能执行主线程");
2.1.4、线程唤醒(notify)
Object 类中的 notify() 方法,唤醒在此对象监视器上等待的单个线程,如果所有线程都在此对象上等待,则会选择唤醒其中一个线程,选择是任意的,并在对实现做出决定时发生,线程通过调用其中一个 wait() 方法,在对象的监视器上等待,直到当前的线程放弃此对象上的锁定,才能继续执行被唤醒的线程,被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争。类似的方法还有 notifyAll() ,唤醒再次监视器上等待的所有线程。
2.2、Thread基本方法
2.2.1、线程睡眠(sleep)
sleep 导致当前线程休眠,与 wait 方法不同的是 sleep 不会释放当前占有的锁,sleep(long)会导致线程进入 TIMED-WATING 状态,而 wait()方法会导致当前线程进入 WATING 状态。
2.2.2、 线程中断(interrupt)
中断一个线程,其本意是给这个线程一个通知信号,会影响这个线程内部的一个中断标识位。这个线程本身并不会因此而改变状态(如阻塞,终止等)。
- 调用 interrupt()方法并不会中断一个正在运行的线程。也就是说处于 Running 状态的线程并不会因为被中断而被终止,仅仅改变了内部维护的中断标识位而已。
- 若调用 sleep()而使线程处于 TIMED-WATING 状态,这时调用 interrupt()方法,会抛出InterruptedException,从而使线程提前结束 TIMED-WATING 状态。
- 许多声明抛出 InterruptedException 的方法(如 Thread.sleep(long mills 方法)),抛出异常前,都会清除中断标识位,所以抛出异常后,调用 isInterrupted()方法将会返回 false。
- 中断状态是线程固有的一个标识位,可以通过此标识位安全的终止线程。比如,你想终止一个线程 thread 的时候,可以调用 thread.interrupt()方法,在线程的 run 方法内部可以根据 thread.isInterrupted()的值来优雅的终止线程。
2.2.3、其他方法
- isAlive(): 判断一个线程是否存活。
- activeCount(): 程序中活跃的线程数。
- enumerate(): 枚举程序中的线程。
- currentThread(): 得到当前线程。
- isDaemon(): 一个线程是否为守护线程。
- setDaemon(): 设置一个线程为守护线程。(用户线程和守护线程的区别在于,是否等待主线程依赖于主线程结束而结束)
- setName(): 为线程设置一个名称。
- setPriority(): 设置一个线程的优先级。
- getPriority()::获得一个线程的优先级。
2.3、sleep与wait区别
对于 sleep()方法,我们首先要知道该方法是属于 Thread 类中的。而 wait()方法,则是属于 Object 类中的。sleep()方法导致了程序暂停执行指定的时间,让出 cpu 该其他线程,但是他的监控状态依然保持者,当指定的时间到了又会自动恢复运行状态。 在调用 sleep()方法的过程中,线程不会释放对象锁。而当调用 wait()方法的时候,线程会放弃对象锁,进入等待此对象的等待锁定池,只有针对此对象调用 notify()方法后本线程才进入对象锁定池准备获取对象锁进入运行状态。
三、单线程实现
3.1、继承Thread类
Thread 类本质上是实现了 Runnable 接口的一个实例,代表一个线程的实例。启动线程的唯一方法就是通过 Thread 类的 start()实例方法。
start()方法是一个 native 方法,它将启动一个新线程,并执行 run()方法。
public class MyThread extends Thread {
public void run() {}
}
System.out.println("MyThread.run()");
MyThread myThread1 = new MyThread();
myThread1.start();
3.2、实现Runnable接口
如果自己的类已经 extends 另一个类,就无法直接 extends Thread,此时,可以实现一个 Runnable 接口。
public class MyThreadRunnable implements Runnable {
public void run() { }
}
//启动 MyThread,需要首先实例化一个 Thread,并传入自己的 MyThread 实例:
MyThread myThread = new MyThread();
Thread thread = new Thread(myThread);
thread.start();
//事实上,当传入一个 Runnable target 参数给 Thread 后,Thread 的 run()方法就会调用
target.run()
3.3、start与run区别
- start()方法来启动线程,真正实现了多线程运行。这时无需等待 run 方法体代码执行完毕,可以直接继续执行下面的代码。
- 通过调用 Thread 类的 start()方法来启动一个线程, 这时此线程是处于就绪状态, 并没有运行;
- 方法 run()称为线程体,它包含了要执行的这个线程的内容,线程就进入了运行状态,开始运行 run 函数当中的代码。 Run 方法运行结束, 此线程终止。然后 CPU 再调度其它线程。
3.4、“每任务每线程”的程序缺点
- 线程生命周期的开销,如果请求是频繁且轻量级的,就会消耗大量的计算资源。
- 资源消耗量,主要是针对内存。如果可运行的线程数多于可用的处理器数,线程将会空闲。大量空闲线程占用更多的内存,给GC带来压力,而且会存在大量线程在竞争CPU资源,还会产生其他的性能开销。如果有足够多的线程保持所有CPU忙碌,那么再创建更多的线程是百害无一利的。
- 稳定性,应该限制可创建线程的数目,这个数目受OS、JVM启动参数、Thread的构造函数中请求的栈大小等因素影响。如果打破了这些规则可能会收到OutOfMemoryError错误。(在32位的机器上,主要的限制因素是线程栈的地址空间,每个线程都维护着两个执行栈,一个用于java代码,另一个用于原生代码,典型的JVM默认会产生一个组合的栈,大小在0.5M大小左右,可以通过-Xss JVM参数或者通过Thread的构造函数修改这个值。如果为每个线程分配了大小232字节的栈,那么你的线程数量将被限制在几千到几万间不等)。
这种实现方式只能当作练习,不能应用到正式的环境中去。
四、线程池实现
4.1、基本原理
Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable 和 Future、FutureTask 这几个类。
4.1.1、线程池的原理
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。他的主要特点为:线程复用;控制最大并发数;管理线程。
每一个 Thread 的类都有一个 start 方法。 当调用 start 启动线程时 Java 虚拟机会调用该类的 run方法。 那么该类的 run() 方法中就是调用了 Runnable 对象的 run() 方法。 我们可以继承重写Thread 类,在其 start 方法中添加不断循环调用传递过来的 Runnable 对象。 这就是线程池的实现原理。循环方法中不断获取 Runnable 是用 Queue 实现的,在获取下一个 Runnable 之前可以是阻塞的。
4.2.2、线程池的组成
- 线程池管理器:用于创建并管理线程池
- 工作线程:线程池中的线程
- 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
- 任务队列:用于存放待处理的任务,提供一种缓冲机制
4.3.3、线程池的执行策略
将任务的提交与任务的执行进行解耦,价值在于让你可以简单地为一个类给定的任务制定执行策略,并且保证后续的修改不至于太困难,执行策略指明了:
- 任务在什么线程中执行;
- 任务以什么顺序执行(FIFO、LIFO、优先级);
- 可以有多少个任务并发执行;
- 可以有多少个任务进入等待执行队列;
- 如果系统过载,需要放弃哪个任务并且如何通知Application知道这一切;
- 在一个任务的执行前与后,应该插入什么处理操作。
执行策略是资源管理工具,最佳策略取决于可用的计算资源和你对服务质量的需求,将任务的提交与执行进行分离,有助于在部署阶段选择一个与当前硬件最匹配的执行策略。所以以后的程序中尽量少用或不用new Thread(runnable).start()这种方式而改用Executor委托执行。
4.4.4、线程池工作过程
- 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
- 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
- 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常 RejectExecutionException。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
4.2、Executor
线程是使任务异步执行的机制,作为Executor框架的一部分,它是基于生产—消费模式设计,java.util.concurrent提供了一个灵活的线程池实现。Executor很简单但可用于异步任务执行,支持不同类型的任务执行策略,还为任务提交和任务执行之间的解耦提供了标准的方法,还提供了对生命周期的支持以及钩子函数等。在生产—消费模式中,提交任务是执行者是生产者,执行任务的线程是消费者。Executor接口的实现相当于一个模板或抽象模板实现,是把Runnable委托给Executor来执行。所以可以定义自己的Executor实现类。
Executor实现通常只是为执行任务而创建线程,JVM会在所有线程全部终止后才通出,因此无法正确关闭Executor,进而会阻塞JVM的结束。
因为Executor是异步地执行任务,所以有很多不确定因素,为了解决执行服务的生命周期问题,提供了ExecutorService接口,它扩展了Executor接口同时添加了一些用于生命周期(运行、关闭、终止)管理的方法。
public interface ExecutorService extends Executor {
//不在接收新的任务后平缓关闭
void shutdown();
//强制停止正在运行和队列中等待的任务,并返回列表以便序列化等操作,下次启动时恢复
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
//转换ExecutorService的状态,同时调用shutdown()方法
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
}
一旦所有的任务全部完成后ExecutorService会转入终止状态,可以调用awaitTermination等待ExecutorService到达终止状态,也可以轮询isTerminated判断ExecutorService是否已经终止。通常shutdown会紧随awaitTermination之后,这样可以产生同步关闭ExecutorService的效果。
4.2.1、基本线程池实现(Runnable)
ExecutorService threadPool = Executors.newFixedThreadPool(10);
while (true) {
threadPool.execute(new Runnable() { });// 提交多个线程任务,并执行
public void run () { }
System.out.println(Thread.currentThread().getName() + " is running ..");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
4.2.2、带返回值的线程(Callable, Future)
Callable主要用于计算时间长的任务,比如下载、复杂运算、数据库操作。它会在主进入点call--等待返回值,并为可能抛出的异常预告做好了准备,Executors包含一些工具方法,可以把其他类型的任务封装成一个Callable。Callable<void>===Runnable。在Executor框架中,如果任务执行要花费很长时间,任务可以手动取消,但对于已经开始的任务,只有中断后才可以取消,取消一个已完成的任务不会有任何影响。
Future可以描述任务的生命周期,并提供了相关的方法来获得任务的结果、取消任务以及检验任务是否完成。Future和ExecutorService这些线程相关的对象的生命周期是单向的,无法回退。Future的get方法是个阻塞方法, 注意get处理异常的能力,它会把异常重新包装后再抛出。
ExecutorService的所有submit()方法都返回一个Future。可以将一个Runnable或Callable提交给executor,然后得到一个Future。也可以显示的为Runnable或Callable创建一个FutureTask。FutureTask实现了Runnable接口。所以可以直接交给ExecutorService来执行,也可以直接调用run方法。
private final ExecutorService executor = Executors.newCachedThreadPool();
void renderPage(CharSequence source) {
final List<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>> task =
new Callable<List<ImageData>>() {
public List<ImageData> call() {
List<ImageData> result = new
ArrayList<ImageData>();
for (ImageInfo imageInfo : imageInfos)
result.add(imageInfo.downloadImage());
return result;
}
};
Future<List<ImageData>> future = executor.submit(task);
// FutureTask future = new FutureTask(task);
// executor.submit(future);//这两行代码和上面一行代码是等价的。
try {
List<ImageData> imageData = future.get();
for (ImageData data : imageData)
renderImage(data); //渲染图片
} catch (InterruptedException e) {
// 重新声明线程的中断状态
Thread.currentThread().interrupt();
// 不需要结果 ,故取消任务
future.cancel(true);//异常处理
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
4.3、ThreadPoolExecutor
4.3.1、线程池类型
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(),
defaultHandler);
}
- corePoolSize:指定了线程池中的线程数量。
- maximumPoolSize:指定了线程池中的最大线程数量。
- keepAliveTime:当前线程池数量超过 corePoolSize 时,多余的空闲线程的存活时间,即多次时间内会
- unit:keepAliveTime 的单位。
- workQueue:任务队列,被提交但尚未被执行的任务。
- threadFactory:线程工厂,用于创建线程,一般用默认的即可。 7. handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。
4.3.1.1、newCachedThreadPool(非定长)
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。
4.3.1.2、newFixedThreadPool(定长)
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
4.3.1.3、newScheduledThreadPool(定长周期)
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行支持定时的以及周期性的任务执行,类似于Timer。
4.3.1.4、newSingleThreadExecutor(单线程)
Executors.newSingleThreadExecutor()返回一个线程池(这个线程池只有一个线程),这个线程池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去!
4.4、拒绝策略
线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也 塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。JDK 内置的拒绝策略如下:
- AbortPolicy : 直接抛出异常,阻止系统正常运行。
- CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的 任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
- DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再 次提交当前任务。
- DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢 失,这是最好的一种方案。
以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际需要,完全可以自己扩展 RejectedExecutionHandler 接口。
4.5、阻塞队列
阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:1、当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放 入队列;2、当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有 空的位置,线程被自动唤醒。
4.5.1、ArrayBlockingQueue(公平、非公平)
用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列
4.5.2、LinkedBlockingQueue(两个独立锁提高并发)
基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
4.5.3、PriorityBlockingQueue(compareTo 排序实现优先)
是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现compareTo()方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
4.5.4、DelayQueue(缓存失效、定时任务 )
是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景:
- 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了;
- 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从DelayQueue 中获取到任务就开始执行,从比如 TimerQueue 就是使用 DelayQueue 实现的。
4.5.5、SynchronousQueue(不存储数据、可用于传递数据)
是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和ArrayBlockingQueue。
4.5.6、LinkedTransferQueue(无阻塞)
是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
- transfer 方法:如果当前有消费者正在等待接收元素(消费者使用 take()方法或带时间限制的poll()方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素;
- tryTransfer 方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回 false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回。而 transfer 方法是必须等到消费者消费了才返回。对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。
4.5.7、LinkedBlockingDeque(双向阻塞)
是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法却等同于 takeFirst,不知道是不是 Jdk 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。
End,下一章节笔者打算就线程间的协同机制(锁)进行详细描述,因为锁的内容比较多,敬请期待。