线程
什么是线程
线程,程序执行流的最小执行单位,是行程中的实际运作单位
线程的生命周期
新建 New、就绪 Runnable、运行 Running、阻塞 Blocked、死亡 Dead
新建(new)
1 | public class ThreadTest { |
就绪(Runnable)
当线程对象调用了Thread.start()
方法之后,该线程处于就绪状态,JVM
会为其创建方法调用栈和程序计数器,处于这个状态的线程并没有开始运行,它只是表示该线程可以运行了
1 | class Thread implements Runnable { |
运行(Running)
如果处于就绪状态的线程获得了CPU
资源,就开始执行run()
的线程执行体,则该线程处于运行状态
1 | class Thread implements Runnable { |
阻塞(Blocked)
阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况大概三种:
- 等待阻塞
运行的线程执行wait()
方法,JVM
会把该线程放入等待池中。(wait会释放持有的锁)
- 同步阻塞
运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM
会把该线程放入锁池中
- 其他阻塞
运行的线程执行sleep()
或join()
方法,或者发出了I/O
请求时,JVM
会把该线程置为阻塞状态。当sleep()
状态超时、join()
等待线程终止或者超时、或者I/O
处理完毕时,线程重新转入就绪状态。(注意,sleep()
是不会释放持有的锁)
具体阻塞情况有如下情况:
线程睡眠:Thread.sleep(long millis)
方法,使线程转到阻塞状态。millis
参数设定睡眠的时间,以毫秒为单位。当睡眠结束后,就转为就绪(Runnable)状态。sleep()
平台移植性好。
线程等待:Object类中的wait()
方法,导致当前的线程等待,直到其他线程调用此对象的notify()
方法或 notifyAll()
唤醒方法。这个两个唤醒方法也是Object类中的方法,行为等价于调用wait(0)
一样。唤醒线程后,就转为就绪(Runnable)状态。
线程让步:Thread.yield()
方法,暂停当前正在执行的线程对象,把执行机会让给相同或者更高优先级的线程。
线程加入:join()
方法,等待其他线程终止。在当前线程中调用另一个线程的join()
方法,则当前线程转入阻塞状态,直到另一个进程运行结束,当前线程再由阻塞转为就绪状态。
线程I/O:线程执行某些IO操作,因为等待相关的资源而进入了阻塞状态。比如说监听system.in
,但是尚且没有收到键盘的输入,则进入阻塞状态。
线程唤醒:Object类中的notify()
方法,唤醒在此对象监视器上等待的单个线程。如果所有线程都在此对象上等待,则会选择唤醒其中一个线程,选择是任意性的,并在对实现做出决定时发生。类似的方法还有一个notifyAll()
,唤醒在此对象监视器上等待的所有线程
死亡(Dead)
线程会以以下三种方式之一结束,结束后就处于死亡状态:
run()
方法执行完成,线程正常结束- 线程抛出一个未捕获的
Exception
或Error
- 直接调用该线程的
stop()
方法来结束该线程,该方法容易导致死锁,通常不推荐使用
创建线程的三种方式(推荐实现Runnable接口的方式)
- 继承于java.lang.Thread类,覆盖run方法,调用start()方法启动线程
1 | //1):定义一个类A继承于java.lang.Thread类. |
- 实现Runnable接口,覆盖run方法,传入到线程类中,调用start()方法启动线程
1 | //1):定义一个类A实现于java.lang.Runnable接口,注意A类不是线程类. |
- 直接在函数体使用(匿名内部类,其实也是属于第二种实现方式的特例)
1 | void java_thread(){ |
线程的并行与并发
并发的关键是你有处理多个任务的能力,不一定要同时
并行的关键是你有同时处理多个任务的能力
Java虚拟机的多线程是通过线程轮流切换并分配处理器执行时间的方式来实现,一个CPU
可以多线程,但是一个单核CPU
任何时间点,都只能在做一个任务。而程序时间大多花在读取数据(IO)
上,真正的计算工作花时间还是相对少的,因此CPU
很大时间表现都很闲。现实中的CPU
在大部分时候的闲置状态的。因此,开多线程能提高效率不如说成是充分利用了CPU
执行时间。多核CPU
才是真正意义的并行处理
线程池
在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,我们就提出了线程池的概念,线程池可以用来管理线程,减少资源的消耗
java并发编程框架:Executor
Executor
作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable
来表示任务,Executor
的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制
ThreadPoolExecutor构造函数的各个参数说明
1 | public ThreadPoolExecutor(int corePoolSize, |
corePoolSize
核心线程数,如果运行的线程少于
corePoolSize
,则创建新线程来执行新任务,即使线程池中的其他线程是空闲的maximumPoolSize
最大线程数,可允许创建的线程数,
corePoolSize
和maximumPoolSize
设置的边界自动调整池大小:corePoolSize < 运行的线程数 < maximumPoolSize:仅当队列满时才创建新线程 corePoolSize = 运行的线程数 = maximumPoolSize:创建固定大小的线程池
keepAliveTime
如果线程数多于
corePoolSize
,则这些多余的线程的空闲时间超过keepAliveTime
时将被终止unit
keepAliveTime
参数的时间单位TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
workQueue【重点】
一个阻塞队列,用来存储等待执行的任务,它的使用规则如下:
当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列 当运行的线程数等于或多于corePoolSize,在有新任务添加时则选加入队列,不直接创建线程 当队列满时,在有新任务时就创建新线程
最常用的阻塞队列有如下三种,前提核心线程数已经用完:
ArrayBlockingQueue【不推荐】:有界队列,当新任务进来时,如果队列满了并且线程数小于maximumPoolSize,会去创建新线程,如果线程数等于maximumPoolSize,则会通过拒接策略拒接任务 LinkedBlockingQueue【推荐】:无界队列,当新任务进来时,会加到队列中,此时maximumPoolSize是无效的 SynchronousQueue【特定场景】:在某次添加线程后必须等待其他线程取走后才能继续添加,否则会创建新的线程去执行,由此推断在A、B线程有依赖的情况下使用SynchronousQueue
threadFactory
使用
ThreadFactory
创建新线程,默认使用DefaultThreadFactory
创建线程handle
定义处理被拒绝任务的策略,默认使用
ThreadPoolExecutor.AbortPolicy
,任务被拒绝时将抛出RejectExecutorException
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
Executor的类图详解
继承关系:Executor -> ExecutorService -> AbstractExecutorService -> ThreadPoolExecutor
- Executor
Executor
是一个顶层接口,在它里面只声明了一个方法execute(Runnable)
,返回值为void
,即在将来的某个时间执行给定的可运行程序。可运行程序会被线程池操作线程去执行,由线程池控制
- ExecutorService
ExecutorService
接口是继承了Executor
的接口,并声明了一些方法:submit、invokeAll、invokeAny、shutDown
等
- AbstractExecutorService
AbstractExecutorService
抽象类实现了ExecutorService
接口,基本实现了ExecutorService
中声明的所有方法
- ThreadPoolExecutor
ThreadPoolExecutor
继承AbstractExecutorService
抽象类
几个重要的方法
execute()
方法实际上是Executor
中声明的方法,在ThreadPoolExecutor
进行了具体的实现,这个方法是ThreadPoolExecutor
的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行submit()
方法是在ExecutorService
中声明的方法,在AbstractExecutorService
就已经有了具体的实现,在ThreadPoolExecutor
中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()
方法不同,它能够返回任务执行的结果,去看submit()
方法的实现,会发现它实际上还是调用的execute()
方法,只不过它利用了Future
来获取任务执行结果shutdown()
和shutdownNow()
是用来关闭线程池的
ThreadPoolExecutor源码分析
类内部结构
1 | public class ThreadPoolExecutor extends AbstractExecutorService { |
execute()
1 | /** |
shutdown()
1 | /** |
shutdownNow()
1 | /** |
思考与总结
线程池中包含如下参数:
AtomicInteger
声明的ctl
,它包含了两个含义:workerCount
(当前有效线程数)、runState
(运行状态)ReentrantLock
声明的可重入锁mainLock
,主要用在操作工作者集合时HashSet<Worker>
声明的工作者集合workers
,持有mainLock
才可以访问BlockingQueue<Runnable>
声明的等待队列workQueue
(包含还没执行的任务列表)
runState控制线程池的生命周期:
- RUNNING(-1):接受新任务并处理排队的任务
- SHUTDOWN(0):不接受新任务,而是处理排队的任务
- STOP(1):不接受新任务,不处理排队任务,中断进行中的任务
- TIDYING(2):所有任务都已终止,
workerCount=0
,线程转换为状态TIDYING
将运行Terminated()
挂钩方法 - TERMINATED(3):调用
Terminated()
完成
Worker工作者是继承AQS实现的,能够控制内部线程对任务的执行,内部包含:
- thread:工作者内部拥有一个线程
- firstTask:工作者的第一个任务,可以为空
- 工作者内部执行一个任务,需要获得AQS的独占锁,防止内部错乱,执行任务和尝试终止时都需要获得独占锁
- 执行完一个任务后,再去
workQueue
拉取任务,如果空闲的工作者太多,则会根据corePoolSize
和maximumPoolSize
来销毁工作者
往线程池中加入一个任务的逻辑如下:
- 如果线程池中工作者数量小于
corePoolSize
,则增加工作者(附加当前任务) - 如果线程池中工作者数量大于等于
corePoolSize
,则把任务加入到workQueue
等待队列中 - 如果线程池中工作者数量大于等于
corePoolSize
,并且加入workQueue
队列失败,则尝试开新的工作者去执行(这些工作者数量大于corePoolSize
,空闲后会被销毁) - 如果线程池中工作者数量大于等于
corePoolSize
,并且加入workQueue
队列失败,并且开新的工作者也失败,则根据线程池的拒绝策略,拒绝任务
增加工作者,并开始工作的逻辑如下:
- 判断线程池状态是否
SHUTDOWN
- 判断线程池工作线程数是否超过容量
- CAS增加
workCount
- 新建一个工作者,并把任务作为工作者的第一个任务,获取线程池可重入锁,加锁,把工作者加入到工作集合中,解锁
- 启动工作者,开始执行任务(内部线程执行任务)
- 如果工作者启动失败,则执行回滚(把工作者移出工作集合)
源码中多次执行tryTerminate()
方法,尝试终止线程池,逻辑如下:
- 如果线程池处于
RUNNING
或TIDYING
或SHUTDOWN且工作列表不为空
时,放弃转换 - 在可能导致终止的任何操作之后调用此方法,可能是最后一个线程退出所以需要尝试终止线程池
- 如果线程池处于
SHUTDOWN且工作列表为空
,则尝试中断一个工作者(中断时,会先获取线程池锁,再尝试获取每一个工作者的AQS独占锁),此处仅中断一个(因为调用tryTerminate都是因为某个工作者退出) - 如果线程池处于
SHUTDOWN且工作列表为空
,且工作者数量为空,则终止线程池
为什么即需要ReentrantLock
还需要volatile
和AQS
独占锁?
ReentrantLock
主要用在操作工作集合,防止工作集合在多线程环境下出问题volatile
声明的workCount
和runState
,变化用CAS
去做,这块的变化在ReentrantLock
锁之外做,提升效率Worker
中用AQS
独占锁来控制工作者的任务执行和任务中断
线程池工厂类:Executors
Executors:提供了一系列静态工厂方法用于创建各种线程池,实际上也是间接调用了ThreadPoolExocutor,不过是传的不同的构造参数
Executors.newFixedThreadPool:创建可重用且固定线程数的线程池,
corePoolSize=maximumPoolSize
,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程Executors.newCachedThreadPool:创建可缓存的线程池,如果线程池中的线程在60秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一条线程
Executors.newScheduledThreadPool:创建一个可延迟执行或定期执行的线程池,内部使用无界延迟阻塞队列
DelayedWorkQueue
Executors.newSingleThreadExecutor:创建一个单线程的线程池,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务,适用于有顺序的任务的应用场景
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
线程的join
join的作用是转并为串行,注意join要在线程start之后执行
- join()
在A线程中调用了B线程的join()方法时,表示只有当B线程执行完毕时,A线程才能继续执行
join的意思是使得放弃当前线程的执行,并返回对应的线程,例如下面代码的意思就是:
程序在main线程中调用t1线程的join方法,则main线程放弃cpu控制权,并返回t1线程继续执行直到线程t1执行完毕,所以结果是t1线程执行完后,才到主线程执行,相当于在main线程中同步t1线程,t1执行完了,main线程才有执行的机会
1 | Thread t1 = new Thread(new ThreadA("线程一")); |
- join(10)
如果A线程中调用B线程的join(10),则表示A线程会等待B线程执行10毫秒,10毫秒过后,A、B线程并行执行
1 | Thread t1 = new Thread(new ThreadA("线程一")); |