RxJS 使用指南
使用 RxJS 中的问题总结
switchMap
switchMap
和其他打平操作符的主要区别是它具有取消效果。在每次发出时,会取消前一个内部 observable (你所提供函数的结果) 的订阅,然后订阅一个新的 observable。你可以通过短语切换成一个新的 observable来记忆它。
它能在像 typeaheads 这样的场景下完美使用,当有新的输入时便不再关心之前请求的响应结果。在内部 observable 长期存活可能会导致内存泄露的情况下,这也是一种安全的选择,例如,如果你使用 mergeMap 和 interval,并忘记正确处理内部订阅。记住,switchMap
同一时间只维护一个内部订阅。
不过要小心,在每个请求都需要完成的情况下,考虑写数据库,你可能要避免使用 switchMap
。如果源 observable 发出速度足够快的话,switchMap
可以取消请求。在这些场景中,mergeMap 是正确的选择。
1id$ = new Subject();
2
3price$ = id$.pipe(
4 switchMap(id => getPriceById(id))
5)
6
7// merge + map 很干净,省一个 subject
8isLoading$ = merge(
9 id$.pipe(mapTo(true)),
10 price$.pipe(mapTo(false))
11);
12
13getPriceById 需要返回一个 observable, unsubscribe 时 abort 请求即可。
14切商品直接 id$.next(newId), price$ 和 isLoading$ 会自动更新。
15上一次没完成的请求 switchMap 会自动 unsubscribe,简直毫无负担。
mergeMap
- flatMap 是 mergeMap 的别名!
- 如果同一时间应该只有一个内部 subscription 是有效的,请尝试
switchMap
。 - 如果内部 observables 发送和订阅的顺序很重要,请尝试
concatMap
。
exhaustMap
返回的 Observable 基于应用一个函数来发送项,该函数提供给源 Observable 发出的每个项,并返回一个 (所谓的“内部”) Observable。当它将源值投射成 Observable 时,输出 Observable 开始发出由投射的 Observable 发出的项。然而,如果前一个投射的 Observable 还未完成的话,那么 exhaustMap 会忽略每个新投射的 Observable。一旦完成,它将接受并打平下一个 内部 Observable,然后重复此过程。
1只要没有当前活动的计时器,那么每次点击就会运行一个有限的计时器。
2var clicks = Rx.Observable.fromEvent(document, 'click');
3var result = clicks.exhaustMap((ev) => Rx.Observable.interval(1000).take(5));
4result.subscribe(x => console.log(x));
总结一下
rxjs 把 Promise 转成 Observable
在 RxJS 中,你可以使用from
操作符将 Promise 转换为 Observable。这样,你就可以在 RxJS 的管道中使用 Observable 的各种操作符和方法。
以下是一个示例代码,展示了如何将 Promise 转换为 Observable:
1import { from } from "rxjs";
2
3const promise = new Promise((resolve, reject) => {
4 setTimeout(() => {
5 resolve("Promise resolved");
6 }, 2000);
7});
8
9const observable = from(promise);
10
11observable.subscribe({
12 next: (value) => console.log(value),
13 complete: () => console.log("Observable completed"),
14});
在上面的示例中,我们创建了一个 Promise,在 2 秒后解析 (resolve)。然后,使用from
操作符将这个 Promise 转换为 Observable。最后,我们订阅这个 Observable,并在next
回调中打印解析的值。
当 Promise 解析后,Observable 会发出相应的值,然后调用complete
回调来表示 Observable 完成。
这样,你就可以使用 RxJS 丰富的操作符和方法来处理异步操作了。
endWith 参数可以是 Observable ?
在 RxJS 中,endWith
操作符用于在 Observable 的末尾添加一个值或多个值。它并不接受 Observable 作为参数,而是接受一到多个参数值。
如果你想要在 Observable 的末尾添加一个 Observable,你可以使用concat
操作符来实现。concat
操作符将两个或多个 Observables 连接起来,确保前一个 Observable 完成后才会订阅并发出下一个 Observable 的值。
以下是一个示例代码,演示了如何使用concat
操作符将 Observable 添加到另一个 Observable 的末尾:
1import { of, concat } from "rxjs";
2
3const source1$ = of("Value 1", "Value 2");
4const source2$ = of("Value 3", "Value 4");
5
6const result$ = concat(source1$, source2$);
7
8result$.subscribe((value) => console.log(value));
在上面的示例中,我们创建了两个 Observable:source1$
和source2$
。然后,使用concat
操作符将它们连接起来,创建一个新的 Observable result$
。
最后,我们订阅result$
并打印所有的值。运行上面的代码将会输出以下内容:
1Value 1
2Value 2
3Value 3
4Value 4
这样,你可以将一个 Observable 添加到另一个 Observable 的末尾,并确保它们的顺序被保持。
关于 contact 操作符的一个场景
1const download = spawn(downloadCommand, {
2 shell: true,
3});
4const data$ = fromEvent(download.stdout, "data", (event: Buffer) => {
5 return {
6 data: event.toString("utf8"),
7 };
8});
9const error$ = fromEvent(download.stderr, "data", (event: Buffer) => {
10 return {
11 data: event.toString("utf8"),
12 };
13});
14const close$ = fromEvent(download, "close").pipe(
15 tap(() => {
16 this.logger.debug("download close fromEvent");
17 }),
18);
19
20const getFilename = spawn(getFilenameCommand, {
21 shell: true,
22});
23const _data$ = fromEvent(getFilename.stdout, "data", (event: Buffer) => {
24 this.logger.debug("getFilename stdout", event.toString("utf8"));
25 const filename = basename(event.toString("utf8"));
26 const downloadUrl = `https://api.powerfulyang.com/yt-dlp/${filename}`;
27 return {
28 type: "done",
29 data: downloadUrl,
30 };
31});
32const _error$ = fromEvent(getFilename.stderr, "data", (event: Buffer) => {
33 this.logger.debug("getFilename stderr", event.toString("utf8"));
34 return {
35 type: "error",
36 data: event.toString("utf8"),
37 };
38});
39const _close$ = fromEvent(getFilename, "close").pipe(
40 tap(() => {
41 this.logger.debug("getFilename close fromEvent");
42 }),
43);
44
45concat(
46 merge(data$, error$).pipe(takeUntil(close$)),
47 merge(_data$, _error$).pipe(takeUntil(_close$))
48).subscribe((event) => {
49 this.logger.debug("event", event.data);
50});
关于 concat
的几个关键点:
- 顺序执行:
concat
会等待当前的 Observable 完成后,才会开始订阅下一个 Observable。 - 完成是关键:如果前面的 Observable 没有完成(比如,它永远不发出
complete
通知),concat
将不会继续到下一个 Observable。 - 错误处理:如果任何一个 Observable 发生错误且未被适当处理(即没有
catchError
),concat
会立即停止,并将错误传递给最终的观察者。
由于这个特点,当 downloadCommand
执行完成之后才会订阅 getFilenameCommand
但是存在 getFilenameCommand
先执行完的情况,这时再去订阅 getFilenameCommand
fromEvent 不能监听到任何消息,但是使用 getFilenameCommand.on('<event>')
可以监听到消息,因为 fromEvent 需要订阅之后才会收到消息,但是该情况下执行完了之后才订阅所以没有消息。