V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
Gct012
V2EX  ›  程序员

弱鸡求教一个关于 Java 多层 for 循环效率问题

  •  1
     
  •   Gct012 · 2023-03-10 14:56:34 +08:00 · 3202 次点击
    这是一个创建于 656 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我是一个菜鸡开发,目前遇到一个需求想请教下各位大佬。假设需要改造以下 For 循环

    for(int i = 0; i < list.size() ; i ++){
    
      List resultList = HttpRequest.post(url).body(list.get(i));
      
      for(int j = 0 ; j < resultList.size() ; j++){
        var resultA = functionA(resultList.get(j));
        var resultB = functionB(resultA);
        var resultC = functionC(resultB);
      }
    }
    

    其中 list 数据来源 API 接口,数据量在 100 到 1000 不等。functionA 、B 、C 都有业务逻辑( Http 请求,数据库查询等,都是需要串行执行的)。目前单线程运行比较慢,想问下有什么比较好的办法可以提高处理效率?

    我打算使用多线程并行处理 list 的数据,但是里面那层 for 循环数据量也比较大(多的可能有 1 万条),里面那层不知道有没有办法也可以加快效率的?或者针对这类场景是否有比较通用的解决办法?

    31 条回复    2023-03-10 22:55:25 +08:00
    idealhs
        1
    idealhs  
       2023-03-10 15:03:21 +08:00
    没写过 java ,个人思路应该把 List resultList = HttpRequest.post(url).body(list.get(i)); 提出来,结果使用 yield return 。给下面的业务逻辑调用。然后你的 HttpRequest 应该可以换成 Async 的方法,在业务逻辑里面使用到 resultList[i]的时候, 去 await
    sbex
        2
    sbex  
       2023-03-10 15:04:49 +08:00
    这个主要还是得分析具体性能瓶颈在哪里,单从代码看目前只能想到线程池。
    RyanLeeCUP
        3
    RyanLeeCUP  
       2023-03-10 15:07:59 +08:00
    你可以先合并一下不可并行的操作,比如 List 可以并行处理的,如果量大就拆分成若干个批次去并行
    function A B C 不能并行,所以看成一个行为,对 resultList 也可以并行化
    把这个当成管道 pipe(listAction).pipe(resultAction).execute()
    大概这样
    LeegoYih
        4
    LeegoYih  
       2023-03-10 15:09:46 +08:00
    如果只需要保证 functionABC 的调用顺序可以用 Fork/Join
    resultList.parallelStream()
    .map(o -> functionA(o))
    .map(o -> functionB(o))
    .map(o -> functionC(o))
    .collect(Collectors.toList());
    Ericality
        5
    Ericality  
       2023-03-10 15:11:16 +08:00
    function ABC 是否可以直接用多线程来执行? 如果可以的话就可以节约很多资源
    或者直接用 parallelStream 来代替 for 循环处理 也可以直接并行处理
    但是具体要看对数据的操作方式吧 如果没有时序要求可以考虑
    还有我也是个菜鸡 如果说的不对望后面大佬指出
    awalkingman
        6
    awalkingman  
       2023-03-10 15:11:41 +08:00
    内层也开线程,CountDownLatch 等待归集结果。
    jiajianjava
        7
    jiajianjava  
       2023-03-10 15:14:18 +08:00
    这个应该是生产消费模式, 多线程生产者请求数据,把数据提交个阻塞队列, 多线程消费者从队列获取数据 处理 ABC 任务
    Gct012
        8
    Gct012  
    OP
       2023-03-10 15:21:11 +08:00
    @Ericality 里面的 Function ABC 目前看下来只能串行操作。外层的 For 循环我打算用 parallelStream 来遍历,但是不太确定里面的那层循环是否还能开并行流...
    justNoBody
        9
    justNoBody  
       2023-03-10 15:23:36 +08:00
    把 4 楼和 7 楼的答案结合一下就好了,然后记得如果使用 parallelStream 务必要自定义线程池,使用默认的线程池会导致其他任务阻塞
    Gct012
        10
    Gct012  
    OP
       2023-03-10 15:26:04 +08:00
    @RyanLeeCUP 大佬你说的这个 pipe 是标准还是第三方库的额,我貌似没搜到类似的写法...
    awalkingman
        11
    awalkingman  
       2023-03-10 15:27:00 +08:00   ❤️ 1
    for(int i = 0; i < list.size() ; i ++){
    List resultList = HttpRequest.post(url).body(list.get(i));
    CountDownLatch latch = new CountDownLatch(resultList.size());
    for(int j = 0 ; j < resultList.size() ; j++){
    // 这里继续开线程
    {
    var resultA = functionA(resultList.get(j));
    var resultB = functionB(resultA);
    var resultC = functionC(resultB);
    latch.countDown();
    }
    latch.await();
    }
    }
    Gct012
        12
    Gct012  
    OP
       2023-03-10 15:28:22 +08:00
    @justNoBody 那这样的话是不是外层和里层的循环得拆成两个队列?
    dqzcwxb
        13
    dqzcwxb  
       2023-03-10 15:28:45 +08:00
    不要用 parallelStream 去做 io 操作,parallelStream 只推荐在 cpu 密集型任务时使用
    你这个用 completablefuture 是很合适的
    Gct012
        14
    Gct012  
    OP
       2023-03-10 15:29:32 +08:00
    @newskillsget 感谢大佬!我试试
    DreamStar
        15
    DreamStar  
       2023-03-10 15:31:04 +08:00
    先从业务上调整, 能整合的整合, 能合并的合并.
    其次同步转异步, 事件驱动用消息队列+本地事件表,根据具体的消费能力调整并发即可.
    你这个量用单进程多线程做稳定性太差,吞吐量太低,没啥可观测性.
    Martin9
        16
    Martin9  
       2023-03-10 15:31:24 +08:00
    以下回答来自 chatgpt, 供你参考。

    针对这个场景,使用多线程并行处理 list 数据可以提高处理效率。你可以使用 Java 的线程池来实现多线程处理。Java 提供的线程池可以在多个线程之间共享一组线程,可以重复利用线程,减少线程创建和销毁的开销,从而提高效率。

    在处理大量数据时,可以考虑使用分治思想,将数据分成若干份,分别交给不同的线程去处理,处理完成后再将结果合并。这样可以充分利用多核 CPU 的性能,提高并行处理的效率。

    对于里面那层 for 循环的处理,你可以使用并行流来提高处理效率。Java 8 引入了 Stream API ,可以方便地进行并行处理。你可以使用 stream() 方法将结果列表转换成流,然后使用 parallel() 方法将流转换为并行流,最后使用 forEach() 方法对流进行处理。

    List<List> resultLists = new ArrayList<>();
    IntStream.range(0, list.size())
    .parallel()
    .forEach(i -> {
    List resultList = HttpRequest.post(url).body(list.get(i));
    resultLists.add(resultList);
    });

    List results = resultLists.stream()
    .flatMap(List::stream)
    .parallel()
    .map(result -> {
    var resultA = functionA(result);
    var resultB = functionB(resultA);
    var resultC = functionC(resultB);
    return resultC;
    })
    .collect(Collectors.toList());
    这里使用了 Java 8 的 Stream API ,通过并行处理来提高处理效率。第一个 forEach() 方法将结果列表转换成流,并行地处理列表中的每个元素,将结果添加到结果列表中。第二个流中的 flatMap() 方法将多个结果列表合并成一个流,然后并行地对每个结果进行处理,最后将结果收集到一个列表中。
    Ericality
        17
    Ericality  
       2023-03-10 15:35:46 +08:00
    @Gct012 可以 但是好像你想要 ABC 顺序执行 那为什么不直接在外面开多线程呢
    即 ThreadUtils.excutor.excute(list ->
    functionA(list.get(i));
    functionB(resultA);
    functionC(resultC);
    )
    同时我注意到 resultList 是一个 http 请求 那是不是这个 list 请求一遍就可以了?
    List resultList = HttpRequest.post(url).body(list.get(i));
    //下面是原来的 for 循环的代替
    resultList.stream.paralla.map(currentList ->{
    ThreadUtiles.excutor.excute
    .....
    }

    )
    以上都是伪代码哈 只是提供一个思路
    awalkingman
        18
    awalkingman  
       2023-03-10 15:38:03 +08:00
    @jiajianjava 优雅一点做法确实如此。多线程开发调试比较麻烦的,可以把外层结果都提取到队列里,然后再开多个消费者消费,这样观测性高(看生产者有没有生产,看队列有没有消费和消费后的结果是否符合预期)和调试难度也比较低。
    leonshaw
        19
    leonshaw  
       2023-03-10 15:48:46 +08:00
    内层 10000 个开线程太多了,池满了一样阻塞。要改成异步或者试试虚线程。
    luckyrayyy
        20
    luckyrayyy  
       2023-03-10 15:51:52 +08:00
    list.parallelStrem()
    .flatMap(item -> {return HttpRequest.post(url).body(item).stream();})
    .parallel()
    .forEach(item -> {
    funcC(funcB(funcA(item)));
    });

    这样?
    Aluneth
        21
    Aluneth  
       2023-03-10 16:18:09 +08:00
    能不能 f 将 uncABC 视为一个函数,去掉外部循环,将需要跟中间件交流的地方都改成批量执行。最终还是要看这个循环耗时的点具体在什么地方。
    RyanLeeCUP
        22
    RyanLeeCUP  
       2023-03-10 16:28:59 +08:00
    @Gct012 不是库 类似伪代码,担心并发量就分批,然后把串行操作抽象成一个节点,节点任务并行化,最后会成管道式的流程,最后管道本身也可以被并行化 按这个思路去设计
    ldyisbest
        23
    ldyisbest  
       2023-03-10 16:30:31 +08:00
    List resultList = HttpRequest.post(url).body(list.get(i)); 提到最外层, 用批量了接口,思想是减少 http 请求次数,functionA,B,C 里面也这样做
    levintrueno
        24
    levintrueno  
       2023-03-10 17:51:44 +08:00
    public class Code {

    // 执行外层任务的线程池
    static ExecutorService outerExecutor = Executors.newFixedThreadPool(8);

    // 执行内层任务的线程池
    static ExecutorService innerExecutor = Executors.newFixedThreadPool(16);

    // 任务总数
    static AtomicInteger taskCount = new AtomicInteger();

    static String url = "url";

    static Random random = ThreadLocalRandom.current();

    public static void optimization() {

    StopWatch stopWatch = new StopWatch();
    stopWatch.start();

    // 模拟任务
    final int maxTask = random.nextInt(1000);

    System.out.println("外层总任务数:" + maxTask);

    List<String> list = IntStream.rangeClosed(1, maxTask).mapToObj(String::valueOf).collect(Collectors.toList());

    // 50 个任务一组
    final List<List<String>> partition = Lists.partition(list, 50);

    System.out.println("拆分任务数量:" + partition.size());

    partition.parallelStream()
    .map(task -> CompletableFuture.runAsync(new OuterTask(task), outerExecutor))
    .forEach(CompletableFuture::join);

    System.out.println("taskCount = " + taskCount);
    stopWatch.stop();
    System.out.println("耗时:" + stopWatch.getTotalTimeSeconds());

    innerExecutor.shutdown();
    outerExecutor.shutdown();

    }

    private static class OuterTask implements Runnable {

    private final List<String> tasks;

    public OuterTask(List<String> tasks) {
    this.tasks = tasks;
    }

    @Override
    public void run() {
    tasks.parallelStream()
    .map(task -> CompletableFuture.runAsync(new InnerTask(task), innerExecutor))
    .forEach(CompletableFuture::join);
    }
    }

    private static class InnerTask implements Runnable {

    private final String body;

    public InnerTask(String body) {
    this.body = body;
    }

    @Override
    public void run() {
    final List<String> responseResult = HttpRequest.post(url).body(body);

    for (String aParam : responseResult) {
    final String bParam = functionA(aParam);
    final String cParam = functionB(bParam);
    final String result = functionC(cParam);

    // handle result...

    taskCount.incrementAndGet();
    }
    }
    }
    }

    考虑不周,仅作参考。。。
    Dahunvwu
        25
    Dahunvwu  
       2023-03-10 17:58:03 +08:00
    disruptor 框架了解一下,或者使用 1.8 的 CompletionService
    disruptor.handleEventsWithWorkerPool(poolA)
    .thenHandleEventsWithWorkerPool(poolB)
    .thenHandleEventsWithWorkerPool(poolC)
    .thenHandleEventsWithWorkerPool(poolD)
    ymz
        26
    ymz  
       2023-03-10 18:04:42 +08:00
    线程池,以及 resultList 需要和数据库打交道的一起提出来,切分后一条 sql 处理若干个,CPU 计算很快的,主要还是 IO
    ration
        27
    ration  
       2023-03-10 18:30:02 +08:00 via Android
    看看瓶颈在哪里,http 请求还是数据库,添加日志看看时间多少。接着有些能合并请求的合并,多线程作为最后的手段。曾经试过下载文件的,实际上太多线程没用,网络限制在那里,提高带宽就好了。
    janus77
        28
    janus77  
       2023-03-10 18:37:44 +08:00
    你拿到 A 只是为了传入 B ?拿到 B 只是为了传入 C ?那可以把 funcABC 压缩合并简化一下
    night98
        29
    night98  
       2023-03-10 22:28:57 +08:00
    第二行可以看看接口提供方能否提供批量接口,这样速度会快一点,然后下面的 abc 方法只能通过增加线程数的方式提升效率,或者上异步。
    L0L
        30
    L0L  
       2023-03-10 22:47:02 +08:00 via Android
    @luckyrayyy 20 楼流操作大师,并发,还不会冲突。
    ychost
        31
    ychost  
       2023-03-10 22:55:25 +08:00
    Java19 的 Loom 开一开试一试,不行的话实时 reactor
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5604 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 03:18 · PVG 11:18 · LAX 19:18 · JFK 22:18
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.