V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐关注
Meteor
JSLint - a JavaScript code quality tool
jsFiddle
D3.js
WebStorm
推荐书目
JavaScript 权威指南第 5 版
Closure: The Definitive Guide
b1anker
V2EX  ›  JavaScript

rxjs 如何优雅的处理轮询任务?

  •  1
     
  •   b1anker · 2019-08-23 16:43:25 +08:00 · 5597 次点击
    这是一个创建于 1904 天前的主题,其中的信息可能已经有所发展或是发生改变。

    需求:

    • 假设异步请求返回有一个状态,值为 pending 或者 success
    • 异步请求如果返回 pending,则等待一秒后重新发送这个异步请求,直到返回 success
    • 最终使用 subscribe 只返回最终结果,中间过程的输出不走 subscribe

    自己的实现

    自己尝试实现了,但是结果不是自己想要的,而且如果多个异步轮询( A 轮询 -> B 轮询 -> C 轮询),可能就会使代码变得嵌套层级多,以下是实现的具体代码,有大佬指教指教么?

    轮询代码

    import { from, Observable, Subscriber } from 'rxjs';
    import { delay, mapTo, repeatWhen } from 'rxjs/operators';
    
    interface RetryOptions<T = any, P = any> {
      try: (tryRequest: P) => Promise<T>;
      tryRequest: P;
      retryUntil: (response: T) => boolean;
      maxTimes?: number;
      tick?: number;
    }
    
    export const polling = <T = any, P = any>(options: RetryOptions<T, P>) => {
      options = Object.assign(
        {
          maxTimes: 20,
          tick: 1000
        },
        options
      );
      let result = null;
    
      const notifier = () => {
        // 计数最大尝试次数
        let count = 0;
        const loop = (producer: Subscriber<any>) => {
          // 超过最大次数强制退出轮询
          if (count >= options.maxTimes) {
            producer.complete();
          } else {
            options
              .try(options.tryRequest)
              .then(res => {
                producer.next(count++);
                // 满足条件则退出轮询
                if (options.retryUntil(res)) {
                  producer.complete();
                } else {
                // 不满足条件则继续轮询
                  loop(producer);
                }
                // 保存请求结果
                result = res;
              })
              .catch(err => {
                producer.error(err);
              });
          }
        };
        return new Observable(producer => {
          loop(producer);
        });
      };
    
      return from([0]).pipe(
        delay(options.tick),
        // 当满足条件是,进行一下轮轮询
        repeatWhen(notifier),
        // 转换结果
        mapTo(() => result)
      );
    };
    

    测试用例

    import { polling } from './polling';
    
    let count = 0;
    
    const mockRequest = (): Promise<string> => {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          if (count < 6) {
            resolve('pending');
          } else {
            resolve('finish');
          }
          count++;
        }, 1000);
      });
    };
    
    polling<string, number>({
      try: mockRequest,
      tryRequest: count,
      retryUntil: res => {
        return res === 'finish';
      }
    }).subscribe((response) => {
      const result = response();
      console.log(result);
      if (result === 'finish') {
        console.log('轮询结束');
      }
      // 这个轮询结束后应该怎么继续轮询比较好?
      // 继续在这里 polling 下一个轮询吗?容易回调地狱啊
    });
    
    

    结果

    null
    pending
    pending
    pending
    pending
    pending
    pending
    finish // 上面的都不输出,只输出最后一个结果,因为上面的我并不关注
    轮询结束
    
    17 条回复    2019-09-30 17:43:32 +08:00
    leemove
        1
    leemove  
       2019-08-23 16:50:31 +08:00
    是不是有点把问题复杂化了.
    b1anker
        2
    b1anker  
    OP
       2019-08-23 16:54:05 +08:00
    @leemove 那应该怎么处理呢?
    momocraft
        3
    momocraft  
       2019-08-23 16:54:29 +08:00
    concatMap ?
    b1anker
        4
    b1anker  
    OP
       2019-08-23 16:57:50 +08:00
    发现有个 last 操作符,可以解决最后只输出最终结果
    leemove
        5
    leemove  
       2019-08-23 17:48:30 +08:00
    楼主可以看看我写的这个简单 demo 其实 rxjs 的操作符还是很强大的,而且 rxjs 的重试操作符是很强的.请求被我简化了,最大重试次数的逻辑没加.
    地址: https://stackblitz.com/edit/rxjs-playground-test
    leemove
        6
    leemove  
       2019-08-23 17:50:14 +08:00
    @leemove 不好意思上一条地址发错了 正确地址: https://stackblitz.com/edit/rxjs-playground-test-t8quzt
    wawaforya
        7
    wawaforya  
       2019-08-23 18:34:49 +08:00
    献丑了,有什么错误请轻拍😂

    ``` typescript
    import { Observable, of, race, timer } from 'rxjs';
    import { concatMapTo, skipWhile, take, tap } from 'rxjs/operators';

    type Result = 'pending' | 'success';
    const getResult: () => Observable<Result> = () => of<Result>('success').pipe(tap(() => console.log('Requested.'))); // 向接口请求数据的函数

    const limit = 20;
    const schedule = timer(0, 1000);
    const requestSource = schedule.pipe(concatMapTo(getResult()), skipWhile(result => result === 'pending'));
    const upperBound = schedule.pipe(skipWhile(value => value < limit));
    race([requestSource, upperBound]).pipe(
    take(1)
    ).subscribe(
    result => console.log(result), // 这里需要判断类型,如果是数字,说明 20 次了都还没有 success
    error => console.error(error),
    () => console.log('Completed')
    );
    ```
    wawaforya
        8
    wawaforya  
       2019-08-23 18:44:41 +08:00
    #7 好像不会生成一个新的请求,要把 `concatMapTo(getResult())` 改成 `concatMap(() => getResult())` 哈哈
    b1anker
        9
    b1anker  
    OP
       2019-08-23 19:00:52 +08:00
    @leemove 你这个一直报错呀
    b1anker
        10
    b1anker  
    OP
       2019-08-23 19:03:41 +08:00
    @wawaforya 你这个 timer,其实跟 interval 差不多,其实我一开始也是这么搞得,但是得考虑一种情况,有可能请求完成超过 1s,这样子就不好控制了
    ibufu
        11
    ibufu  
       2019-08-23 19:46:30 +08:00 via Android
    所以关键点是解决竞态。你 google 一下 rxjs 竞态,应该能搜到很多。
    b1anker
        12
    b1anker  
    OP
       2019-08-24 00:07:13 +08:00
    wawaforya
        13
    wawaforya  
       2019-08-24 11:53:00 +08:00 via Android
    @b1anker 哇,有道理,还要考虑请求时间的问题,学到了👍
    leemove
        14
    leemove  
       2019-08-27 14:05:12 +08:00
    @b1anker 没看到 expand 啊..这版和我那版好像是一样的啊...就是你用 promise 请求,我用的是 of 直接返回 Ob
    leemove
        15
    leemove  
       2019-08-27 14:07:04 +08:00
    @ibufu 轮询不涉及竞态的,就是一个简单的同步循环请求而已.
    b1anker
        16
    b1anker  
    OP
       2019-08-27 19:33:08 +08:00
    @leemove 你可能看错文件了,你看看 polling.ts
    malcolmyu
        17
    malcolmyu  
       2019-09-30 17:43:32 +08:00
    @b1anker 是否可以把 pending 作为一种异常推出来,然后用 retryWhen 来解决
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2813 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 02:02 · PVG 10:02 · LAX 18:02 · JFK 21:02
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.