Rubin's Blog

  • 首页
  • 关于作者
  • 隐私政策
享受恬静与美好~~~
分享生活的点点滴滴~~~
  1. 首页
  2. 并发编程
  3. 正文

java并发编程之多线程

2021年 12月 29日 694点热度 0人点赞 0条评论

Thread和Runnable

Java中的线程

Java中的线程创建有两种方式:

  • 扩展Thread类
  • 实现Runnable接口

扩展Thread类的方式创建新线程:

public class RubinThread extends Thread {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("RubinThread线程:" + i);
        }
    }

}
public class Main {
  public static void main(String[] args) {
    RubinThread thread = new RubinThread();
    thread.start();
 }
}

实现Runnable接口的方式创建线程:

public class MyRunnable implements Runnable {
  @Override
  public void run() {
    while (true) {
      System.out.println(Thread.currentThread().getName() + " 运行了");
      try {
        Thread.sleep(800);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
     }
   }
}
public class Main {
  public static void main(String[] args) {
    Thread thread = new Thread(new MyRunnable());
    thread.start();
   }
}

Java中的线程特征和状态

  • 所有的Java程序,不论并发与否,都有一个名为主线程的Thread对象。执行该程序时,Java虚拟机( JVM )将创建一个新Thread并在该线程中执行main()方法。这是非并发应用程序中唯一的线程,也是并发应用程序中的第一个线程
  • Java中的线程共享应用程序中的所有资源,包括内存和打开的文件,快速而简单地共享信息。但是必须使用同步避免数据竞争
  • java中的所有线程都有一个优先级,这个整数值介于Thread.MIN_PRIORITY(1)和Thread.MAX_PRIORITY(10)之间,默认优先级是Thread.NORM_PRIORITY(5)。线程的执行顺序并没有保证,通常,较高优先级的线程将在较低优先级的钱程之前执行

在Java 中,可以创建两种线程:

  • 守护线程
  • 非守护线程

区别在于它们如何影响程序的结束。java程序结束执行过程的情形:

  • 程序执行Runtime类的exit()方法, 而且用户有权执行该方法
  • 应用程序的所有非守护线程均已结束执行,无论是否有正在运行的守护线程

守护线程通常用在作为垃圾收集器或缓存管理器的应用程序中,执行辅助任务。在线程start之前调用isDaemon()方法检查线程是否为守护线程,也可以使用setDaemon()方法将某个线程确立为守护线程。

Thread.States类中定义线程的状态如下:

  • NEW:Thread对象已经创建,但是还没有开始执行
  • RUNNABLE:Thread对象正在Java虚拟机中运行
  • BLOCKED : Thread对象正在等待锁定
  • WAITING:Thread 对象正在等待另一个线程的动作
  • TIME_WAITING:Thread对象正在等待另一个线程的操作,但是有时间限制
  • TERMINATED:Thread对象已经完成了执行

Thread类和Runnable接口

Runnable接口只定义了一种方法:run()方法。这是每个线程的主方法。当执行start()方法启动新线程时,它将调用run()方法。

Thread类其他常用方法:

  • getId():该方法返回Thread对象的标识符。该标识符是在钱程创建时分配的一个正整数。在线程的整个生命周期中是唯一且无法改变的
  • getName()/setName():这两种方法允许你获取或设置Thread对象的名称。这个名称是一个String对象,也可以在Thread类的构造函数中建立
  • getPriority()/setPriority():你可以使用这两种方法来获取或设置Thread对象的优先级
  • isDaemon()/setDaemon():这两种方法允许你获取或建立Thread对象的守护条件
  • getState():该方法返回Thread对象的状态
  • interrupt():中断目标线程,给目标线程发送一个中断信号,线程被打上中断标记
  • interrupted():判断目标线程是否被中断,但是将清除线程的中断标记
  • isinterrupted():判断目标线程是否被中断,不会清除中断标记
  • sleep(long ms):该方法将线程的执行暂停ms时间
  • join():暂停线程的执行,直到调用该方法的线程执行结束为止。可以使用该方法等待另一个Thread对象结束
  • setUncaughtExceptionHandler():当线程执行出现未校验异常时,该方法用于建立未校验异常的控制器
  • currentThread():Thread类的静态方法,返回实际执行该代码的Thread对象

join示例程序:

package com.rubin.concurrent.join;

public class RubinThread extends Thread {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("RubinThread线程:" + i);
        }
    }

}
package com.rubin.concurrent.join;

public class ThreadJoinDemo {

    public static void main(String[] args) throws InterruptedException {
        RubinThread rubinThread = new RubinThread();
        rubinThread.start();
        rubinThread.join();
        System.out.println("main线程 - 执行完成");
    }

}

Callable

Callable接口是一个与Runnable接口非常相似的接口。Callable接口的主要特征如下:

  • 接口。有简单类型参数,与call()方法的返回类型相对应
  • 声明了call()方法。执行器运行任务时,该方法会被执行器执行。它必须返回声明中指定类型的对象
  • call()方法可以抛出任何一种校验异常。可以实现自己的执行器并重载afterExecute()方法来处理这些异常

使用示例:

package com.rubin.concurrent.callable;

import java.util.concurrent.Callable;

public class RubinStringCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(5000);
        return Thread.currentThread().getName() + " --- hello world call() invoked!";
    }
}
package com.rubin.concurrent.callable;

import java.util.concurrent.*;

public class CallableDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        runAsyncThread();
        runAsyncThreadPool();
    }

    private static void runAsyncThread() throws ExecutionException, InterruptedException {
        RubinStringCallable rubinStringCallable = new RubinStringCallable();
        FutureTask<String> futureTask = new FutureTask<>(rubinStringCallable);
        new Thread(futureTask).start();
        final String result = futureTask.get();
        System.out.println(result);
    }

    private static void runAsyncThreadPool() throws ExecutionException, InterruptedException {
        RubinStringCallable rubinStringCallable = new RubinStringCallable();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
            }
        };
        final Future<String> stringFuture = threadPoolExecutor.submit(rubinStringCallable);
        final String result = stringFuture.get();
        System.out.println(result);
        threadPoolExecutor.shutdown();
    }

}

synchronized关键字

锁的对象

synchronized关键字“给某个对象加锁”,示例代码:

public Class MyClass {
  public void synchronized method1() {
    // ...
   }
 
  public static void synchronized method2() {
    // ...
   }
}

等价于:

public class MyClass {
  public void method1() {
    synchronized(this) {
      // ...
     }
   }
 
  public static void method2() {
    synchronized(MyClass.class) {
      // ...
     }
   }
}

实例方法的锁加在对象myClass上。

静态方法的锁加在MyClass.class上。

锁的本质

如果一份资源需要多个线程同时访问,需要给该资源加锁。加锁之后,可以保证同一时间只能有一个线程访问该资源。资源可以是一个变量、一个对象或一个文件等。

锁是一个“对象”,作用如下:

  1. 这个对象内部得有一个标志位(state变量),记录自己有没有被某个线程占用。最简单的情况是这个state有0、1两个取值,0表示没有线程占用这个锁,1表示有某个线程占用了这个锁
  2. 如果这个对象被某个线程占用,记录这个线程的thread ID
  3. 这个对象维护一个thread id list,记录其他所有阻塞的、等待获取拿这个锁的线程。在当前线程释放锁之后从这个thread id list里面取一个线程唤醒

要访问的共享资源本身也是一个对象,例如前面的对象myClass,这两个对象可以合成一个对象。代码就变成synchronized(this) {…},要访问的共享资源是对象a,锁加在对象a上。当然,也可以另外新建一个对象,代码变成synchronized(obj1) {…}。这个时候,访问的共享资源是对象a,而锁加在新建的对象obj1上。

资源和锁合二为一,使得在Java里面,synchronized关键字可以加在任何对象的成员上面。这意味着,这个对象既是共享资源,同时也具备“锁”的功能!

实现原理

锁如何实现?

在对象头里,有一块数据叫Mark Word。在64位机器上,Mark Word是8字节(64位)的,这64位中有2个重要字段:锁标志位和占用该锁的thread ID。因为不同版本的JVM实现,对象头的数据结构会有各种差异。

wait与notify

生产者−消费者模型

生产者-消费者模型是一个常见的多线程编程模型,如下图所示:

一个内存队列,多个生产者线程往内存队列中放数据,多个消费者线程从内存队列中取数据。要实现这样一个编程模型,需要做下面几件事情:

  • 内存队列本身要加锁,才能实现线程安全
  • 阻塞。当内存队列满了,生产者放不进去时,会被阻塞;当内存队列是空的时候,消费者无事可做,会被阻塞
  • 双向通知。消费者被阻塞之后,生产者放入新数据,要notify()消费者;反之,生产者被阻塞之后,消费者消费了数据,要notify()生产者

第1件事情必须要做,第2件和第3件事情不一定要做。例如,可以采取一个简单的办法,生产者放不进去之后,睡眠几百毫秒再重试,消费者取不到数据之后,睡眠几百毫秒再重试。但这个办法效率低下,也不实时。所以,我们只讨论如何阻塞、如何通知的问题。

多个生产者多个消费者的情形:

阻塞队列:

package com.rubin.concurrent.synchronizedqueue;

/**
 * 同步队列 支持多个生产者消费者并发访问
 * 线程安全类
 */
public class RubinSyncStringQueue2 {

    private String[] queue = new String[10];
    private volatile int size = 0;
    private int readIndex = 0;
    private int putIndex = 0;

    public synchronized void put(String element) {
        while (size == queue.length) {
            try {
                // 任何线程调用notify()时 会执行所有wait状态的线程后面的代码  我们再次调用该方法 让唤醒的一个线程去再次判断条件 因为唤醒的可能不是消费者线程
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        put0(element);
        // 通知所有线程执行wait后面的逻辑
        notify();
    }

    private void put0(String element) {
        if (putIndex == queue.length) {
            putIndex = 0;
        }
        queue[putIndex] = element;
        ++putIndex;
        ++size;
    }

    public synchronized String get() {
        while (size == 0) {
            try {
                // 如果抢到锁的消费者线程发现队列空了 则释放锁 进入等待状态
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 抢到锁的消费者线程发现队列有数据可消费 则消费数据
        String result = get0();
        // 通知所有线程执行wait后面的逻辑
        notify();
        return result;
    }

    private String get0() {
        if (readIndex == queue.length) {
            readIndex = 0;
        }
        String result = queue[readIndex];
        ++readIndex;
        --size;
        return result;
    }

}

生产者:

package com.rubin.concurrent.synchronizedqueue;

public class Producer2 extends Thread {

    private long index = 1L;

    private RubinSyncStringQueue2 queue;

    public Producer2(RubinSyncStringQueue2 queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        super.run();
        while (true) {
            String element = getName() + "----生产数据" + index;
            queue.put(element);
            System.out.println(element);
            index++;
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

消费者:

package com.rubin.concurrent.synchronizedqueue;

public class Consumer2 extends Thread {

    private RubinSyncStringQueue2 queue;

    public Consumer2(RubinSyncStringQueue2 queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        super.run();
        while (true) {
            String element = getName() + "----消费数据:" + queue.get();
            System.out.println(element);
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

运行示例:

package com.rubin.concurrent.synchronizedqueue;

public class Main2 {

    public static void main(String[] args) {

        RubinSyncStringQueue2 queue = new RubinSyncStringQueue2();

        for (int i = 0; i < 3; i++) {
            String threadName = "Producer-" + i;
            Producer2 producer = new Producer2(queue);
            producer.setName(threadName);
            producer.start();
        }

        for (int j = 0; j < 3; j++) {
            String threadName = "Consumer-" + j;
            Consumer2 consumer = new Consumer2(queue);
            consumer.setName(threadName);
            consumer.start();
        }

    }

}

为什么必须和synchronized一起使用

在Java里面,wait()和notify()是Object的成员函数,是基础中的基础。为什么Java要把wait()和notify()放在如此基础的类里面,而不是作为像Thread一类的成员函数,或者其他类的成员函数呢?

先看为什么wait()和notify()必须和synchronized一起使用?请看下面的代码:

class MyClass1 {
  private Object obj1 = new Object();
 
  public void method1() {
    synchronized(obj1) {
      //...
      obj1.wait();
      //...
     }
   }
 
  public void method2() {
    synchronized(obj1) {
      //...
      obj1.notify();
      //...
     }
   }
}

或者下面的代码:

public class MyClass1 {
  public void synchronized method1() {
    //...
    this.wait();
    //...
   }
 
  public void synchronized method2() {
    //...
    this.notify();
    //...
   }
}

然后,开两个线程,线程A调用method1(),线程B调用method2()。答案已经很明显:两个线程之间要通信,对于同一个对象来说,一个线程调用该对象的wait(),另一个线程调用该对象的notify(),该对象本身就需要同步!所以,在调用wait()、notify()之前,要先通过synchronized关键字同步给对象,也就是给该对象加锁。

synchronized关键字可以加在任何对象的实例方法上面,任何对象都可能成为锁。因此,wait()和notify()只能放在Object里面了。

为什么wait()的时候必须释放锁

当线程A进入synchronized(obj1)中之后,也就是对obj1上了锁。此时,调用wait()进入阻塞状态,一直不能退出synchronized代码块;那么,线程B永远无法进入synchronized(obj1)同步块里,永远没有机会调用notify(),发生死锁。

这就涉及一个关键的问题:在wait()的内部,会先释放锁obj1,然后进入阻塞状态,之后,它被另外一个线程用notify()唤醒,重新获取锁!其次,wait()调用完成后,执行后面的业务逻辑代码,然后退出synchronized同步块,再次释放锁,如此则可以避免死锁。

wait()与notify()的问题

生产者在通知消费者的同时,也通知了其他的生产者;消费者在通知生产者的同时,也通知了其他消费者。原因在于wait()和notify()所作用的对象和synchronized所作用的对象是同一个,只能有一个对象,无法区分队列空和列队满两个条件。这正是Condition要解决的问题。

InterruptedException与interrupt()方法

Interrupted异常

什么情况下会抛出Interrupted异常呢?

假设while循环中没有调用任何的阻塞函数,就是通常的算术运算,或者打印一行日志,如下所示:

public class MyThread extends Thread {
  @Override
  public void run() {
    while (true) {
      boolean interrupted = isInterrupted();
      System.out.println("中断标记:" + interrupted);
   }
 }
}

这个时候,在主线程中调用一句thread.interrupt(),请问该线程是否会抛出异常?不会。

只有那些声明了会抛出InterruptedException的函数才会抛出异常,也就是下面这些常用的函数:

public static native void sleep(long millis) throws InterruptedException
{...}
public final void wait() throws InterruptedException {...}
public final void join() throws InterruptedException {...}

轻量级阻塞与重量级阻塞

能够被中断的阻塞称为轻量级阻塞,对应的线程状态是WAITING或者TIMED_WAITING.而像synchronized这种不能被中断的阻塞称为重量级阻塞,对应的状态是BLOCKED。如图所示:调用不同的方法后,一个线程的状态迁移过程。

初始线程处于NEW状态,调用start()开始执行后,进入RUNNING或者READY状态。如果没有调用任何的阻塞函数,线程只会在RUNNING和READY之间切换,也就是系统的时间片调度。这两种状态的切换是操作系统完成的,除非手动调用yield()函数,放弃对CPU的占用。

一旦调用了图中的任何阻塞函数,线程就会进入WAITING或者TIMED_WAITING状态,两者的区别只是前者为无限期阻塞,后者则传入了一个时间参数,阻塞一个有限的时间。如果使用了synchronized关键字或者synchronized块,则会进入BLOCKED状态。

不太常见的阻塞/唤醒函数,LockSupport.park()/unpark()。这对函数非常关键,Concurrent包中Lock的实现即依赖这一对操作原语。

因此thread.interrupt()的精确含义是“唤醒轻量级阻塞”,而不是字面意思“中断一个线程”。

thread.isInterrupted()与Thread.interrupted()的区别

因为thread.interrupt()相当于给线程发送了一个唤醒的信号,所以如果线程此时恰好处于WAITING或者TIMED_WAITING状态,就会抛出一个InterruptedException,并且线程被唤醒。而如果线程此时并没有被阻塞,则线程什么都不会做。但在后续,线程可以判断自己是否收到过其他线程发来的中断信号,然后做一些对应的处理。

这两个方法都是线程用来判断自己是否收到过中断信号的,前者是实例方法,后者是静态方法。二者的区别在于,前者只是读取中断状态,不修改状态;后者不仅读取中断状态,还会重置中断标志位。

线程的优雅关闭

stop与destory函数

线程是“一段运行中的代码”,一个运行中的方法。运行到一半的线程能否强制杀死?

不能。在Java中,有stop()、destory()等方法,但这些方法官方明确不建议使用。原因很简单,如果强制杀死线程,则线程中所使用的资源,例如文件描述符、网络连接等无法正常关闭。

因此,一个线程一旦运行起来,不要强行关闭,合理的做法是让其运行完(也就是方法执行完毕),干净地释放掉所有资源,然后退出。如果是一个不断循环运行的线程,就需要用到线程间的通信机制,让主线程通知其退出。

守护线程

daemon线程和非daemon线程的对比:

public class Main {
  public static void main(String[] args) {
    MyDaemonThread myDaemonThread = new MyDaemonThread();
    // 设置为daemon线程
    myDaemonThread.setDaemon(true);
    myDaemonThread.start();
    // 启动非daemon线程,当非daemon线程结束,不管daemon线程是否结束,都结束JVM进
程。
    new MyThread().start();
   }
}
public class MyDaemonThread extends Thread {
  @Override
  public void run() {
    while (true) {
      System.out.println(Thread.currentThread().getName());
      try {
        Thread.sleep(500);
           } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}
public class MyThread extends Thread {
  @Override
  public void run() {
    for (int i = 0; i < 10; i++) {
      System.out.println("非Daemon线程");
      try {
        Thread.sleep(500);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
     }
   }
}

对于上面的程序,在thread.start()前面加一行代码thread.setDaemon(true)。当main(…)函数退出后,线程thread就会退出,整个进程也会退出。

当在一个JVM进程里面开多个线程时,这些线程被分成两类:守护线程和非守护线程。默认都是非守护线程。

在Java中有一个规定:当所有的非守护线程退出后,整个JVM进程就会退出。意思就是守护线程“不算作数”,守护线程不影响整个JVM进程的退出。

例如,垃圾回收线程就是守护线程,它们在后台默默工作,当开发者的所有前台线程(非守护线程)都退出之后,整个JVM进程就退出了。

设置关闭的标志位

开发中一般通过设置标志位的方式,停止循环运行的线程。

例如:

public class MyThread extends Thread{

  private boolean running = true;

  @Override
  public void run() {
    while (running) {
      System.out.println("线程正在运行。。。");
      try {
               Thread.sleep(1000);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
     }
   }

  public void stopRunning() {
    this.running = false;
   }

  public static void main(String[] args) throws InterruptedException {
    MyThread myThread = new MyThread();
    myThread.start();
    Thread.sleep(5000);
    myThread.stopRunning();
    myThread.join();
   }
}

但上面的代码有一个问题:如果MyThread t在while循环中阻塞在某个地方,例如里面调用了object.wait()函数,那它可能永远没有机会再执行 while( ! stopped)代码,也就一直无法退出循环。

此时,就要用到InterruptedException与interrupt()函数。

以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: 并发编程
最后更新:2022年 6月 9日

RubinChu

一个快乐的小逗比~~~

打赏 点赞
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复
文章目录
  • Thread和Runnable
    • Java中的线程
    • Java中的线程特征和状态
    • Thread类和Runnable接口
    • Callable
  • synchronized关键字
    • 锁的对象
    • 锁的本质
    • 实现原理
  • wait与notify
    • 生产者−消费者模型
    • 为什么必须和synchronized一起使用
    • 为什么wait()的时候必须释放锁
    • wait()与notify()的问题
  • InterruptedException与interrupt()方法
    • Interrupted异常
    • 轻量级阻塞与重量级阻塞
    • thread.isInterrupted()与Thread.interrupted()的区别
  • 线程的优雅关闭
    • stop与destory函数
    • 守护线程
    • 设置关闭的标志位
最新 热点 随机
最新 热点 随机
问题记录之Chrome设置屏蔽Https禁止调用Http行为 问题记录之Mac设置软链接 问题记录之JDK8连接MySQL数据库失败 面试系列之自我介绍 面试总结 算法思维
SpringBoot之源码环境搭建 SpringCloud Netflix之Ribbon负载均衡 java面试系列之引用 数据结构之线性表 Neo4j之CQL Kafka之集群与运维

COPYRIGHT © 2021 rubinchu.com. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

京ICP备19039146号-1