首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX  ›  程序员

一条面试题引发的思考--浅谈 Java 公平锁与内存模型

  •  2
     
  •   samray · 33 天前 · 2964 次点击
    这是一个创建于 33 天前的主题,其中的信息可能已经有所发展或是发生改变。

    博客原文 一条面试题引发的思考--浅谈 Java 公平锁与内存模型

    前言

    春天来了,春招还会远么? 又到了春招的季节,随之而来的是各种的面试题。今天就看到组内大佬面试实习生的一道 Java 题目:

    编写一个程序,开启 3 个线程 A,B,C,这三个线程的输出分别为 A、B、C,每个线程将自己的 输出在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC....

    掉进坑里

    出于好奇的心态,我花了点时间来尝试解决这个问题, 主要的难点是让线程顺序地如何顺序地输出,线程之间如何交换。很快就按着思路写出了一个版本,用 Lock 来控制线程的顺序,A,B,C 线程依次启动,因为 A 线程先启动,所以 A 线程会最先拿到锁,B,C 阻塞;但是 A 输出完字符串,释放锁,B 线程获得锁,C,A 线程阻塞; 依此循环:

    public void Test(){
        private static Integer index = 0;
    
        Lock lock = new ReentrantLock();
    	
    	@Test
        public void testLock(){
            Thread threadA = work(i -> i % 3 == 0, () -> System.out.println("A"));
            Thread threadB = work(i -> i % 3 == 1, () -> System.out.println("B"));
            Thread threadC = work(i -> i % 3 == 2, () -> System.out.println("C"));
            threadA.start();
            threadB.start();
            threadC.start();
        }
    
        private Thread work(Predicate<Integer> condition, Runnable function) {
            return new Thread(() -> {
                while (index < 30) {
                    lock.lock();
                    if (condition.test(index)) {
                        function.run();
                        index++;
                    }
                    lock.unlock();
                }
            });
        }
    }
    

    输入结果如我预期那般,ABCABC 交替输出,也成功输出了 10 次,奇怪的是 A,B 却多输出了一次?

    为什么会多输出一次,不是应该恰好是输出 30 次么, 为什么会多输出一次 A,B 真的百思不得其解. 所以我把index 也打印出来查看, 结果相当奇怪:

    ...
    function.run();
    System.out.println(index);
    ....
    

    为什么 A 会是 30, B 会是 31, 不是有(index.intvalue<30) 的条件判断么, 为什么还会出现这样的数据?灵异事件?

    解惑

    灵异事件自然是不存在的,仔细分析了一番代码之后,发现了问题:

    while (index.intValue() < 30) {  // 1
        lock.lock(); // 2
        if (condition.test(index.intValue())) {
            function.run();
            index++;
        }
        lock.unlock();
    }
    

    将 1,2 行的操作做了这三件事,如下:

    1. 线程读取 index 的值
    2. 比较 index 的值是否大于 30
    3. 如果小于 30, 尝试获取锁

    换言之,当 index=29 时,线程 C 持有锁,但是锁只能阻止线程 A,线程 B 修改 index 的值,并不能阻止线程 A, 线程 B 在获取锁之前读取 index 的值,所以线程 A 读取 index=29, 并把值保持到线程的内部,如下图:

    当线程 C 执行完,还没释放锁的时候,线程 A 的 index 值为 29 ;当线程 C 释放锁,线程 A 获取锁,进入同步块的时候,因为 Java 内存模型有内存可见性的要求, 兼之 Lock 的实现类实现了内存可见,所以线程 A 的 index 值会变成 30, 这就解析了为什么线程 A index=30 的时候能跳过(index.intValue<30)的判断条件,因为执行这个判断条件的时候线程 A index=29, 进入同步块之后变成了 30:

    CPU 缓存

    为什么每个线程都会持有一个 index 值呢?来看看下面的分级缓存图:

     _______________    ______________  
     |     CPU 1     |  |     CPU 2    |  
     |   _________   |  |   _________  |  
     |  | Level 1 |  |  |  | Level 1 | |  
     |  |   Cache |  |  |  |  Cache  | |  
     |  |         |  |  |  |         | |
     |  |_________|  |  |  |_________| |  
     |_______________|  |______________|
               | |              | |
               | |              | |
              _|_|______________|_|__
             |                       |
             |      MAIN MEMORY      | 
             |_______________________|
    

    众所周知,在计算机的存储体系中,分布式存储系统比硬盘慢,硬盘比内存跑得慢,内存比 Cpu L3 level Cache 跑得慢,L3 Cache 比 L2 Cache 等等。空间越大,速度越慢,速度越快,空间越小,本质上就是空间与时间的取舍。例如,为了提高效率,就会把预计即将用到的变量index从主存缓存到 CPU 缓存,而 CPU 有多个核心,每个核心都缓存变量index。前文的问题本质就是 CPU 缓存的index与主存index不一致,而内存可见性说的就是强制 CPU 从主存获取变量index, 从而规避缓存不一致的问题

    解决方案

    把问题剖析清楚之后,解决方案就呼之欲出了:

    while (index.intValue() < 30) {  // 1
        lock.lock(); // 2
    	if(index>=30){
    		continue;
    	}
        if (condition.test(index.intValue())) {
            function.run();
            index++;
        }
        lock.unlock();
    }
    

    这种解决方法不禁让我想起单例模式里面的双重校验:

    public static Singleton getSingleton() {
        if (instance == null) {                         //Single Checked
            synchronized (Singleton.class) {
                if (instance == null) {                 //Double Checked
                    instance = new Singleton();
                }
            }
        }
        return instance ;
    }
    

    只是当时并不清楚 Double Checked 的作用,究竟解决了什么问题?只是知道不加这条语句就会造成初始化多个示例,的确是需要知其然知其所以然.

    公平锁与非公平锁

    前文说到,

    这个程序是用 Lock 来控制线程的顺序,A,B,C 线程依次启动,因为 A 线程先启动,所以 A 线程会最先拿到锁,B,C 阻塞;但是 A 输出完字符串,释放锁,B 线程获得锁,C,A 线程阻塞; 依此循环。

    粗看似乎没什么问题, 但是这里是存在着一个问题: 当线程 A 释放锁的时候,获取锁的是否一定是线程 B, 而不是线程 C, 线程 C 是否能够"插队"抢占锁? 这个就涉及到了公平锁和非公平锁的定义了: 公平锁: 线程 C 不能抢占,只能排队等待线程 B 获取并释放锁 非公平锁:线程 C 能抢占,抢到锁之后线程 B 只能继续等(有点惨!)

    而 ReentrantLock 默认恰好是非公平锁, 查看源码可知:

    /**
         * Creates an instance of {@code ReentrantLock}.
         * This is equivalent to using {@code ReentrantLock(false)}.
         */
        public ReentrantLock() {
            sync = new NonfairSync();
        }
    

    因此为了规避非公平锁抢占的问题, 上述的代码在同步块增加了判断条件:

     if (condition.test(index.intValue())) {
       ....
     }
    

    只有符合条件的线程才能进行操作,否则就是线程自旋.(但是加锁+自旋实现起来,效率不会太高效!)

    使用公平锁

    如果使用公平锁,也可以不需要上述这样的判断条件,直接让线程顺序排队和唤醒: 通过让ReentrantLock成为公平锁. 方法也很简单, 只需要构造参数加上一个 boolean 值:

    /**
         * Creates an instance of {@code ReentrantLock} with the
         * given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    公平锁改造之后的代码如下:

    public void Test(){
        private static Integer index = 0;
        Lock lock = new ReentrantLock(true);
      @Test
        public void testLock(){
            Thread threadA = work(() -> System.out.println("A"));
            Thread threadB = work(() -> System.out.println("B"));
            Thread threadC = work(() -> System.out.println("C"));
            threadA.setName("threadA");
            threadA.start();
            threadB.setName("threadB");
            threadB.start();
            threadC.setName("threadC");
            threadC.start();
            System.out.println();
        }
    
        private Thread work(Runnable function) {
            return new Thread(() -> {
                while (index.intValue() < 30) {
                    lock.lock();
                    if (index >= 30) {
                        continue;
                    }
                    function.run();
                    index++;
                    lock.unlock();
                }
            });
        }
    }
    

    后记

    组内路过的 @碳素大佬看到我在研究这道问题的时候,给出了不一样的解决方案: 使用 AtomicInteger 作为控制手段,循环三十次,线程 A 在index%3==0时输出,线程 B 在index%3==1时输出, 线程 C 在index%3==2时输出, 不加锁, 不符合条件的线程作自旋。根据思路整理代码如下:

    public void Test(){
          private static AtomicInteger index = new AtomicInteger(0);
       @Test
        public void testLock(){
            Thread threadA = work(i -> i % 3 == 0, () -> System.out.println("A"));
            Thread threadB = work(i -> i % 3 == 1, () -> System.out.println("B"));
            Thread threadC = work(i -> i % 3 == 2, () -> System.out.println("C"));
            threadA.setName("threadA");
            threadA.start();
            threadB.setName("threadB");
            threadB.start();
            threadC.setName("threadC");
            threadC.start();
            System.out.println();
        }
    
        private Thread work(Predicate<Integer> condition, Runnable function) {
            return new Thread(() -> {
                while (index.intValue() < 30) {
                    if (condition.test(index.intValue())) {
                        function.run();
                        index.incrementAndGet();
                    }
                }
            });
        }
    }
    

    就这样,把前文所有存在的问题都完美规避,实现还很优雅,效率目测也比加锁的方式高,碳总牛🍺!!!

    55 回复  |  直到 2019-04-07 23:28:26 +08:00
        1
    zealot0630   33 天前 via Android
    如果我是考官,你这个公平锁依然是 0 分,即使在 99.99%的情况能按顺序打印出来
        2
    samray   33 天前
    公平锁只能控制顺序排队,不能控制线程每次只输出一次,所以还是需要条件的, 文章有误,特此更正:

    private Thread work(Runnable function) {
    return new Thread(() -> {
    while (index.intValue() < 30) {
    lock.lock();
    if (index >= 30) {
    continue;
    }
    if (condition.test(index.intValue())) {
    function.run();
    index++;
    }
    lock.unlock();
    }
    });
    }
        3
    zealot0630   33 天前 via Android
    CPU 调度本来就有一定随机性的,这题目要求即使在 work 里面加上 sleep(random)情况下,仍然能正确按顺序输出,你的程序做不到
        4
    jinhan13789991   33 天前
    很好的一次实践,第二个解法也很惊艳~
        5
    samray   33 天前
    @zealot0630 感谢指正,在你批评的时候,我已经意识到这个问题了,只是文章无法编辑了
        6
    zealot0630   33 天前 via Android
    这题目考点是 condition variable,去学学这玩意怎么用吧。下面那个 spin 的不评价了,虽然能工作,但是笔试估计也是 0 分,面试我会让他重写。
        7
    mortonnex   33 天前   ♥ 1
    建议看下 AQS 怎么实现 Condition 的
        8
    bxb100   33 天前
    直接 while(true) 然后将 if (index>= 30) break; 放到 lock 里面不就好了吗, 而且也要注意用 finally 包裹 lock.unlock() 吧
        9
    bxb100   33 天前
    而且用 volatile 修饰更好理解 : )
        10
    kiddult   33 天前
    最后一个答案反而错了
        11
    petelin   33 天前
    上一个 go 的版本
    ```
    package main

    import (
    "fmt"
    "sync"
    "time"
    )

    var l = sync.Mutex{}
    var cond = sync.NewCond(&l)

    var turn = 0

    func main() {
    s := []string{"A", "B", "C"}
    for _, item := range s{
    go func(x string) {
    for i := 0; i < 10; i++ {
    l.Lock()
    if s[turn] == x{
    // my turn
    fmt.Println(i, x)
    turn = (turn+1) % 3
    cond.Broadcast()
    }
    cond.Wait()
    l.Unlock()
    }
    }(item)
    }
    time.Sleep(time.Hour)
    }

    ```
        12
    pifuant   33 天前
    考题和公平锁没什么关系, 锁嘛, 直接看 aqs
        13
    petelin   33 天前
        14
    petelin   33 天前
    @petelin 有 bug 我在看下
        15
    zealot0630   33 天前 via Android
    @petelin 11 楼 只能给 50 分,因为你没理解 cv
        16
    hhhsuan   33 天前 via Android
    要我就用三个 condition,1 唤醒 2,2 唤醒 3,3 唤醒 1,完美。
        17
    kiddult   33 天前
    @zealot0630 考 condition ?用 condition 实现很难看吧,需要在外部构建锁,然后侵入 thread 的实现,这么玩感觉还不如直接 synchronized
        18
    petelin   33 天前
    @zealot0630 难道要创建三个 condition 然后挨个传递?
        19
    petelin   33 天前
    @petelin fix bug, 因为我想要在 for 循环一次就打出来 0:A 0:B 0:C 就得一直等不能跳过循环. 或者每次自己打印的时候记录一下自己打了多少次.满足就返回
        20
    lihongjie0209   33 天前
        21
    zealot0630   33 天前 via Android   ♥ 1
    16 楼理解正确,题目考察唤醒的效率,把题目中的 3 个线程换成 10000 个线程试试,使用 10000 个 cv 能精确唤醒下一个需要被唤醒的线程,而不是 10000 个线程去抢一把锁
        22
    lihongjie0209   33 天前
    @hhhsuan 还需要一个状态可以使 A 开始运行, 不然就是死锁了
        23
    zealot0630   33 天前 via Android
    @lihongjie0209 不需要,cv 本身是无状态的,你去找 example 看,cv.wait 是包在一个判断的 while 里面,由那个判断来控制状态
        24
    zealot0630   33 天前 via Android
    就是说,除了 cv 外,你还需要一个变量来记录,下面轮到谁了,先修改这个变量,再 signal 对应的 cv
        25
    zealot0630   33 天前 via Android
    而且这两个操作必须是原子的,解释了为什么 signal 只能在持有锁时候做
        26
    lihongjie0209   33 天前
    @zealot0630 cv 没有问题, 关键是 A 要特殊处理一下, 不然 A 在等待 C 而不是第一次直接运行, 那么就死锁了
        27
    zealot0630   33 天前 via Android
    @lihongjie0209 不会的,代码这样写:while(not A's turn) cv_a.wait(); A 第一次进来时候 A's turn 为真,根本不会进入等待
        28
    petelin   33 天前



    @lihongjie0209 java 太啰嗦了, 入口函数去通知 A 启动就可以了
    @zealot0630 这样写可以吗? signal 好像要等, 对应的 wait 的时候才可以生效(我就偷懒利用 goroutine 启动完的时间差)
        29
    zealot0630   33 天前 via Android
    @lihongjie0209 看了你 20 楼的代码,说真的 也很难给你及格分。最严重的错误就是你需要把 lock/unlock 放到 while 外面。因为 cv.wait 时候会释放锁
        30
    petelin   33 天前
    @petelin 没说清楚, 就是必须要 condition 调用了 wait 之后在调用 singal 才能解除阻塞, 提前调用 singal 是不行的
        31
    zealot0630   33 天前 via Android
    @petelin 不及格。看一下我 29 楼说的。并且,假唤醒了解一下,在仔细看两遍示例代码。
        32
    lihongjie0209   33 天前
    @zealot0630

    javadoc 不是这么说的

    参考: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html
    ```
    * <p>In all cases, before this method can return the current thread must
    * re-acquire the lock associated with this condition. When the
    * thread returns it is <em>guaranteed</em> to hold this lock.

    ```
        33
    zealot0630   33 天前
    @lihongjie0209 在 cv.wait 等待过程中,锁会被释放. 在等待成功后,会重新获取锁. 你再仔细理解一下.
        34
    lihongjie0209   33 天前
    @zealot0630
    javadoc 给的 demo, 看起来没问题啊

    ```
    class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
    lock.lock();
    try {
    while (count == items.length)
    notFull.await();
    items[putptr] = x;
    if (++putptr == items.length) putptr = 0;
    ++count;
    notEmpty.signal();
    } finally {
    lock.unlock();
    }
    }

    public Object take() throws InterruptedException {
    lock.lock();
    try {
    while (count == 0)
    notEmpty.await();
    Object x = items[takeptr];
    if (++takeptr == items.length) takeptr = 0;
    --count;
    notFull.signal();
    return x;
    } finally {
    lock.unlock();
    }
    }
    }


    ```
        35
    exonuclease   33 天前
    手痒来个 c++版本的。。。
    #include <thread>
    #include <atomic>
    #include <vector>

    using namespace std;

    atomic_int counter = 0;
    const vector<char> letters = { 'A','B','C' };
    void worker(int target) {
    int local_counter = 0;
    while (local_counter < 10)
    {
    if (counter.load() % 3 == target) {
    cout << letters[target] << endl;
    ++counter;
    ++local_counter;
    }
    }
    }

    int main()
    {
    thread a(worker, 0);
    thread b(worker, 1);
    thread c(worker, 2);
    a.join();
    b.join();
    c.join();
    return 0;
    }
        36
    zealot0630   33 天前   ♥ 2
    来抄答案吧,lock 放在 while 是效率考虑,虽然放在里面也可以,但是你上一个 unlock 紧接着下一次循环的 lock,你觉得这种代码有意义么?我们期望在 cv.wait 处等待,而不是在 rl.lock 地方等待



    https://scastie.scala-lang.org/k5ijcVnoTTGXtwLWqJQa2A
        37
    xrlin   33 天前   ♥ 1
    我来个 go 的好了

    ```go
    func main() {
    channels := make([]chan bool, 3)
    nextChannel := make(chan bool)

    assets := [...]string{"A", "B", "C"}
    for i := 0; i < len(channels); i++ {
    channels[i] = make(chan bool)
    go func(i int) {
    for {

    <-channels[i]
    fmt.Print(assets[i])
    nextChannel <- true

    }
    }(i)
    }

    for i := 0; i < 30; i++ {
    channels[i%len(channels)] <- true
    <-nextChannel
    }

    }
    ```
        39
    rwdy2008   32 天前
    ```
    package main

    import (
    "fmt"
    "sync"
    "sync/atomic"
    )

    func main() {
    fmt.Println("Start...")

    var x int32
    done := false
    lock := new(sync.Mutex)
    cond := sync.NewCond(lock)

    cond.L.Lock()

    go func() {
    for {
    if x%3 == 0 {
    fmt.Println("A", x)
    atomic.AddInt32(&x, 1)
    if x > 3*10-3 {
    return
    }
    }
    }
    }()

    go func() {
    for {
    if x%3 == 1 {
    fmt.Println("B", x)
    atomic.AddInt32(&x, 1)
    if x > 3*10-2 {
    return
    }
    }
    }
    }()

    go func() {
    for {
    if x%3 == 2 {
    fmt.Println("C", x)
    atomic.AddInt32(&x, 1)
    if x > 3*10-1 {
    done = true
    cond.Signal()
    }
    }
    }
    }()

    for !done {
    cond.Wait()
    }

    return
    }
    ```
        40
    xrlin   32 天前
    @rwdy2008 #39 你的代码有问题,在 GOMAXPROCS 过小(比如 1 )时会导致阻塞,goroutine 执行是不确定的,但是你的 goroutine 里都是死循环,且没有等待信号量或者 channel。
        41
    ToDyZHu   32 天前
    大三的我仍不会做这题怎么办。。。
        42
    scnace   32 天前 via Android
    @xrlin 终于看到一个 channel 版本的了🌚👍
        43
    scnace   32 天前 via Android
    这难道就是传说中的睡排序升级版?
        44
    NSVitus   32 天前
    回复的代码不排版下吗?
        45
    ydc886   32 天前
    public class Key implements Runnable {

    public final String s;
    public volatile static String flag = "A";
    public static volatile int count = 0;
    public ReentrantLock lock;
    public Condition condition;

    public static void main(String[] args) {

    final ReentrantLock lock = new ReentrantLock();
    new Thread(new Key("A", lock), "Thread-A").start();
    new Thread(new Key("B", lock), "Thread-B").start();
    new Thread(new Key("C", lock), "Thread-C").start();
    }

    public Key(final String s, final ReentrantLock lock) {
    this.s = s;
    this.lock = lock;
    this.condition = lock.newCondition();
    }

    @Override
    public void run() {
    while (true) {
    lock.lock();
    try {

    while (!flag.equalsIgnoreCase(s)) {
    try {
    condition.await(10, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    if (count > 29) {
    System.exit(0);
    }

    System.out.print(s + "-" + (count++) + "; ");

    switch (flag) {
    case "A":
    flag = "B";
    break;
    case "B":
    flag = "C";
    break;
    case "C":
    flag = "A";
    break;
    default:
    throw new RuntimeException("");
    }
    condition.signalAll();
    }
    finally {
    lock.unlock();
    }
    }
    }
    }
    ---

    这种实现可行吗?
        46
    Graypn   32 天前
    二次判断 >= 30 的时候,是不是要先 unlock() ,再 break,释放资源,要不然进程没办法结束
        47
    rwdy2008   32 天前
    @xrlin 你说的很对,感谢
        48
    ydc886   32 天前
        49
    cyhulk   32 天前
    稍微修改下,加上 volatile 和 lock 进入的位置,应该就可以了
    public class ThreadTest {
    private static volatile Integer index = 0;

    Lock lock = new ReentrantLock();

    @Test
    public void testLock() {
    Thread threadA = work(i -> i % 3 == 0, () -> System.out.println("A"));
    Thread threadB = work(i -> i % 3 == 1, () -> System.out.println("B"));
    Thread threadC = work(i -> i % 3 == 2, () -> System.out.println("C"));
    threadA.start();
    threadB.start();
    threadC.start();
    }

    private Thread work(Predicate<Integer> condition, Runnable function) {
    return new Thread(() -> {
    while (true) {
    lock.lock();
    if (index >= 30) {
    break;
    }
    if (condition.test(index)) {
    function.run();
    index++;
    }
    lock.unlock();
    }
    });
    }
    }
        50
    ZiLong   32 天前
    @ydc886 目测,大佬喊过不要再 while 里面 lock,提到 while 外面去
        51
    wdmx007   32 天前
    来一发 java 升级版的,求大佬们指教

        52
    bwangel   29 天前


    楼主,是这样的么,检查条件,i++ 都要上锁
        53
    yehen   25 天前
    package com.ljw.HelloJava;

    import java.util.concurrent.TimeUnit;
    import java.util.function.Predicate;

    public class ABCThreads {
    private static Integer index = 0;
    private static Integer max = 6;
    private static Object lock = new Object();

    public static void main(String[] args) {

    Thread a = getThread(i -> i % 3 == 0, "A");
    Thread b = getThread(i -> i % 3 == 1, "B");
    Thread c = getThread(i -> i % 3 == 2, "C");
    a.start();
    b.start();
    c.start();

    }

    private static Thread getThread(Predicate<Integer> condition, String value) {
    return new Thread(() -> {
    while (true) {
    synchronized (lock) {
    while (!condition.test(index)) {
    try {
    lock.wait();
    } catch (InterruptedException e) {
    System.out.println(e.getMessage());
    }
    }
    if (index >= max) {
    return;
    }
    System.out.printf("index:%s,value:%s\n", index, value);
    index++;
    lock.notifyAll();
    }
    }
    });
    }
    }
        54
    oaix   16 天前
    为什么不适用 Semaphore 呢?
        55
    coffeSlider   15 天前 via Android
    第一反应就是用 AtomicInteger 控制。
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   2858 人在线   最高记录 5043   ·  
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.3 · 23ms · UTC 12:21 · PVG 20:21 · LAX 05:21 · JFK 08:21
    ♥ Do have faith in what you're doing.
    沪ICP备16043287号-1