V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
season8
V2EX  ›  问与答

请教一个 Java 多线程嵌套使用的问题

  •  
  •   season8 · 2020-11-05 00:26:47 +08:00 · 3437 次点击
    这是一个创建于 1240 天前的主题,其中的信息可能已经有所发展或是发生改变。

    同事碰到一个问题,我写了个 demo 复现,研究了好几天还是没头绪,多线程程场景也没有调试思路,干脆发个帖,想看看有没有大佬可以指点一二。

    模拟场景:三个消费组消费异步消费,每组有三个任务,任务之间异步执行,但必须都执行完毕后消费组才算结束。 设计上,消费组线程池给 3 个线程,控制每次只有三个组能消费。 任务线程池给的是大于 3*3,按我的理解是,外层 3 个消费组,每组三个任务,实时任务应该不会超过 9 个。

    但程序执行一会儿就发现会有消费组批量涌入,导致里层线程池触发 reject 。

    Demo 如下:

    public static void main(String[] args) throws InterruptedException {
    
    		ThreadPoolExecutor outter = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(197));
    		ThreadPoolExecutor inner = new ThreadPoolExecutor(9, 12, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
    
    		for (int i = 0; i < 200; i++) {
    			int group = i;
    			outter.execute(() -> {
    				System.out.println("开始第  " +group+"  组消费");
    				CountDownLatch countDownLatch = new CountDownLatch(3);
    				for (int j = 0; j < 3; j++) {
    					int task = j;
    					inner.execute(() -> {
    						System.out.println(group + "--消费数据:" + task);
    						countDownLatch.countDown();
    					});
    				}
    				try {
    					countDownLatch.await();
    					System.out.println(group + "--消费完成");
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			});
    		}
    	}
    
    第 1 条附言  ·  2020-11-06 23:25:47 +08:00

    晚上从头到尾读了一下ThreadPoolExecutor源码,从注释开始读起,对线程池有了一个更加明确的认识,同时也发现昨天的理解任然有些不对。

    昨天提到的 execute 中的 这段:

            if (workerCountOf(c) < corePoolSize) {             @1    
                if (addWorker(command, true))                    @2
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {   @3
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))   @4
                    reject(command);
                else if (workerCountOf(recheck) == 0)               @5
                    addWorker(null, false);
            }
    
    • @1 这里workerCountOf 就是拿当前的工作线程数(不是active 线程数,见原话:The workerCount is the number of workers that have been permitted to start and not permitted to stop),当小于核心线程数,走@2
    • @2 直接增加worker,worker是需要分配线程的,这里第二个参数给的true表示分配的线程作为核心线程。
    • @3 这里的isRunning不是判断线程是否在执行,而是判断线程池状态,Running状态可以添加worker和queue(这里由于已经不满足@1,所以是入队列)
    • @4 double check ,加完worker发现线程池状态变了,吐出来。。
    • @5 double check 发现线程池状态是running ,且 没有工作线程(corePoolSize可为0),直接加worder,第二个参数为false,表示额度上限为maximumPoolSize。

    还有很多细节,并为理解的很透彻,这里结论也写的比较简洁和枯燥,如有错误的还请大佬不吝指正。

    13 条回复    2020-11-06 00:11:04 +08:00
    micean
        1
    micean  
       2020-11-05 02:21:44 +08:00
    个人猜测
    countDownLatch.countDown() 之后,outter 完成了这次任务并开始下一个了,但是 inner 还没有完成,队列塞不下
    把 inner 的队列从 1 调高一些就行了
    season8
        2
    season8  
    OP
       2020-11-05 09:23:24 +08:00
    @micean
    countDownLatch 不是最后一个 inner 线程执行完成后唤醒 outter 线程吗?那 outter 线程结束应该就意味着有三个 inner 结束。

    而且,尝试过,countDownLatch.await();之后 sleep,也是存在这个问题
    Vedar
        3
    Vedar  
       2020-11-05 09:27:07 +08:00
    你这个 outer 不停的在刷,第五组消费的时候就已经超过 inner 的容纳能力了 肯定会 reject 掉,主要原因还是你 outer 没有阻塞
    Vedar
        4
    Vedar  
       2020-11-05 09:29:19 +08:00
    @season8 你是在 outer 线程池里面开一个线程去 await 的 这根本没阻塞 outer
    micean
        5
    micean  
       2020-11-05 10:31:38 +08:00
    @season8
    outter 线程结束并不意味着有三个 inner 结束,countDownLatch 释放之后,outter 和 3 个 inner 可没有先后执行顺序
    lancelee01
        6
    lancelee01  
       2020-11-05 10:33:24 +08:00
    3L 正解,countDownLatch 没什么用,感觉你的场景需要的是限流器,全局限流即可。同时 LinkedBlockingQueue 这个队列的可能和你想的不太一样,网上的八股文不对,你试试-_-!
    wysnylc
        7
    wysnylc  
       2020-11-05 10:42:27 +08:00
    别用 countDownLatch,换成 Completablefuture
    1194129822
        8
    1194129822  
       2020-11-05 12:00:48 +08:00
    建议创建线程池时传 ThreadFactory 参数,打印不要用 System.out.println,请换成 log 可以查看是具体那条线程。inner 线程池最大任务数 = 12 + 1 (建议不要设置非核心线程)。出现这个问题并没有什么高深的原理,仅仅是线程运行的不确定性,第一轮 outter 给 inner 提交了 9 个 task,此时 inner 正常,outter 三个线程被正确的阻塞。当 inner 运行所有 countDown 后,第一轮 inner 执行还没完全结束,outter 三个线程被唤醒,**注意**,此时线程执行没有了先后顺序和逻辑关系,完全靠 os 调度器调度,如果第二轮 outter 线程三个线程先提交任务,此时 inner 线程池最多可以接受 4 个任务,就是说这一轮已经可能出现错误了。而且一旦触发 RejectedExecutionException,try-catch 没有捕获这个异常,则直接杀死 outter 的核心线程,造成 outter 线程池,execute->Rejected->kill thread->create thread->execute 的恶性循环。代码根本没走到 await,所以一旦 Rejected 就不再阻塞了。
    micean
        9
    micean  
       2020-11-05 12:51:15 +08:00
    源码里是这样的:
    final void runWorker(Worker w) {
    Runnable task = w.firstTask
    ...
    try {
    while (task != null || (task = getTask()) != null) { //queueSize-1 (返回 null 时 workerSize-1 )
    ...
    task.run(); // 任务跑完了
    ...
    }
    } finally {
    processWorkerExit(w, completedAbruptly); // workerSize-1
    }
    }

    而 reject 的条件是 [queue 满] 或者 [worker 满] ,你觉得 countDown 结束了,其实只是跑完了 run()而已
    zoharSoul
        10
    zoharSoul  
       2020-11-05 13:37:58 +08:00
    这直接用 rxjava 多方便啊...
    yexiangyang
        11
    yexiangyang  
       2020-11-05 14:16:03 +08:00
    @micean 这个源码分析很有道理啊!
    season8
        12
    season8  
    OP
       2020-11-06 00:09:19 +08:00
    @Vedar @1194129822 @lancelee01 @micean 感谢各位的热情解答,我很受启发。再结合朋友给的例子,我仔细读了下源码,已经大致能复盘这个错误了。

    **inner 线程池 reject 的原因:**

    1. 主要原因:队列太小,这里给的是 1,实际每个 outer 线程要产生 3 个任务
    2. 次要原因:outter 线程里面使用 countdownlatch 确实不能起到很好的限流作用,

    **次要原因分析:**
    如 runWorker()源码所示,run 执行完毕并不能代表线程任务执行完毕。这意味着 outter 线程与 inner 线程的空闲线程数可能不是 1:3 的关系。但这里可以通过让 outter 线程 sleep 等待 inner 先执行完成,规避这个因素的影响。规避后,问题还是会存在,说明不是主要原因。

    **主要原因分析:**
    先来看个案例
    ```
    static class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
    public MyLinkedBlockingQueue(int capacity) {
    super(capacity);
    }

    @Override
    public boolean offer(E o) {
    System.out.println("任务加入,当前队列数:" + this.size());
    return super.offer(o);
    }
    }

    public static void main(String[] args) throws InterruptedException {
    BlockingQueue queue = new MyLinkedBlockingQueue<>(1);

    // 3 个线程的线程池
    ThreadPoolExecutor taskPoolExecutor = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, queue);

    // 先将线程池拉满
    for (int i = 0; i < 3; i++) {
    final int finalI = i;
    taskPoolExecutor.execute(() -> {
    logger.info("{}", finalI);
    });
    }

    // 等待全部任务执行完
    Thread.sleep(1000);

    // 再次执行任务,发现每一个任务都触发加入队列操作。
    for (int i = 10; i < 12; i++) {
    final int finalI = i;
    // 多线程更容易触发 reject
    // new Thread(()-> taskPoolExecutor.execute(() -> logger.info("{}", finalI))).start();
    taskPoolExecutor.execute(() -> logger.info("{}", finalI));
    }
    }
    ```

    执行结果:

    > 23:12:39.988 [pool-1-thread-3] INFO c.r.s.Demo8.lambda$main$0:34 - 2
    23:12:39.988 [pool-1-thread-2] INFO c.r.s.Demo8.lambda$main$0:34 - 1
    23:12:39.988 [pool-1-thread-1] INFO c.r.s.Demo8.lambda$main$0:34 - 0
    任务加入,当前队列数:0
    23:12:40.997 [pool-1-thread-3] INFO c.r.s.Demo8.lambda$null$1:46 - 10
    任务加入,当前队列数:0
    23:12:41.000 [pool-1-thread-2] INFO c.r.s.Demo8.lambda$null$1:46 - 11

    跑完这个案例我感觉我根本不懂线程池,我翻了下源码:
    ```
    public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }

    // 线程池满了后,直接不创建核心线程了
    // 这里 isRunning 看的我懵逼,明明任务都执行完了,为啥还是 isRunning,先接受,后面再研究 [1]
    // 然后就触发入队列
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    else if (!addWorker(command, false))
    reject(command);
    ```

    我以为的线程池是:只要有空闲线程,任务是直接丢给线程去执行的。
    **实际情况是:当核心线程数满,不管已有线程是否空闲,任务是先丢到队列,然后空闲线程从队列里面自取。**

    案例中,我给的队列大小是 1,当队列满的时候,会扩容线程池到最大线程池大小到 12,此时如果队列是满的(不管线程是否空闲),继续添加就会 reject 。案例中每组有三个任务,只要线程从队列 take 任务不及时,队列很容易满,从而触发 reject 。

    **验证:**
    1. countDownLatch.await(); 后面加上 sleep,让 outter 线程等 inner 线程结束,排除最开始说的第二个因素的影响。
    2. 将队列改成 3,适当调整线程执行时间(也可以不调),reject 很少触发或不触发。
    3. 将队列改成 9,没有触发 reject

    **总结:**
    1. 这个任务表面是多线程嵌套调用,内外线程调度不确定性导致的线程池问题,其实本质是对线程池理解不对导致线程池滥用的问题。
    2. 任务是添加到队列,空闲线程调用 take()获取,而不是有空闲线程就直接丢到空闲线程(实际任务也难以主动去找空闲线程,还容易造成等待,让线程自取则是生产消费的模式。)
    3. isRunning(c) 这个方法以及相关机制,还要再研究一下。


    再次感谢各位,如有不对的地方,还请指出。。
    season8
        13
    season8  
    OP
       2020-11-06 00:11:04 +08:00
    啊。。评论不支持 md,排版好丑,又有点长,各位见谅。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   5607 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 30ms · UTC 06:32 · PVG 14:32 · LAX 23:32 · JFK 02:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.