Thread和Runnable
Java中的线程
Java中的线程创建有两种方式:
- 扩展Thread类
- 实现Runnable接口
扩展Thread类的方式创建新线程:
public class RubinThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("RubinThread线程:" + i);
}
}
}
public class Main {
public static void main(String[] args) {
RubinThread thread = new RubinThread();
thread.start();
}
}
实现Runnable接口的方式创建线程:
public class MyRunnable implements Runnable {
@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + " 运行了");
try {
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
Java中的线程特征和状态
- 所有的Java程序,不论并发与否,都有一个名为主线程的
Thread
对象。执行该程序时,Java虚拟机( JVM )将创建一个新Thread
并在该线程中执行main()
方法。这是非并发应用程序中唯一的线程,也是并发应用程序中的第一个线程 - Java中的线程共享应用程序中的所有资源,包括内存和打开的文件,快速而简单地共享信息。但是必须使用同步避免数据竞争
- java中的所有线程都有一个优先级,这个整数值介于
Thread.MIN_PRIORITY(1)
和Thread.MAX_PRIORITY(10)
之间,默认优先级是Thread.NORM_PRIORITY(5)
。线程的执行顺序并没有保证,通常,较高优先级的线程将在较低优先级的钱程之前执行
在Java 中,可以创建两种线程:
- 守护线程
- 非守护线程
区别在于它们如何影响程序的结束。java程序结束执行过程的情形:
- 程序执行
Runtime
类的exit()
方法, 而且用户有权执行该方法 - 应用程序的所有非守护线程均已结束执行,无论是否有正在运行的守护线程
守护线程通常用在作为垃圾收集器或缓存管理器的应用程序中,执行辅助任务。在线程start
之前调用isDaemon()
方法检查线程是否为守护线程,也可以使用setDaemon()
方法将某个线程确立为守护线程。
Thread.States
类中定义线程的状态如下:
- NEW:Thread对象已经创建,但是还没有开始执行
- RUNNABLE:Thread对象正在Java虚拟机中运行
- BLOCKED : Thread对象正在等待锁定
- WAITING:Thread 对象正在等待另一个线程的动作
- TIME_WAITING:Thread对象正在等待另一个线程的操作,但是有时间限制
- TERMINATED:Thread对象已经完成了执行
Thread类和Runnable接口
Runnable
接口只定义了一种方法:run()
方法。这是每个线程的主方法。当执行start()
方法启动新线程时,它将调用run()
方法。
Thread
类其他常用方法:
getId()
:该方法返回Thread
对象的标识符。该标识符是在钱程创建时分配的一个正整数。在线程的整个生命周期中是唯一且无法改变的getName()/setName()
:这两种方法允许你获取或设置Thread
对象的名称。这个名称是一个String
对象,也可以在Thread
类的构造函数中建立getPriority()/setPriority()
:你可以使用这两种方法来获取或设置Thread
对象的优先级isDaemon()/setDaemon()
:这两种方法允许你获取或建立Thread
对象的守护条件getState()
:该方法返回Thread
对象的状态interrupt()
:中断目标线程,给目标线程发送一个中断信号,线程被打上中断标记interrupted()
:判断目标线程是否被中断,但是将清除线程的中断标记isinterrupted()
:判断目标线程是否被中断,不会清除中断标记sleep(long ms)
:该方法将线程的执行暂停ms时间join()
:暂停线程的执行,直到调用该方法的线程执行结束为止。可以使用该方法等待另一个Thread
对象结束setUncaughtExceptionHandler()
:当线程执行出现未校验异常时,该方法用于建立未校验异常的控制器currentThread()
:Thread
类的静态方法,返回实际执行该代码的Thread
对象
join
示例程序:
package com.rubin.concurrent.join;
public class RubinThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("RubinThread线程:" + i);
}
}
}
package com.rubin.concurrent.join;
public class ThreadJoinDemo {
public static void main(String[] args) throws InterruptedException {
RubinThread rubinThread = new RubinThread();
rubinThread.start();
rubinThread.join();
System.out.println("main线程 - 执行完成");
}
}
Callable
Callable
接口是一个与Runnable
接口非常相似的接口。Callable
接口的主要特征如下:
- 接口。有简单类型参数,与
call()
方法的返回类型相对应 - 声明了
call()
方法。执行器运行任务时,该方法会被执行器执行。它必须返回声明中指定类型的对象 call()
方法可以抛出任何一种校验异常。可以实现自己的执行器并重载afterExecute()
方法来处理这些异常
使用示例:
package com.rubin.concurrent.callable;
import java.util.concurrent.Callable;
public class RubinStringCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return Thread.currentThread().getName() + " --- hello world call() invoked!";
}
}
package com.rubin.concurrent.callable;
import java.util.concurrent.*;
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
runAsyncThread();
runAsyncThreadPool();
}
private static void runAsyncThread() throws ExecutionException, InterruptedException {
RubinStringCallable rubinStringCallable = new RubinStringCallable();
FutureTask<String> futureTask = new FutureTask<>(rubinStringCallable);
new Thread(futureTask).start();
final String result = futureTask.get();
System.out.println(result);
}
private static void runAsyncThreadPool() throws ExecutionException, InterruptedException {
RubinStringCallable rubinStringCallable = new RubinStringCallable();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
};
final Future<String> stringFuture = threadPoolExecutor.submit(rubinStringCallable);
final String result = stringFuture.get();
System.out.println(result);
threadPoolExecutor.shutdown();
}
}
synchronized关键字
锁的对象
synchronized
关键字“给某个对象加锁”,示例代码:
public Class MyClass {
public void synchronized method1() {
// ...
}
public static void synchronized method2() {
// ...
}
}
等价于:
public class MyClass {
public void method1() {
synchronized(this) {
// ...
}
}
public static void method2() {
synchronized(MyClass.class) {
// ...
}
}
}
实例方法的锁加在对象myClass
上。
静态方法的锁加在MyClass.class
上。
锁的本质
如果一份资源需要多个线程同时访问,需要给该资源加锁。加锁之后,可以保证同一时间只能有一个线程访问该资源。资源可以是一个变量、一个对象或一个文件等。
锁是一个“对象”,作用如下:
- 这个对象内部得有一个标志位(
state
变量),记录自己有没有被某个线程占用。最简单的情况是这个state
有0、1两个取值,0表示没有线程占用这个锁,1表示有某个线程占用了这个锁 - 如果这个对象被某个线程占用,记录这个线程的
thread ID
- 这个对象维护一个
thread id list
,记录其他所有阻塞的、等待获取拿这个锁的线程。在当前线程释放锁之后从这个thread id list
里面取一个线程唤醒
要访问的共享资源本身也是一个对象,例如前面的对象myClass,这两个对象可以合成一个对象。代码就变成synchronized(this) {…}
,要访问的共享资源是对象a,锁加在对象a上。当然,也可以另外新建一个对象,代码变成synchronized(obj1) {…}
。这个时候,访问的共享资源是对象a,而锁加在新建的对象obj1上。
资源和锁合二为一,使得在Java里面,synchronized
关键字可以加在任何对象的成员上面。这意味着,这个对象既是共享资源,同时也具备“锁”的功能!
实现原理
锁如何实现?
在对象头里,有一块数据叫Mark Word
。在64位机器上,Mark Word
是8字节(64位)的,这64位中有2个重要字段:锁标志位和占用该锁的thread ID
。因为不同版本的JVM实现,对象头的数据结构会有各种差异。
wait与notify
生产者−消费者模型
生产者-消费者模型是一个常见的多线程编程模型,如下图所示:
一个内存队列,多个生产者线程往内存队列中放数据,多个消费者线程从内存队列中取数据。要实现这样一个编程模型,需要做下面几件事情:
- 内存队列本身要加锁,才能实现线程安全
- 阻塞。当内存队列满了,生产者放不进去时,会被阻塞;当内存队列是空的时候,消费者无事可做,会被阻塞
- 双向通知。消费者被阻塞之后,生产者放入新数据,要
notify()
消费者;反之,生产者被阻塞之后,消费者消费了数据,要notify()
生产者
第1件事情必须要做,第2件和第3件事情不一定要做。例如,可以采取一个简单的办法,生产者放不进去之后,睡眠几百毫秒再重试,消费者取不到数据之后,睡眠几百毫秒再重试。但这个办法效率低下,也不实时。所以,我们只讨论如何阻塞、如何通知的问题。
多个生产者多个消费者的情形:
阻塞队列:
package com.rubin.concurrent.synchronizedqueue;
/**
* 同步队列 支持多个生产者消费者并发访问
* 线程安全类
*/
public class RubinSyncStringQueue2 {
private String[] queue = new String[10];
private volatile int size = 0;
private int readIndex = 0;
private int putIndex = 0;
public synchronized void put(String element) {
while (size == queue.length) {
try {
// 任何线程调用notify()时 会执行所有wait状态的线程后面的代码 我们再次调用该方法 让唤醒的一个线程去再次判断条件 因为唤醒的可能不是消费者线程
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
put0(element);
// 通知所有线程执行wait后面的逻辑
notify();
}
private void put0(String element) {
if (putIndex == queue.length) {
putIndex = 0;
}
queue[putIndex] = element;
++putIndex;
++size;
}
public synchronized String get() {
while (size == 0) {
try {
// 如果抢到锁的消费者线程发现队列空了 则释放锁 进入等待状态
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 抢到锁的消费者线程发现队列有数据可消费 则消费数据
String result = get0();
// 通知所有线程执行wait后面的逻辑
notify();
return result;
}
private String get0() {
if (readIndex == queue.length) {
readIndex = 0;
}
String result = queue[readIndex];
++readIndex;
--size;
return result;
}
}
生产者:
package com.rubin.concurrent.synchronizedqueue;
public class Producer2 extends Thread {
private long index = 1L;
private RubinSyncStringQueue2 queue;
public Producer2(RubinSyncStringQueue2 queue) {
this.queue = queue;
}
@Override
public void run() {
super.run();
while (true) {
String element = getName() + "----生产数据" + index;
queue.put(element);
System.out.println(element);
index++;
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者:
package com.rubin.concurrent.synchronizedqueue;
public class Consumer2 extends Thread {
private RubinSyncStringQueue2 queue;
public Consumer2(RubinSyncStringQueue2 queue) {
this.queue = queue;
}
@Override
public void run() {
super.run();
while (true) {
String element = getName() + "----消费数据:" + queue.get();
System.out.println(element);
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行示例:
package com.rubin.concurrent.synchronizedqueue;
public class Main2 {
public static void main(String[] args) {
RubinSyncStringQueue2 queue = new RubinSyncStringQueue2();
for (int i = 0; i < 3; i++) {
String threadName = "Producer-" + i;
Producer2 producer = new Producer2(queue);
producer.setName(threadName);
producer.start();
}
for (int j = 0; j < 3; j++) {
String threadName = "Consumer-" + j;
Consumer2 consumer = new Consumer2(queue);
consumer.setName(threadName);
consumer.start();
}
}
}
为什么必须和synchronized一起使用
在Java里面,wait()
和notify()
是Object
的成员函数,是基础中的基础。为什么Java要把wait()
和notify()
放在如此基础的类里面,而不是作为像Thread
一类的成员函数,或者其他类的成员函数呢?
先看为什么wait()
和notify()
必须和synchronized
一起使用?请看下面的代码:
class MyClass1 {
private Object obj1 = new Object();
public void method1() {
synchronized(obj1) {
//...
obj1.wait();
//...
}
}
public void method2() {
synchronized(obj1) {
//...
obj1.notify();
//...
}
}
}
或者下面的代码:
public class MyClass1 {
public void synchronized method1() {
//...
this.wait();
//...
}
public void synchronized method2() {
//...
this.notify();
//...
}
}
然后,开两个线程,线程A调用method1()
,线程B调用method2()
。答案已经很明显:两个线程之间要通信,对于同一个对象来说,一个线程调用该对象的wait()
,另一个线程调用该对象的notify()
,该对象本身就需要同步!所以,在调用wait()
、notify()
之前,要先通过synchronized
关键字同步给对象,也就是给该对象加锁。
synchronized
关键字可以加在任何对象的实例方法上面,任何对象都可能成为锁。因此,wait()
和notify()
只能放在Object
里面了。
为什么wait()的时候必须释放锁
当线程A进入synchronized(obj1)
中之后,也就是对obj1上了锁。此时,调用wait()
进入阻塞状态,一直不能退出synchronized
代码块;那么,线程B永远无法进入synchronized(obj1)
同步块里,永远没有机会调用notify()
,发生死锁。
这就涉及一个关键的问题:在wait()
的内部,会先释放锁obj1,然后进入阻塞状态,之后,它被另外一个线程用notify()
唤醒,重新获取锁!其次,wait()
调用完成后,执行后面的业务逻辑代码,然后退出synchronized
同步块,再次释放锁,如此则可以避免死锁。
wait()与notify()的问题
生产者在通知消费者的同时,也通知了其他的生产者;消费者在通知生产者的同时,也通知了其他消费者。原因在于wait()
和notify()
所作用的对象和synchronized
所作用的对象是同一个,只能有一个对象,无法区分队列空和列队满两个条件。这正是Condition
要解决的问题。
InterruptedException与interrupt()方法
Interrupted异常
什么情况下会抛出Interrupted异常呢?
假设while
循环中没有调用任何的阻塞函数,就是通常的算术运算,或者打印一行日志,如下所示:
public class MyThread extends Thread {
@Override
public void run() {
while (true) {
boolean interrupted = isInterrupted();
System.out.println("中断标记:" + interrupted);
}
}
}
这个时候,在主线程中调用一句thread.interrupt()
,请问该线程是否会抛出异常?不会。
只有那些声明了会抛出InterruptedException
的函数才会抛出异常,也就是下面这些常用的函数:
public static native void sleep(long millis) throws InterruptedException
{...}
public final void wait() throws InterruptedException {...}
public final void join() throws InterruptedException {...}
轻量级阻塞与重量级阻塞
能够被中断的阻塞称为轻量级阻塞,对应的线程状态是WAITING
或者TIMED_WAITING
.而像synchronized
这种不能被中断的阻塞称为重量级阻塞,对应的状态是BLOCKED
。如图所示:调用不同的方法后,一个线程的状态迁移过程。
初始线程处于NEW
状态,调用start()
开始执行后,进入RUNNING
或者READY
状态。如果没有调用任何的阻塞函数,线程只会在RUNNING
和READY
之间切换,也就是系统的时间片调度。这两种状态的切换是操作系统完成的,除非手动调用yield()
函数,放弃对CPU的占用。
一旦调用了图中的任何阻塞函数,线程就会进入WAITING
或者TIMED_WAITING
状态,两者的区别只是前者为无限期阻塞,后者则传入了一个时间参数,阻塞一个有限的时间。如果使用了synchronized
关键字或者synchronized
块,则会进入BLOCKED
状态。
不太常见的阻塞/唤醒函数,LockSupport.park()/unpark()
。这对函数非常关键,Concurrent
包中Lock
的实现即依赖这一对操作原语。
因此thread.interrupt()
的精确含义是“唤醒轻量级阻塞”,而不是字面意思“中断一个线程”。
thread.isInterrupted()与Thread.interrupted()的区别
因为thread.interrupt()
相当于给线程发送了一个唤醒的信号,所以如果线程此时恰好处于WAITING
或者TIMED_WAITING
状态,就会抛出一个InterruptedException
,并且线程被唤醒。而如果线程此时并没有被阻塞,则线程什么都不会做。但在后续,线程可以判断自己是否收到过其他线程发来的中断信号,然后做一些对应的处理。
这两个方法都是线程用来判断自己是否收到过中断信号的,前者是实例方法,后者是静态方法。二者的区别在于,前者只是读取中断状态,不修改状态;后者不仅读取中断状态,还会重置中断标志位。
线程的优雅关闭
stop与destory函数
线程是“一段运行中的代码”,一个运行中的方法。运行到一半的线程能否强制杀死?
不能。在Java中,有stop()
、destory()
等方法,但这些方法官方明确不建议使用。原因很简单,如果强制杀死线程,则线程中所使用的资源,例如文件描述符、网络连接等无法正常关闭。
因此,一个线程一旦运行起来,不要强行关闭,合理的做法是让其运行完(也就是方法执行完毕),干净地释放掉所有资源,然后退出。如果是一个不断循环运行的线程,就需要用到线程间的通信机制,让主线程通知其退出。
守护线程
daemon线程和非daemon线程的对比:
public class Main {
public static void main(String[] args) {
MyDaemonThread myDaemonThread = new MyDaemonThread();
// 设置为daemon线程
myDaemonThread.setDaemon(true);
myDaemonThread.start();
// 启动非daemon线程,当非daemon线程结束,不管daemon线程是否结束,都结束JVM进
程。
new MyThread().start();
}
}
public class MyDaemonThread extends Thread {
@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("非Daemon线程");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
对于上面的程序,在thread.start()
前面加一行代码thread.setDaemon(true)
。当main(…)
函数退出后,线程thread
就会退出,整个进程也会退出。
当在一个JVM进程里面开多个线程时,这些线程被分成两类:守护线程和非守护线程。默认都是非守护线程。
在Java中有一个规定:当所有的非守护线程退出后,整个JVM进程就会退出。意思就是守护线程“不算作数”,守护线程不影响整个JVM进程的退出。
例如,垃圾回收线程就是守护线程,它们在后台默默工作,当开发者的所有前台线程(非守护线程)都退出之后,整个JVM进程就退出了。
设置关闭的标志位
开发中一般通过设置标志位的方式,停止循环运行的线程。
例如:
public class MyThread extends Thread{
private boolean running = true;
@Override
public void run() {
while (running) {
System.out.println("线程正在运行。。。");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stopRunning() {
this.running = false;
}
public static void main(String[] args) throws InterruptedException {
MyThread myThread = new MyThread();
myThread.start();
Thread.sleep(5000);
myThread.stopRunning();
myThread.join();
}
}
但上面的代码有一个问题:如果MyThread t
在while
循环中阻塞在某个地方,例如里面调用了object.wait()
函数,那它可能永远没有机会再执行 while( ! stopped)
代码,也就一直无法退出循环。
此时,就要用到InterruptedException
与interrupt()
函数。
以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~
文章评论