Java语言提供了非常优秀的多线程支持,程序可以通过非常简单的方式来启动多线程,接下来就让我们一起来学习Java多线程编程的相关知识吧,包括:创建、启动、控制线程,以及多线程的同步操作,还有通过Java内建支持的线程池来提高多线程的性能。
一、线程概述 每个运行的程序就是一个进程,而当一个程序运行时,内部可能有多个顺序执行流,每个顺序执行流就是一个线程。
1、线程和进程 a、进程 程序进入内存运行时,就变成了一个进程(process),进程是处于运行过程中的程序,并且具有一定的独立性,进程是系统进行资源调度和分配的一个独立单位。一般而言,进程具有如下三个特征:独立性、动态性和并发性。
并发性和并行性是两个不同的概念:
并行:同一时刻,多个指令在多个处理器上同时处理
并发:同一时刻只能有一条指令执行,但是多个进程指令被快速轮换执行,使得其在宏观上具有多个进程同时执行的效果
// 2、多线程的优势 二、线程的创建和启动 1、继承Thread类创建线程类
/**
 * Creates threads by subclassing {@link Thread}.
 *
 * <p>The counter is an instance field, so every {@code FirstThread} object counts on its own.
 */
public class FirstThread extends Thread {

    /** Per-instance loop counter. */
    private int i;

    /** Prints this thread's name followed by each counter value from 0 to 99. */
    @Override
    public void run() {
        super.run();
        i = 0;
        while (i < 100) {
            System.out.println(getName() + " " + i);
            i++;
        }
    }

    /** Counts to 99 on the calling thread and launches two workers when the count reaches 20. */
    public static void main(String[] args) {
        int step = 0;
        while (step < 100) {
            System.out.println(Thread.currentThread().getName() + " " + step);
            if (step == 20) {
                startWorkers();
            }
            step++;
        }
    }

    /** Starts two independent FirstThread instances, one after the other. */
    private static void startWorkers() {
        new FirstThread().start();
        new FirstThread().start();
    }
}
// 2、实现Runnable接口创建线程类
/**
 * Creates threads from a {@link Runnable} target.
 *
 * <p>Both threads started in {@code main} wrap the same {@code SecondThread} object and therefore
 * share its counter field; no synchronization is applied to that field.
 */
public class SecondThread implements Runnable {

    /** Counter shared by every thread that runs this same target object. */
    private int i;

    /** Prints the current thread's name and the counter until the counter reaches 100. */
    @Override
    public void run() {
        while (i < 100) {
            System.out.println(Thread.currentThread().getName() + " " + i);
            i++;
        }
    }

    /** Counts to 99 on the calling thread; at 20 it starts two threads over one shared target. */
    public static void main(String[] args) {
        int step = 0;
        while (step < 100) {
            System.out.println(Thread.currentThread().getName() + " " + step);
            if (step == 20) {
                SecondThread shared = new SecondThread();
                Thread first = new Thread(shared, "进程1");
                first.start();
                Thread second = new Thread(shared, "进程2");
                second.start();
            }
            step++;
        }
    }
}
// 3、使用Callable和Future创建线程
/**
 * Runs a value-returning task on a separate thread by wrapping a {@link Callable} in a
 * {@link FutureTask}, then reads the result back with {@code get()}.
 */
public class ThirdThread {

    /** Counts on the calling thread, starts the task thread at 20 and finally prints its result. */
    public static void main(String[] args) {
        Callable<Integer> counter = () -> {
            int n = 0;
            while (n < 100) {
                System.out.println(Thread.currentThread().getName() + " " + n);
                n++;
            }
            return n;
        };
        FutureTask<Integer> task = new FutureTask<>(counter);

        int step = 0;
        while (step < 100) {
            System.out.println(Thread.currentThread().getName() + " " + step);
            if (step == 20) {
                Thread worker = new Thread(task, "有返回值的线程");
                worker.start();
            }
            step++;
        }

        try {
            // get() blocks until the task thread has produced its value.
            Integer result = task.get();
            System.out.println("子线程的返回值" + result);
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
// 4、创建线程的三种方式对比 三、线程的生命周期 1、新建和就绪状态 2、运行和阻塞状态 3、线程死亡 四、控制线程 1、join线程
/**
 * Demonstrates {@link Thread#join()}: the caller blocks until the joined thread has finished.
 */
public class JoinThread extends Thread {

    /**
     * Pause taken when the counter reaches 30. The original {@code (long) 0.1} truncated to 0,
     * i.e. no pause at all; 0.1 s is presumably what was meant — TODO confirm.
     */
    private static final long PAUSE_MILLIS = 100L;

    /**
     * @param name the thread name printed in front of every counter value
     */
    public JoinThread(String name) {
        super(name);
    }

    /**
     * Prints the counter values 0..99, pausing briefly at 30. If the thread is interrupted during
     * the pause, the interrupt flag is restored and the loop stops.
     */
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println(getName() + " " + i);
            if (i == 30) {
                try {
                    Thread.sleep(PAUSE_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Starts one free-running thread, then at step 20 starts a second thread and joins it, so the
     * calling thread prints nothing further until that second thread has finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        new JoinThread("新线程").start();
        for (int i = 0; i < 100; i++) {
            if (i == 20) {
                Thread jt = new JoinThread("被join的线程");
                jt.start();
                jt.join();
            }
            System.out.println(Thread.currentThread().getName() + " " + i);
        }
    }
}
// 2、后台线程
/**
 * Demonstrates a daemon thread: it is marked as a daemon before being started, and the calling
 * thread only prints ten lines and a closing message without waiting for it.
 */
public class DaemonThread extends Thread {

    /** Prints this thread's name with the counter values 0..999. */
    @Override
    public void run() {
        int count = 0;
        while (count < 1000) {
            System.out.println(getName() + " " + count);
            count++;
        }
    }

    /** Starts the daemon, prints ten lines on the calling thread, then prints the end marker. */
    public static void main(String[] args) {
        DaemonThread background = new DaemonThread();
        // The daemon flag has to be set before start().
        background.setDaemon(true);
        background.start();

        String caller = Thread.currentThread().getName();
        int step = 0;
        while (step < 10) {
            System.out.println(caller + " " + step);
            step++;
        }
        System.out.println("结束了");
    }
}
// 3、线程睡眠:sleep
/**
 * Demonstrates {@link Thread#sleep(long)}: the calling thread pauses one second before each of
 * ten printed lines.
 */
public class SleepTest {

    /**
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        int tick = 0;
        while (tick < 10) {
            Thread.sleep(1000);
            String line = Thread.currentThread().getName() + " " + tick;
            System.out.println(line);
            tick++;
        }
    }
}
// 4、改变线程优先级
/**
 * Demonstrates thread priorities: each worker prints its own priority on every iteration, and
 * {@code main} changes that priority right after the worker has been started.
 */
public class PriorityTest extends Thread {

    /**
     * @param name the thread name printed in front of every line
     */
    public PriorityTest(String name) {
        super(name);
    }

    /** Prints name, current priority and counter for the counter values 0..499. */
    @Override
    public void run() {
        super.run();
        int i = 0;
        while (i < 500) {
            System.out.println(getName() + "的优先级为:" + getPriority() + ",循环变量值为" + i);
            i++;
        }
    }

    /** Sets the calling thread's priority to 6, then launches one worker at step 10 and one at step 20. */
    public static void main(String[] args) {
        Thread.currentThread().setPriority(6);
        for (int i = 0; i < 30; i++) {
            switch (i) {
                case 10:
                    launch("低级", Thread.MIN_PRIORITY);
                    break;
                case 20:
                    launch("高级", Thread.MAX_PRIORITY);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Starts a worker, prints the priority it has at that moment, and only then assigns the
     * requested priority.
     */
    private static void launch(String name, int priority) {
        PriorityTest worker = new PriorityTest(name);
        worker.start();
        System.out.println("创建之初的线程优先级:" + worker.getPriority());
        worker.setPriority(priority);
    }
}
五、线程同步 1、线程安全问题 a、定义账户类: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Account { private String accountNo; private Double balance; public Account (String accountNo, Double balance) { this .accountNo = accountNo; this .balance = balance; } public String getAccountNo () { return accountNo; } public void setAccountNo (String accountNo) { this .accountNo = accountNo; } public Double getBalance () { return balance; } public void setBalance (Double balance) { this .balance = balance; } @Override public boolean equals (Object obj) { if (this == obj) { return true ; } else if (obj != null && obj.getClass() == Account.class) { Account target = (Account) obj; return target.getAccountNo().equals(accountNo); } else { return false ; } } @Override public int hashCode () { return accountNo.hashCode(); } }
b、定义取钱线程类: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class DrawThread extends Thread { private Account account; private double drawAmount; public DrawThread (String name, Account account, double drawAmount) { super (name); this .account = account; this .drawAmount = drawAmount; } @Override public void run () { super .run(); if (account.getBalance() >= drawAmount) { System.out.println(getName() + " " + drawAmount); try { Thread.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } account.setBalance(account.getBalance() - drawAmount); System.out.println("余额为:" + account.getBalance()); } else { System.out.println("取钱失败!余额不足!" ); } } }
c、取钱逻辑: 1 2 3 4 5 6 7 8 9 public class DrawTest { public static void main (String[] args) { Account acct = new Account("123" , 1000.0 ); new DrawThread("甲" , acct, 800 ).start(); new DrawThread("乙" , acct, 800 ).start(); } }
2、同步代码块 将上面的 DrawThread
修改成如下形式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class DrawThread extends Thread { private Account account; private double drawAmount; public DrawThread (String name, Account account, double drawAmount) { super (name); this .account = account; this .drawAmount = drawAmount; } @Override public void run () { super .run(); synchronized (account) { if (account.getBalance() >= drawAmount) { System.out.println(getName() + " " + drawAmount); try { Thread.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } account.setBalance(account.getBalance() - drawAmount); System.out.println("余额为:" + account.getBalance()); } else { System.out.println("取钱失败!余额不足!" ); } } } }
// 3、同步方法 4、释放同步监视器的锁定 5、同步锁(lock) 6、死锁及常用处理策略
/**
 * First half of the deadlock demonstration. {@link #foo(B)} holds this object's monitor, sleeps,
 * and then calls a synchronized method on a {@code B}.
 */
class A {

    /**
     * Holds this A's monitor for 200 ms and then calls {@code b.last()}, which needs b's monitor.
     *
     * @param b the peer object whose synchronized method is called while this lock is held
     */
    public synchronized void foo(B b) {
        System.out.println("当前线程名称:" + Thread.currentThread().getName() + " 进入了A实例的foo()方法");
        try {
            // Gives the other thread time to take B's monitor.
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("当前线程名称:" + Thread.currentThread().getName() + " 企图调用B的last()方法");
        b.last();
    }

    /** Prints a marker line; requires this A's monitor. */
    public synchronized void last() {
        // Message fixed: it previously read "A类l的ast()" (characters transposed).
        System.out.println("进入了A类的last()方法内部");
    }
}

/**
 * Second half of the deadlock demonstration; mirrors {@link A} with the lock order reversed.
 */
class B {

    /**
     * Holds this B's monitor for 200 ms and then calls {@code a.last()}, which needs a's monitor.
     *
     * @param a the peer object whose synchronized method is called while this lock is held
     */
    public synchronized void bar(A a) {
        System.out.println("当前线程名称:" + Thread.currentThread().getName() + " 进入了B实例的bar()方法");
        try {
            // Gives the other thread time to take A's monitor.
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("当前线程名称:" + Thread.currentThread().getName() + " 企图调用A的last()方法");
        a.last();
    }

    /** Prints a marker line; requires this B's monitor. */
    public synchronized void last() {
        System.out.println("进入了B类的last()方法内部");
    }
}

/**
 * Runs {@code a.foo(b)} on the calling thread and {@code b.bar(a)} on a second thread. Each
 * thread takes one monitor and then asks for the other, which is the intended deadlock.
 */
public class DeadLock implements Runnable {

    A a = new A();
    B b = new B();

    /** Renames the calling thread and enters {@code a.foo(b)}. */
    public void init() {
        Thread.currentThread().setName("主线程");
        a.foo(b);
        System.out.println("进入了主线程之后");
    }

    /** Renames the running thread and enters {@code b.bar(a)}. */
    @Override
    public void run() {
        Thread.currentThread().setName("副线程");
        b.bar(a);
        System.out.println("进入了副线程之后");
    }

    /** Starts the second thread and then calls {@link #init()} on the calling thread. */
    public static void main(String[] args) {
        DeadLock d1 = new DeadLock();
        new Thread(d1).start();
        d1.init();
    }
}
六、线程通信 1、传统的线程通信 a、定义账户类: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @Data public class Account { private String accountNo; private double balance; private boolean flag = false ; public Account (String accountNo, double balance) { this .accountNo = accountNo; this .balance = balance; } public double getBalance () { return this .balance; } public synchronized void draw (double drawAmount) { try { if (!flag) { wait(); } else { System.out.println(Thread.currentThread().getName() + "取钱:" + drawAmount); balance -= drawAmount; System.out.println("账户余额为:" + balance); flag = false ; notifyAll(); } } catch (InterruptedException e) { e.printStackTrace(); } } public synchronized void deposit (double depositAmount) { try { if (flag) { wait(); } else { System.out.println(Thread.currentThread().getName() + "存钱:" + depositAmount); balance += depositAmount; System.out.println("账户余额为:" + balance); flag = true ; notifyAll(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
b、定义取钱线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class DrawThread extends Thread { private Account account; private double drawAmount; public DrawThread (String name, Account account, double drawAmount) { super (name); this .account = account; this .drawAmount = drawAmount; } @Override public void run () { for (int i = 0 ; i < 100 ; i++) { account.draw(drawAmount); } } }
c、定义存钱类: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class DepositThread extends Thread { private Account account; private double drawAmount; public DepositThread (String name, Account account, double drawAmount) { super (name); this .account = account; this .drawAmount = drawAmount; } @Override public void run () { for (int i = 0 ; i < 100 ; i++) { account.deposit(drawAmount); } } }
// 2、使用Condition控制线程通信
/**
 * An account whose deposits and withdrawals strictly alternate, coordinated with an explicit
 * {@link Lock} and {@link Condition}.
 *
 * <p>{@code flag} is {@code true} while a deposit is waiting to be withdrawn. Equality is based
 * on the account number only.
 */
public class Account {

    private final Lock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    private String accountNo;
    private double balance;
    /** {@code true} = money has been deposited and not yet withdrawn. */
    private boolean flag = false;

    public Account() {
    }

    /**
     * @param accountNo the account number; it alone defines equality
     * @param balance the opening balance
     */
    public Account(String accountNo, double balance) {
        this.accountNo = accountNo;
        this.balance = balance;
    }

    public String getAccountNo() {
        return accountNo;
    }

    public void setAccountNo(String accountNo) {
        this.accountNo = accountNo;
    }

    public double getBalance() {
        return balance;
    }

    /**
     * Blocks until a deposit is available, then withdraws {@code drawAmount} and signals waiters.
     *
     * <p>The wait is a guarded loop: the previous {@code if/else} returned without withdrawing
     * whenever the call had to wait, and did not re-check the flag after waking. If the thread is
     * interrupted while waiting, the interrupt flag is restored and nothing is withdrawn.
     *
     * @param drawAmount the amount to withdraw
     */
    public void draw(double drawAmount) {
        lock.lock();
        try {
            while (!flag) {
                cond.await();
            }
            System.out.println(Thread.currentThread().getName() + "取钱:" + drawAmount);
            balance -= drawAmount;
            System.out.println("账户余额为:" + balance);
            flag = false;
            cond.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until the previous deposit has been withdrawn, then deposits {@code depositAmount}
     * and signals waiters. Interrupt handling matches {@link #draw(double)}.
     *
     * @param depositAmount the amount to deposit
     */
    public void deposit(double depositAmount) {
        lock.lock();
        try {
            while (flag) {
                cond.await();
            }
            System.out.println(Thread.currentThread().getName() + "存钱:" + depositAmount);
            balance += depositAmount;
            System.out.println("账户余额为:" + balance);
            flag = true;
            cond.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Two accounts are equal when they have the same class and the same account number.
     *
     * <p>The lock, the condition, the balance and the flag are deliberately excluded: every
     * instance owns its own lock object, so including it made two distinct instances never equal,
     * and including mutable state made the hash change while the object sat in a hash container.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Account account = (Account) o;
        return Objects.equals(accountNo, account.accountNo);
    }

    /** Hash of the account number, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return Objects.hashCode(accountNo);
    }
}
3、使用阻塞队列(Blocking Queue)控制线程通信 阻塞队列简单使用:
/**
 * Minimal {@link BlockingQueue} demonstration: three elements are put into a queue of
 * capacity two, so the third {@code put} blocks.
 */
public class BlockingQueueTest {

    /**
     * @throws InterruptedException if the calling thread is interrupted while blocked in
     *     {@code put}
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
        // Two puts fill the queue; the third one has no free slot and blocks.
        for (int attempt = 0; attempt < 3; attempt++) {
            bq.put("Java");
        }
    }
}
使用 BlockingQueue(阻塞队列)
实现线程通信
/**
 * Producer thread: repeatedly puts one of three fixed strings into a shared
 * {@link BlockingQueue}, cycling through them in order.
 */
class Producer extends Thread {

    private BlockingQueue<String> bq;

    /**
     * @param bq the queue to put elements into
     */
    public Producer(BlockingQueue<String> bq) {
        this.bq = bq;
    }

    /**
     * Produces elements until the iteration limit is reached or the thread is interrupted.
     *
     * <p>The counter is a {@code long}: the previous {@code int} counter could never reach the
     * {@code 9999999999L} bound and would eventually overflow to a negative value, making
     * {@code i % 3} a negative array index. On interruption the interrupt flag is restored and
     * the loop ends instead of carrying on.
     */
    @Override
    public void run() {
        var strArr = new String[] {"Java", "Structs", "Spring"};
        for (long i = 0; i < 9_999_999_999L; i++) {
            System.out.println(getName() + "生产者准备生产集合元素");
            try {
                Thread.sleep(200);
                bq.put(strArr[(int) (i % strArr.length)]);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            System.out.println(getName() + "生产完成:" + bq);
        }
    }
}

/**
 * Consumer thread: repeatedly takes one element from a shared {@link BlockingQueue}.
 */
class Consumer extends Thread {

    private BlockingQueue<String> bq;

    /**
     * @param bq the queue to take elements from
     */
    public Consumer(BlockingQueue<String> bq) {
        this.bq = bq;
    }

    /**
     * Consumes elements until the thread is interrupted; on interruption the interrupt flag is
     * restored and the loop ends.
     */
    @Override
    public void run() {
        while (true) {
            System.out.println(getName() + "消费者准备消费集合元素!");
            try {
                Thread.sleep(200);
                bq.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            System.out.println(getName() + "消费完成:" + bq);
        }
    }
}

/**
 * Starts three producers and one consumer that share a queue of capacity one.
 */
public class BlockingQueueTest2 {

    public static void main(String[] args) {
        BlockingQueue<String> bq = new ArrayBlockingQueue<>(1);
        new Producer(bq).start();
        new Producer(bq).start();
        new Producer(bq).start();
        new Consumer(bq).start();
    }
}
// 七、线程组和未处理的异常
/**
 * A thread that prints twenty numbered lines and can optionally be placed in an explicit
 * {@link ThreadGroup}.
 */
class MyThread extends Thread {

    /**
     * @param name the thread name
     */
    public MyThread(String name) {
        super(name);
    }

    /**
     * @param threadGroup the group the new thread belongs to
     * @param name the thread name
     */
    public MyThread(ThreadGroup threadGroup, String name) {
        super(threadGroup, name);
    }

    /** Prints this thread's name with the counter values 0..19. */
    @Override
    public void run() {
        int i = 0;
        while (i < 20) {
            System.out.println(getName() + " 线程i的变量" + i);
            i++;
        }
    }
}

/**
 * Prints information about the calling thread's group, then starts one thread in that group and
 * two threads in a newly created daemon group.
 */
public class ThreadGroupTest {

    public static void main(String[] args) {
        ThreadGroup callerGroup = Thread.currentThread().getThreadGroup();
        System.out.println("主线程组的名称:" + callerGroup.getName());
        System.out.println("主线程组是否是后台线程组:" + callerGroup.isDaemon());
        MyThread inCallerGroup = new MyThread("主线程组的线程");
        inCallerGroup.start();

        var daemonGroup = new ThreadGroup("新线程组");
        daemonGroup.setDaemon(true);
        System.out.println("tg线程组是否是后台线程组:" + daemonGroup.isDaemon());
        var first = new MyThread(daemonGroup, "tg组的线程甲");
        first.start();
        var second = new MyThread(daemonGroup, "tg组的线程乙");
        second.start();
    }
}
下面程序为主线程设置了异常处理器,当主线程运行抛出未处理异常时,该异常处理器将会起作用:
/**
 * Uncaught-exception handler that prints the failing thread followed by the exception.
 */
class MyExHandler implements Thread.UncaughtExceptionHandler {

    /**
     * @param t the thread that terminated with the exception
     * @param e the exception that was not caught
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        StringBuilder report = new StringBuilder();
        report.append(t).append("线程出现了异常:").append(e);
        System.out.println(report);
    }
}

/**
 * Installs {@link MyExHandler} on the calling thread and then triggers an integer division by
 * zero, so the handler is invoked and the final message is never printed.
 */
public class ExHandler {

    public static void main(String[] args) {
        Thread self = Thread.currentThread();
        self.setUncaughtExceptionHandler(new MyExHandler());
        // Throws ArithmeticException; the statement below is not reached.
        var a = 5 / 0;
        System.out.println("程序正常结束!");
    }
}
// 八、线程池 1、使用线程池管理线程
/**
 * Submits the same counting task three times to a fixed pool of six threads and then shuts the
 * pool down.
 */
public class ThreadPoolTest {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(6);

        Runnable target = () -> {
            int i = 0;
            while (i < 100) {
                System.out.println(Thread.currentThread().getName() + "值为" + i);
                i++;
            }
        };

        for (int submitted = 0; submitted < 3; submitted++) {
            pool.submit(target);
        }

        // shutdown() stops the pool from accepting new tasks; submitted ones still run.
        pool.shutdown();
    }
}
// 2、使用ForkJoinPool利用多CPU
/**
 * Prints every integer in {@code [start, end)}, splitting the range in half recursively until a
 * piece is smaller than {@link #THRESHOLD}.
 */
class PrintTask extends RecursiveAction {

    /** Ranges shorter than this are printed directly instead of being split. */
    private static final int THRESHOLD = 50;

    private final int start;
    private final int end;

    /**
     * @param start first value to print (inclusive)
     * @param end end of the range (exclusive)
     */
    public PrintTask(int start, int end) {
        this.end = end;
        this.start = start;
    }

    /**
     * Prints the range directly when it is small; otherwise splits it and waits for both halves.
     *
     * <p>{@code invokeAll} returns only after both sub-tasks have completed, so this task is done
     * exactly when its whole range has been printed. The previous fork-without-join let the
     * root task complete while sub-ranges were still pending.
     */
    @Override
    protected void compute() {
        if (end - start < THRESHOLD) {
            for (int i = start; i < end; i++) {
                System.out.println(Thread.currentThread().getName() + "值为" + i);
            }
            return;
        }
        // Overflow-safe midpoint.
        int middle = start + (end - start) / 2;
        invokeAll(new PrintTask(start, middle), new PrintTask(middle, end));
    }
}

/**
 * Prints the numbers 0..299 using a {@link ForkJoinPool}.
 */
public class ForkJoinPoolTest {

    /**
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        // invoke() blocks until the whole task tree has finished.
        pool.invoke(new PrintTask(0, 300));
        // shutdown() must come first: awaitTermination() can only succeed on a pool that has
        // been shut down, so the previous order always waited out the full timeout.
        pool.shutdown();
        pool.awaitTermination(2, TimeUnit.SECONDS);
    }
}
下面程序示范使用了 RecursiveTask
对一个长度为100的数组元素值进行累加
/**
 * Sums {@code arr[start..end)} by recursive splitting, returning the partial sum of its range.
 */
class CalTask extends RecursiveTask<Integer> {

    /** Ranges shorter than this are summed directly instead of being split. */
    private static final int THRESHOLD = 20;

    private final int start;
    private final int end;
    int[] arr;

    /**
     * @param arr the array to read from; it is not modified
     * @param start first index to include
     * @param end end index (exclusive)
     */
    public CalTask(int[] arr, int start, int end) {
        this.arr = arr;
        this.end = end;
        this.start = start;
    }

    /**
     * @return the sum of the elements in this task's range
     */
    @Override
    protected Integer compute() {
        if (end - start < THRESHOLD) {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            return sum;
        }
        // Overflow-safe midpoint.
        int middle = start + (end - start) / 2;
        CalTask left = new CalTask(arr, start, middle);
        CalTask right = new CalTask(arr, middle, end);
        // Fork one half and compute the other on this thread, rather than forking both and
        // leaving the current worker with nothing but joins.
        left.fork();
        int rightSum = right.compute();
        return left.join() + rightSum;
    }
}

/**
 * Fills a 100-element array with random values below 20, prints the total computed by a plain
 * loop, then prints the total computed by {@link CalTask} on the common pool.
 */
public class Sum {

    /**
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     result
     * @throws ExecutionException if the task fails
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int[] arr = new int[100];
        Random rand = new Random();
        int total = 0;

        for (int i = 0, len = arr.length; i < len; i++) {
            int tmp = rand.nextInt(20);
            total += (arr[i] = tmp);
        }
        System.out.println(total);

        // The common pool is not shut down here: shutdown() has no effect on it.
        ForkJoinPool pool = ForkJoinPool.commonPool();
        Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
        System.out.println(future.get());
    }
}
// 九、线程相关类 1、ThreadLocal类
/**
 * An account whose name is held in a {@link ThreadLocal}: each thread reads and writes its own
 * copy of the name through the same {@code Account} object.
 */
class Account {

    private ThreadLocal<String> name = new ThreadLocal<>();

    /**
     * Stores {@code str} as the name for the constructing thread and prints it.
     *
     * @param str the initial name, visible only to the thread that runs this constructor
     */
    public Account(String str) {
        name.set(str);
        System.out.println("---" + name.get());
    }

    /**
     * @return the name stored for the calling thread
     */
    public String getName() {
        return name.get();
    }

    /**
     * @param str the name to store for the calling thread
     */
    public void setName(String str) {
        name.set(str);
    }
}

/**
 * A thread that prints the account name ten times and, at step 6, sets the account name to its
 * own thread name.
 */
class MyTest extends Thread {

    private Account account;

    /**
     * @param account the shared account object
     * @param name the thread name
     */
    public MyTest(Account account, String name) {
        super(name);
        this.account = account;
    }

    @Override
    public void run() {
        int step = 0;
        while (step < 10) {
            if (step == 6) {
                account.setName(getName());
            }
            System.out.println(account.getName() + "账户的i值" + step);
            step++;
        }
    }
}

/**
 * Creates one account on the calling thread and hands it to two worker threads.
 */
public class ThreadLocalTest {

    public static void main(String[] args) {
        var shared = new Account("初始名");
        var first = new MyTest(shared, "线程甲");
        first.start();
        var second = new MyTest(shared, "线程乙");
        second.start();
    }
}
2、包装线程不安全的集合 1 2 HashMap m = (HashMap) Collections.synchronizedMap(new HashMap());
// 3、线程安全的集合类 4、Java9新增的发布-订阅框架
/**
 * A subscriber that requests items one at a time, prints each one, and lets other threads wait
 * until the stream has ended (normally or with an error).
 *
 * @param <T> the item type
 */
class MySubscriber<T> implements Flow.Subscriber<T> {

    /**
     * Opened once by onComplete/onError. A latch remembers that it was opened, so a waiter that
     * arrives late still returns. The previous wait/notifyAll on the string literal "java"
     * could miss the notification and hang, and it locked on an interned object shared by the
     * whole JVM.
     */
    private final java.util.concurrent.CountDownLatch finished =
            new java.util.concurrent.CountDownLatch(1);

    private Flow.Subscription subscription;

    /** Stores the subscription and requests the first item. */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    /** Prints the item and requests the next one. */
    @Override
    public void onNext(T item) {
        System.out.println("获取到数据" + item);
        subscription.request(1);
    }

    /** Prints the stack trace and releases every waiter. */
    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        finished.countDown();
    }

    /** Prints the end marker and releases every waiter. */
    @Override
    public void onComplete() {
        System.out.println("订阅结束时");
        finished.countDown();
    }

    /**
     * Blocks until onComplete or onError has been delivered.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void awaitCompletion() throws InterruptedException {
        finished.await();
    }

    /**
     * Blocks until onComplete or onError has been delivered, or the timeout elapses.
     *
     * @param timeout the maximum time to wait
     * @param unit the unit of {@code timeout}
     * @return {@code true} if the stream ended within the timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean awaitCompletion(long timeout, java.util.concurrent.TimeUnit unit)
            throws InterruptedException {
        return finished.await(timeout, unit);
    }
}

/**
 * Publishes five strings through a {@link SubmissionPublisher}, half a second apart, and waits
 * for the subscriber to see the end of the stream.
 */
public class PubSubTest {

    public static void main(String[] args) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        MySubscriber<String> subscriber = new MySubscriber<>();
        publisher.subscribe(subscriber);

        System.out.println("开发发布数据...");
        List.of("Java", "Go", "Erlang", "Swift", "Lua")
                .forEach(im -> {
                    publisher.submit(im);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });

        publisher.close();

        try {
            subscriber.awaitCompletion();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
十、本章小结