java并发(一)

导航

线程与进程

  • 进程是系统进行资源调用或分配的最小单位,一个进程拥有多个线程,多个线程共享资源
    • 因为线程之间共享资源,所以线程之间的通信比进程更加容易,所以多线程并发也会带来数据安全问题
    • 进程的常见的通信方式
      • 不同机器上:套接字 socket
      • 映射共享内存
      • 读写进程利用管道进行通信
  • 进程切换时涉及前后 cpu 环境的保存与设置,而线程切换是仅需要保存和设置少量寄存器内容
    • 所以线程的上下文切换比进程快许多
      • 虽然线程的上下文比进程快许多,但依然是有额外消耗的。那为什么我们还要用多线程并发而不是单线程呢?
        • 因为虽然多线程不会提高程序的运行速度,但可以减少用户的等待时间,提升资源的利用率。
        • 因为 cpu 计算效率非常高,远远快于对资源的读写等操作。所以当一个线程因为请求资源或其他原因处于堵塞状态时,可以切换到其他线程进行操作,提升了资源的利用率。

线程的状态

  • 新建(new)
  • 阻塞
    • blocked:进入 synchronized 代码块之前申请监视器锁等待时
    • waiting:调用了 Object.wait()或者 Thread.join()或者 LockSupport.park(),等待唤醒
    • timed_waiting
  • 运行(runable)
  • 消亡(terminated)

常用函数

  • sleep 与 wait
    • sleep:当前线程睡眠,睡眠不释放锁
    • wait:必须和 syncronized 关键字一起使用,进入阻塞状态,必须 notify()或 notifyAll()并且重新占用互斥锁后才会继续运行。wait 时释放锁。
  • join:等待调用线程运行执行完毕后执行
  • yield:放弃当前 cpu 时间片,但不阻塞,随时可再次分得时间片

线程活性故障

  • 死锁:多个线程互相等待对方而被永远暂停
    • 条件
      • 资源互斥
      • 请求与保持
      • 不剥夺
      • 循环等待
    • 避免方法
      • 粗锁法
      • 锁排序法
  • 锁死:唤醒条件无法成立或其他条件无法唤醒
    • 信号丢失
    • 嵌套监视器锁死
  • 活锁:当一个线程一直处于运行状态,但是其所执行的任务却没有任何进展。比如,一个线程一直在申请其所需要的资源,但是却无法申请成功。
  • 线程饥饿

线程安全

  • 原子性:对共享变量的操作,在其他线程看来是不可分割的
    • cas
    • 除 long 和 double 外任何类型变量的写操作
    • volatile 修饰的变量的写操作
  • 可见性:对共享变量的更新,对后续访问线程是否可见
  • 有序性

关键字

  • synchronized
    • 非公平调度。jvm 给每个内部锁分配一个入口集(entry list),用于记录等待获得相应内部锁的线程。当锁被释放时,该入口集任意一个线程将会被唤醒,从而获得申请锁的机会。同时,被唤醒的线程等待占用处理器时间时,可能还有其他性的活跃线程抢占锁。
    • 锁升级:偏向锁->轻量级锁->重量级锁
  • volatile:轻量级锁,保证可见性与有序性
    • 可见性

常用类

Lock

  • 和synchronized区别
    • lock:api级别,synchronized:jvm级别
    • lock:可以实现公平锁
    • lock:可以通过condition绑定多个条件
    • lock:同步非阻塞,采用的是乐观并发策略,synchronized:同步阻塞,使用的是悲观并发策略
    • lock:发生异常时需要主动unlock()释放锁,synchronized:发生异常时自动释放锁
    • lock:响应中断,synchronized:不响应

线程池 Executor

  • 刚提交任务时,如果工作线程少于核心线程数(corePoolSize),则添加新线程;若大于等于核心线程数,则加入队列(workQueue);若队列已满,则创建新线程;若线程数量超过最大线程数(maximumPoolSize),则执行拒绝策略。
  • 参数
    • corePoolSize
    • maximumPoolSize
    • keepAliveTime:线程空闲但不被回收的时间
    • unit:时间单位
    • workQueue
    • threadFactory
    • handler:拒绝策略
  • 常见线程池类型
    • newCachedThreadPool:核心线程数 0,最大线程 Integer.MAX_VALUE,来一个创建一个线程
      • 使用 SynchronousQueue:生产完成后,必须消费掉才能继续生产
    • newFixedThreadPool:核心线程数 n,最大线程数 n,当达到核心线程数,就不会再增加
      • 使用 LinkedBlockingQueue
    • newSingleThreadExecutor:核心线程数 1,最大线程数 1,只有一个线程
      • 使用 LinkedBlockingQueue

CountDownLatch 和 CyclicBarrier

  • CountDownLatch:倒计时协调器,可以实现一或多个线程等待其余线程完成特定操作后,继续运行
    • 内部维护一计时器,countDown()每执行一次计数器减一
    • 计数器不为 0 时,调用 await()的线程将处于等待状态
    • 计数器为 0 时,所有等待线程被唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class T {
private static CountDownLatch countDownLatch = new CountDownLatch(5);

private static class Player implements Runnable {
@Override
public void run() {
System.out.println("player: " + Thread.currentThread().getName() + " 完成");
countDownLatch.countDown();
}
}

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(new Player(), i + "");
thread.start();
}
countDownLatch.await();

System.out.println("全部玩家准备完成");
}
}

运行结果

1
2
3
4
5
6
player: 1 完成
player: 3 完成
player: 4 完成
player: 2 完成
player: 0 完成
全部玩家准备完成
  • CyclicBarrier:栅栏,可以实现多个线程相互等待至指定地点,再一起接着执行
    • 维护了一个显示锁 ReentrantLock。参与方执行 await()实现等待,当最后一个参与方调用 await()时,所有参与方继续执行
    • 内部维护了一个计数器 count=参与方的个数,调用 await()使 count-1。
    • 当判断是最后一个参与方时,调用 singalAll()唤醒所有
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class T2 {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

private static class Player implements Runnable {
@SneakyThrows
@Override
public void run() {
for (int i = 0; i < 3; i++) {
System.out.println("player: " + Thread.currentThread().getName() + " 翻过了第 " + i + " 个栅栏");
cyclicBarrier.await();
}
}
}

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new Player(), i+"");
thread.start();
}

System.out.println("main end");
}

}

运行结果

1
2
3
4
5
6
7
8
9
10
main end
player: 0 翻过了第 0 个栅栏
player: 2 翻过了第 0 个栅栏
player: 1 翻过了第 0 个栅栏
player: 1 翻过了第 1 个栅栏
player: 2 翻过了第 1 个栅栏
player: 0 翻过了第 1 个栅栏
player: 0 翻过了第 2 个栅栏
player: 2 翻过了第 2 个栅栏
player: 1 翻过了第 2 个栅栏

Semaphore

基于计数的信号量,可以设定一个阈值,是多个线程竞争获取许可,超过许可后,线程将会被堵塞。

1
2
3
4
5
6
7
8
9
Semaphore semaphore = new Semaphore(3);

try {
semaphore.acquire();
} catch (InterruptedException e) {

} finally {
semaphore.release();
}

ThreadLocal

  • 为每个线程提供独立的变量副本
  • 每个线程维护 ThreadLocalMap,每个 entry:ThreadLocal->线程特有的对象
  • entry 对 key 的引用是弱引用,对 value 的引用是强引用

AQS(AbstractQueuedSynchronizer)

抽象队列同步器,定义了一套多线程访问共享资源的同步器框架,许多同步类都依赖它,比如ReentrantLock、Semaphore、CountDownLatch等。AQS维护了一个volatile int state和一个fifo等待队列(多线程争用资源被阻塞是会进入此队列)。AQS定义了两种资源的共享方式,一种是独占式(exclusice,如reentrantlock),一种是共享式(share,如semephore、countDownLatch)。

自定义同步器主要实现:

  • isHeldExclusively():该线程是否正在独占资源,只有用到condition时需要实现
  • tryAcquire(int) bool:独占式。尝试获取资源
  • tryRelease(int) bool:独占式。尝试释放资源
  • tryAcquireShared(int) int:共享式。尝试获取资源,小于0失败,等于0成功但无剩余可用资源,大于0成功且有可用资源
  • tryReleaseShared(int) bool:共享式。尝试释放资源,释放后允许唤醒后续等待结点返回true,否则返回false