05-JUC并发编程与源码分析

线程基础知识

什么是JUC

java.util.concurrent在并发编程中使用的工具包

  • java.util.concurrent

  • java.util.concurrent.atomic

  • java.util.concurrent.locks

为什么要学好多线程

  • 提高程序性能,高并发系统

  • 提高程序吞吐量,异步+回调等生成需求

start线程解读

public class ThreadBaseDemo {
    public static void main(String[] args) {
        Thread t1 = new Thread(()->{

        },"t1");
        t1.start();
    }
}

查看start方法:

public synchronized void start() {
    /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
        }
    }
}
// 调用第三方模块实现的
private native void start0();

native调用了本地方法,我们可以通过下载官网OpenJDK查看其源码

  • thread.c

java线程是通过start的方法启动执行的,主要内容在native方法start0中Openjdk的写JNI一般是一一对应的,Thread.java对应的就是Thread.c。start0其实就是JVM_StartThread。此时查看源代码可以看到在jvm.h中找到了声明,jvm.cpp中有实现。

  • jvm.cpp

在这里插入图片描述
  • thread.cpp:终于在这里调用了操作系统的线程启动os::start_thread(thread);

在这里插入图片描述

Java多线程相关概念

这里的锁指的是synchronized,后面会学习到。

并发与并行

并发

  • 是在同一实体上的多个事件

  • 是在同一台处理器上“同时”处理多个任务

  • 同一时刻,其实是只有一个事件在发生

并行

  • 是在不同实体上的多个事件

  • 是在多台处理器上同时处理多个任务

  • 同一时刻,大家都真的在做事情,你做你的,我做我的(需要多核)

在这里插入图片描述

进程

系统中运行的一个应用程序就是一个进程,每一个进程都有它自己的内存空间和系统资源。

线程

也被称为轻量级进程,在同一个进程内基本会有一个或多个线程,是大多数操作系统进行调度的基本单元。

管程

  • Monitor(监视器),也就是我们平时说的锁。

  • Monitor其实是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。

  • JVM中同步是基于进入和退出监视器对象(Monitor,管程对象)来实现的,每个对象实例都会有一个Monitor对象,

  • Monitor对象会和Java对象一同创建并销毁,它底层是由C++语言来实现的。

用户线程和守护线程

Java线程分为用户线程和守护线程

  • 用户线程:是系统的工作线程,它会完成这个程序需要完成的业务操作

  • 守护线程:是一种特殊的线程,为其他线程服务的,在后台默默地完成一些系统性的服务,比如垃圾回收线程。

  • 线程的daemon属性为

    • true表示是守护线程

    • false表示是用户线程。

public class DaemonDemo
{
    public static void main(String[] args)
    {
        Thread t1 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"\t 开始运行,"+(Thread.currentThread().isDaemon() ? "守护线程":"用户线程"));
            while (true) {

            }
        }, "t1");
        //线程的daemon属性为true表示是守护线程,false表示是用户线程
        //---------------------------------------------
        t1.setDaemon(true); 
        //-----------------------------------------------
        t1.start();
        //3秒钟后主线程再运行
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }

        System.out.println("----------main线程运行完毕");
    }
}
  • 守护线程作为一个服务线程,没有服务对象就没有必要继续运行了,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可退出了。假如当系统只剩下守护线程的时候,java虚拟机会自动退出。

  • setDaemon(true)方法必须在start()之前设置,否则报IIIegalThreadStateException异常。

CompletableFuture

Future接口

什么是Future?

Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。(异步:可以被叫停,可以被取消)

一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

Future能干什么

比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。老师在上课,但是口渴,于是让班长这个线程去买水,自己可以继续上课,实现了异步任务。

目的:异步多线程任务执行且有返回结果,三个特点:多线程/有返回/异步任务(班长作为老师去买水作为新启动的异步多线程任务且买到水有结果返回)

FutureTask实现类

  • FutureTask(实现了x接口,x接口又继承了a和v接口)

    • 在源码可以看到,他既继承了RunnableFuture接口,也在构造方法中实现了Callable接口(有返回值、可抛出异常)和Runnable接口

image-20220609211814836
image-20220609211843251
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> task1 = new FutureTask<>(new MyThread2());
        FutureTask task2 = new FutureTask(new MyThread(),null);

        Thread t1 = new Thread(task1,"t1");
        Thread t2 = new Thread(task2,"t1");
        t1.start();

        // 使用get获取返回值
        String value = task1.get();
        System.out.println(value);
    }
}

class MyThread implements Runnable{

    @Override
    public void run() {
    }
}

class MyThread2 implements Callable<String> {

    @Override
    public String call() throws Exception {
        System.out.println("-----come in call() ----异步执行");
        return "hello Callable 返回值";
    }
}

Future优点

  • future+线程池异步多线程任务配合,能显著提高程序的执行效率。

  • 方案一,3个任务1个main线程处理,大概1541ms

public class FutureThreadPoolDemo {
    public static void main(String[] args) {
        // 3个任务1个main线程处理,大概1541ms

        long startTime = System.currentTimeMillis();

        // 暂停毫秒
        try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}
        try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}
        try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}

        long endTime = System.currentTimeMillis();

        System.out.println("costTime:"+(endTime - startTime) + "ms");

        System.out.println(Thread.currentThread().getName());
    }
}
  • 方案二,3个任务3个线程,利用线程池(假如每次new一个Thread,太浪费资源,会有GC这些工作),大概41毫秒

public class FutureThreadPoolDemo {
    public static void main(String[] args) {

        long startTime = System.currentTimeMillis();

        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        // 3个任务3个线程,利用线程池
        FutureTask<String> task1 = new FutureTask<>(()->{
            try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}
            return "task1 over";
        });

        threadPool.submit(task1);

        FutureTask<String> task2 = new FutureTask<>(()->{
            try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}
            return "task2 over";
        });

        threadPool.submit(task2);

        FutureTask<String> task3 = new FutureTask<>(()->{
            try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}
            return "task3 over";
        });

        threadPool.submit(task3);

        threadPool.shutdown();

        long endTime = System.currentTimeMillis();

        System.out.println("costTime:"+(endTime - startTime) + "ms");

        System.out.println(Thread.currentThread().getName());
    }
}

Future缺点

get()阻塞

一旦调用get()方法,不管是否计算完成,都会导致阻塞(所以一般get方法放到最后)

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> task = new FutureTask<>(()->{
            System.out.println(Thread.currentThread().getName() + "\n");
            try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}
            return "task";
        });

        Thread thread = new Thread(task);
        thread.start();

        String value = task.get();
        // String value = task.get(3,TimeUnit.SECONDS); // 设置超过等待时间
        System.out.println(value);
    }
}

isDone()轮询

利用if(futureTask.isDone())的方式使得FutureTask在结束之后才get(),但是也会消耗cpu。

public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> task = new FutureTask<>(()->{
            System.out.println(Thread.currentThread().getName() + "\n");
            try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}
            return "task";
        });

        Thread thread = new Thread(task);
        thread.start();

//        String value = task.get();
//        System.out.println(value);

        while (true){
            if(task.isDone()){
                System.out.println(task.get());
                break;
            }else{
                // 暂停毫秒
                try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}
                System.out.println("正在处理中");
            }
        }
    }
}

future应用现状

  • 对于简单的业务场景使用Future完全OK

    • 回调通知

      • 前面的isDone()方法耗费cpu资源,一般应该还是利用回调函数,在Future结束时自动调用该回调函数。应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知

    • 创建异步任务

      • Future+线程池配合

    • 多个任务前后依赖可以组合处理(水煮鱼)

      • 想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值,将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时后面这个又依赖前一个处理的结果

      • 比如买鱼-加料-烹饪

    • 对计算速度选最快完成的(并返回结果)

      • 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果。

CompletableFuture

CompletableFuture基本介绍

**阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。**因此,JDK8设计出CompletableFuture

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    ...
}
image-20220610122149352
  • 在Java 8中, CompletableFuture提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合CompletableFuture的方法。

  • 它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。

  • 它实现了FutureCompletionStage接口

CompletionStage基本介绍

  • CompletionStage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段

  • 一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:stage.thenApply(x->square(x)).then Accept(x->System.out.print(x)).thenRun()->System.out.println() ),一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

四个核心静态API方法

  • 利用核心的四个静态方法创建一个异步操作 | 不建议用new

  • 关键就是 |有没有返回值|是否用了线程池|

  • 参数说明:

    • 没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。

    • 如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码

runAsync无返回值

runAsync

public static CompletableFuture<Void> runAsync(Runnable runnable)

实例:

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            // 暂停几秒钟线程
            try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
        });

        System.out.println(completableFuture.get());
    }
}

runAsync+线程池

没有指定线程池,会使用默认线程池。

public static CompletableFuture<Void> runAsync(Runnable runnable,
                                               Executor executor)

实例:

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            // 暂停几秒钟线程
            try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
        },threadPool);

        System.out.println(completableFuture.get());
        threadPool.shutdown();
    }
}

supplyAsync有返回值

supplyAsync

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

实例:

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "completableFuture";
        });

        System.out.println(completableFuture.get());
    }
}

supplyAsync+线程池

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

实例:

public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // 暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "completableFuture";
        },threadPool);

        System.out.println(completableFuture.get());
        threadPool.shutdown();
    }
}

通用异步编程

基本功能

CompletableFuture可以完成Future的功能

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            int result = ThreadLocalRandom.current().nextInt(10);

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("1秒钟后出结果:" + result);
            return result;
        });

        System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");

        System.out.println(completableFuture.get());
    }
}

减少阻塞和轮询whenComplete

CompletableFuture通过whenComplete减少阻塞和轮询(自动回调)。

注意不要让主线程死亡,不然用户线程也会跟随死亡。

public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            int result = ThreadLocalRandom.current().nextInt(10);

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("1秒钟后出结果:" + result);
            return result;
        }).whenComplete((v,e)->{
            if(e == null){
                System.out.println("计算完成:"+v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println("异常情况");
            return null;
        });

        //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

CompletableFuture优点总结

  • 异步任务结束时,会自动回调某个对象的方法;

  • 主线程设置好毁掉后,不再关心异步任务的执行,异步任务之间可以顺序执行

  • 异步任务出错时,会自动回调某个对象的方法。

Completable案例精讲-电商网站的比价需求

面试题

image-20220610143342290

编程必备技能准备

函数式接口

函数式接口定义

任何接口,如果只包含唯一一个抽象方法,那么它就是一个函数式接口。对于函数式接口,我们可以通过lambda表达式来创建该接口的对象。

public interface Runnable{
  public abstract void run();
}

常见函数式接口

Runnable

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Function-功能性函数式接口

@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}

Consumer-消费性函数式接口

@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}

Supplier-供给性函数式接口

@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

Biconsumer-消费性函数式接口

(Bi代表两个的意思,我们要传入两个参数,在上面的案例中是v和e)