public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
// 本地方法,底层c++,java无法直接操作硬件
private native void start0();
并发与并行
并发编程:并发、并行
并发(多线程操作同一个资源):CPU 一核 ,模拟出来多条线程,天下武功,唯快不破,快速交替
并行(多个人一起行走):CPU 多核 ,多个线程可以同时执行; 线程池
public class Test01 {
public static void main(String[] args) {
// 获取CPU核数
// CPU 密集型,IO密集型
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
// newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 约定于21亿,容易造成电脑崩溃
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
本质上是调用了ThreadPoolExecutor,而里面的参数就是所谓的7大参数。
public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
int maximumPoolSize,// 最大核心线程池大小
long keepAliveTime, // 超时了没有人调用就会释放
TimeUnit unit, // 超时单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, //线程工厂,创建线程的,一般不用动
RejectedExecutionHandler handler) { //拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
手动创建一个线程池
public class Demo2 {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
// new ThreadPoolExecutor.AbortPolicy()// 相等于 银行客户满了,但是还有客户进来,不处理这个进来的客户,抛出异常
// new ThreadPoolExecutor.CallerRunsPolicy()//哪里来的去哪里,就是公司让你去银行办卡,但是银行满了,让你会公司内部帮你弄
// new ThreadPoolExecutor.DiscardPolicy()//队列满了,但是不抛出异常,不处理多余的任务
new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了,尝试去和最早的线程去竞争资源,如果成功了,就执行,不成功就抛弃,该方法也不会抛出异常
);
try {
// 最大承载:队列 + 最大线程数
for(int i=1;i<=9;i++){
// 使用了线程池之后,使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + " ok");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
// 线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
}
四种解决策略
new ThreadPoolExecutor.AbortPolicy()// 相等于 银行客户满了,但是还有客户进来,不处理这个进来的客户,抛出异常
new ThreadPoolExecutor.CallerRunsPolicy()//哪里来的去哪里,就是公司让你去银行办卡,但是银行满了,让你会公司内部帮你弄
new ThreadPoolExecutor.DiscardPolicy()//队列满了,但是不抛出异常,不处理多余的任务
new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了,尝试去和最早的线程去竞争资源,如果成功了,就执行,不成功就抛弃,该方法也不会抛出异常
小结
最大线程到底如何定义?
CPU密集型,几核就是几,可以保持CPU的效率最高
// 获取的核数
System.out.println(Runtime.getRuntime().availableProcessors());
ExecutorService threadPool = new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
// FunctionalInterface有很多的实现,用来简化编程模型,在新版本框架底层中大量应用。
四大函数式接口:Consumer、Function、Predicate、Supplier
Function函数接口
/*
Function:函数型接口
*/
public class demo1 {
public static void main(String[] args) {
// 工具类:输出输入的值
// Function<String, String> function = new Function<>() {
// @Override
// public String apply(String str) {
// return str;
// }
// };
Function<String,String> function = (str)->{return str;};
System.out.println(function.apply("hello"));
}
}
Predicate:断定型接口,有一个输入参数,返回值只能是布尔值
// 断定型接口:有一个参数吗,返回值只能是布尔值
public class demo2 {
public static void main(String[] args) {
// 判断字符串是否为空
// Predicate<String> predicate = new Predicate<String>() {
// @Override
// public boolean test(String str) {
// return str.isEmpty();
// }
// };
Predicate<String> predicate = str -> {return str.isEmpty();};
System.out.println(predicate.test(""));
}
}
Consumer消费型接口
public class demo3 {
public static void main(String[] args) {
// Consumer<String> consumer = new Consumer<String>() {
// @Override
// public void accept(String str) {
// System.out.println(str);
// }
// };
// Consumer<String> consumer = (str)->{System.out.println(str);};
// 还能缩写为如下,需要打印的
Consumer<String> consumer = System.out::println;
consumer.accept("hello");
}
}
Supplier供给型接口
public class demo4 {
public static void main(String[] args) {
// Supplier<String> supplier = new Supplier<String>() {
// @Override
// public String get() {
// return null;
// }
// };
Supplier<String> supplier = ()->{return "1024";};
System.out.println(supplier.get());
}
}
Stream流式计算
什么是Stream流式计算
大数据分为存储+计算。
集合、MySQL的本质是用来存储东西的。
计算都应该交给流来实现。
创建User类
public class User {
private int id;
private String name;
private int age;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public User(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
测试
/** *
* 题目要求:一分钟内完成此题,只能用一行代码实现!
* 现在有5个用户!筛选:
* 1、ID 必须是偶数
* 2、年龄必须大于23岁
* 3、用户名转为大写字母
* 4、用户名字母倒着排序
* 5、只输出一个用户!
*/
public class Test {
public static void main(String[] args) {
User u1 = new User(1,"a",21);
User u2 = new User(2,"b",22);
User u3 = new User(3,"c",23);
User u4 = new User(4,"d",24);
User u5 = new User(5,"e",25);
User u6 = new User(6,"f",26);
// 集合是用来存储的
List<User> userList = Arrays.asList(u1,u2,u3,u4,u5,u6);
// 计算交给流来计算
// lambda表达式、链式编程、函数式接口、Stream流式计算
userList.stream()
.filter(u->{return u.getId()%2 == 0;})
.filter(u->{return u.getAge() > 23;})
.peek(u-> u.setName(u.getName().toUpperCase()))
.sorted((a,b)->{return b.getName().compareTo(a.getName());})
.limit(1)
.forEach(System.out::println);
}
}
ForkJoin
什么是ForkJoin
ForkJoin在JDK1.7出现的,并行执行任务!提高效率,适合大数据量。
ForkJoin是将一个大任务拆分为多个子任务进行操作。就是分而治之思想。
ForkJoin特点:工作窃取
这个里面维护的都是双端队列。
public class ForkJoinDemo extends RecursiveTask<Long> {
private long start;
private long end;
// 临界值
private long temp = 10000L;
public ForkJoinDemo(Long start,Long end){
this.start = start;
this.end = end;
}
public void test(){
if((end - start) > temp){
}
}
@Override
protected Long compute() {
if((end - start) > temp){
long sum = 0L;
for(long i=start;i<=end;i++){
sum += i;
}
return sum;
}else{ // 超过10_0000_0000通过forkjoin实现
long middle = start+(end -start) / 2;
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork();//拆分任务,把任务压入到线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end);
task2.fork();
return task1.join() + task2.join();
}
}
public static void forkTest() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
// forkJoinPool.execute(task); //执行任务,没有结果
ForkJoinTask<Long> submit = forkJoinPool.submit(task);//提交任务,可以拿到结果
long end = System.currentTimeMillis();
long sum = submit.get(); // 会阻塞等待
System.out.println("sum="+sum+";时间:"+(end-start));
}
public static void streamTest(){
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0L,10_0000_0000l).parallel().reduce(0,Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum="+sum+";时间:"+(end-start));
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
forkTest();
streamTest();
}
}
public class Holder {
private Holder(){
}
// 但是不安全,
public static Holder getInstance(){
return InnerClass.HOLDER;
}
// 静态内部类
public static class InnerClass{
private static final Holder HOLDER = new Holder();
}
}
public ReentrantLock() {
sync = new NonfairSync();
}
//可以通过传入fair值,变成公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可重入锁
可重入锁(递归锁)
Sychronized
public class Demo1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{phone.sms();},"A").start();
new Thread(()->{phone.call();},"B").start();
}
}
class Phone{
public synchronized void sms(){
System.out.println(Thread.currentThread().getName()+"=>sms");
call();
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName()+"=>call");
}
}
Lock
public class Demo2 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(()->{phone.sms();},"A").start();
new Thread(()->{phone.call();},"B").start();
}
}
class Phone2 {
Lock lock = new ReentrantLock();
public void sms() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "=>sms");
call();
}catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void call() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "=>call");
}catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
自旋锁
spinlock
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
自定义锁测试
public class SpinLockDemo {
private AtomicReference<Thread> reference = new AtomicReference<>();
//加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "==> mylock" );
while (!reference.compareAndSet(null,thread)){
}
}
//解锁
public void myUnlock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "==> myUnlock" );
while (!reference.compareAndSet(thread,null)){
}
}
public static void main(String[] args) {
// ReentrantLock lock = new ReentrantLock();
// lock.lock();
// lock.unlock();
// 底层使用自旋锁
SpinLockDemo lock2 = new SpinLockDemo();
new Thread(()->{
lock2.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
}finally {
lock2.myUnlock();
}
},"T1").start();
new Thread(()->{
lock2.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
}finally {
lock2.myUnlock();
}
},"T2").start();
}
}
死锁
什么是死锁
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA, lockB), "T1").start();
new Thread(new MyThread(lockB, lockA), "T2").start();
}
}
class MyThread implements Runnable {
private final String lockA;
private final String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + "lock:" + lockA + "=>get" + lockB);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + "lock:" + lockB + "=>get" + lockA);
}
}
}
}