自己尝试实现了,但是结果不是自己想要的,而且如果多个异步轮询( A 轮询 -> B 轮询 -> C 轮询),可能就会使代码变得嵌套层级多,以下是实现的具体代码,有大佬指教指教么?
import { from, Observable, Subscriber } from 'rxjs';
import { delay, mapTo, repeatWhen } from 'rxjs/operators';
interface RetryOptions<T = any, P = any> {
try: (tryRequest: P) => Promise<T>;
tryRequest: P;
retryUntil: (response: T) => boolean;
maxTimes?: number;
tick?: number;
}
export const polling = <T = any, P = any>(options: RetryOptions<T, P>) => {
options = Object.assign(
{
maxTimes: 20,
tick: 1000
},
options
);
let result = null;
const notifier = () => {
// 计数最大尝试次数
let count = 0;
const loop = (producer: Subscriber<any>) => {
// 超过最大次数强制退出轮询
if (count >= options.maxTimes) {
producer.complete();
} else {
options
.try(options.tryRequest)
.then(res => {
producer.next(count++);
// 满足条件则退出轮询
if (options.retryUntil(res)) {
producer.complete();
} else {
// 不满足条件则继续轮询
loop(producer);
}
// 保存请求结果
result = res;
})
.catch(err => {
producer.error(err);
});
}
};
return new Observable(producer => {
loop(producer);
});
};
return from([0]).pipe(
delay(options.tick),
// 当满足条件是,进行一下轮轮询
repeatWhen(notifier),
// 转换结果
mapTo(() => result)
);
};
import { polling } from './polling';
let count = 0;
const mockRequest = (): Promise<string> => {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (count < 6) {
resolve('pending');
} else {
resolve('finish');
}
count++;
}, 1000);
});
};
polling<string, number>({
try: mockRequest,
tryRequest: count,
retryUntil: res => {
return res === 'finish';
}
}).subscribe((response) => {
const result = response();
console.log(result);
if (result === 'finish') {
console.log('轮询结束');
}
// 这个轮询结束后应该怎么继续轮询比较好?
// 继续在这里 polling 下一个轮询吗?容易回调地狱啊
});
null
pending
pending
pending
pending
pending
pending
finish // 上面的都不输出,只输出最后一个结果,因为上面的我并不关注
轮询结束
1
leemove 2019-08-23 16:50:31 +08:00
是不是有点把问题复杂化了.
|
3
momocraft 2019-08-23 16:54:29 +08:00
concatMap ?
|
4
b1anker OP 发现有个 last 操作符,可以解决最后只输出最终结果
|
5
leemove 2019-08-23 17:48:30 +08:00
楼主可以看看我写的这个简单 demo 其实 rxjs 的操作符还是很强大的,而且 rxjs 的重试操作符是很强的.请求被我简化了,最大重试次数的逻辑没加.
地址: https://stackblitz.com/edit/rxjs-playground-test |
6
leemove 2019-08-23 17:50:14 +08:00
@leemove 不好意思上一条地址发错了 正确地址: https://stackblitz.com/edit/rxjs-playground-test-t8quzt
|
7
wawaforya 2019-08-23 18:34:49 +08:00
献丑了,有什么错误请轻拍😂
``` typescript import { Observable, of, race, timer } from 'rxjs'; import { concatMapTo, skipWhile, take, tap } from 'rxjs/operators'; type Result = 'pending' | 'success'; const getResult: () => Observable<Result> = () => of<Result>('success').pipe(tap(() => console.log('Requested.'))); // 向接口请求数据的函数 const limit = 20; const schedule = timer(0, 1000); const requestSource = schedule.pipe(concatMapTo(getResult()), skipWhile(result => result === 'pending')); const upperBound = schedule.pipe(skipWhile(value => value < limit)); race([requestSource, upperBound]).pipe( take(1) ).subscribe( result => console.log(result), // 这里需要判断类型,如果是数字,说明 20 次了都还没有 success error => console.error(error), () => console.log('Completed') ); ``` |
8
wawaforya 2019-08-23 18:44:41 +08:00
#7 好像不会生成一个新的请求,要把 `concatMapTo(getResult())` 改成 `concatMap(() => getResult())` 哈哈
|
10
b1anker OP @wawaforya 你这个 timer,其实跟 interval 差不多,其实我一开始也是这么搞得,但是得考虑一种情况,有可能请求完成超过 1s,这样子就不好控制了
|
11
ibufu 2019-08-23 19:46:30 +08:00 via Android
所以关键点是解决竞态。你 google 一下 rxjs 竞态,应该能搜到很多。
|
12
b1anker OP |