RxJS 使用指南

使用 RxJS 中的问题总结

switchMap

switchMap 和其他打平操作符的主要区别是它具有取消效果。在每次发出时,会取消前一个内部 observable (你所提供函数的结果) 的订阅,然后订阅一个新的 observable。你可以通过短语切换成一个新的 observable来记忆它。 它能在像 typeaheads 这样的场景下完美使用,当有新的输入时便不再关心之前请求的响应结果。在内部 observable 长期存活可能会导致内存泄露的情况下,这也是一种安全的选择,例如,如果你使用 mergeMap 和 interval,并忘记正确处理内部订阅。记住,switchMap 同一时间只维护一个内部订阅。

不过要小心,在每个请求都需要完成的情况下,考虑写数据库,你可能要避免使用 switchMap。如果源 observable 发出速度足够快的话,switchMap 可以取消请求。在这些场景中,mergeMap 是正确的选择。

一个 v2ex 上的帖子

js
1id$ = new Subject();
2
3price$ = id$.pipe(
4    switchMap(id => getPriceById(id))
5)
6
7// merge + map 很干净,省一个 subject
8isLoading$ = merge(
9    id$.pipe(mapTo(true)),
10    price$.pipe(mapTo(false))
11);
12
13getPriceById 需要返回一个 observable, unsubscribe 时 abort 请求即可。
14切商品直接 id$.next(newId), price$ 和 isLoading$ 会自动更新。
15上一次没完成的请求 switchMap 会自动 unsubscribe,简直毫无负担。

mergeMap

  • flatMap 是 mergeMap 的别名!
  • 如果同一时间应该只有一个内部 subscription 是有效的,请尝试 switchMap
  • 如果内部 observables 发送和订阅的顺序很重要,请尝试 concatMap

exhaustMap

返回的 Observable 基于应用一个函数来发送项,该函数提供给源 Observable 发出的每个项,并返回一个 (所谓的“内部”) Observable。当它将源值投射成 Observable 时,输出 Observable 开始发出由投射的 Observable 发出的项。然而,如果前一个投射的 Observable 还未完成的话,那么 exhaustMap 会忽略每个新投射的 Observable。一旦完成,它将接受并打平下一个 内部 Observable,然后重复此过程。

js
1只要没有当前活动的计时器,那么每次点击就会运行一个有限的计时器。
2var clicks = Rx.Observable.fromEvent(document, 'click');
3var result = clicks.exhaustMap((ev) => Rx.Observable.interval(1000).take(5));
4result.subscribe(x => console.log(x));

总结一下

rxjs 把 Promise 转成 Observable

在 RxJS 中,你可以使用from操作符将 Promise 转换为 Observable。这样,你就可以在 RxJS 的管道中使用 Observable 的各种操作符和方法。

以下是一个示例代码,展示了如何将 Promise 转换为 Observable:

javascript
1import { from } from "rxjs";
2
3const promise = new Promise((resolve, reject) => {
4  setTimeout(() => {
5    resolve("Promise resolved");
6  }, 2000);
7});
8
9const observable = from(promise);
10
11observable.subscribe({
12  next: (value) => console.log(value),
13  complete: () => console.log("Observable completed"),
14});

在上面的示例中,我们创建了一个 Promise,在 2 秒后解析 (resolve)。然后,使用from操作符将这个 Promise 转换为 Observable。最后,我们订阅这个 Observable,并在next回调中打印解析的值。

当 Promise 解析后,Observable 会发出相应的值,然后调用complete回调来表示 Observable 完成。

这样,你就可以使用 RxJS 丰富的操作符和方法来处理异步操作了。

endWith 参数可以是 Observable ?

在 RxJS 中,endWith操作符用于在 Observable 的末尾添加一个值或多个值。它并不接受 Observable 作为参数,而是接受一到多个参数值。

如果你想要在 Observable 的末尾添加一个 Observable,你可以使用concat操作符来实现。concat操作符将两个或多个 Observables 连接起来,确保前一个 Observable 完成后才会订阅并发出下一个 Observable 的值。

以下是一个示例代码,演示了如何使用concat操作符将 Observable 添加到另一个 Observable 的末尾:

javascript
1import { of, concat } from "rxjs";
2
3const source1$ = of("Value 1", "Value 2");
4const source2$ = of("Value 3", "Value 4");
5
6const result$ = concat(source1$, source2$);
7
8result$.subscribe((value) => console.log(value));

在上面的示例中,我们创建了两个 Observable:source1$source2$。然后,使用concat操作符将它们连接起来,创建一个新的 Observable result$

最后,我们订阅result$并打印所有的值。运行上面的代码将会输出以下内容:

mathematica
1Value 1
2Value 2
3Value 3
4Value 4

这样,你可以将一个 Observable 添加到另一个 Observable 的末尾,并确保它们的顺序被保持。

关于 contact 操作符的一个场景

ts
1const download = spawn(downloadCommand, {
2  shell: true,
3});
4const data$ = fromEvent(download.stdout, "data", (event: Buffer) => {
5  return {
6    data: event.toString("utf8"),
7  };
8});
9const error$ = fromEvent(download.stderr, "data", (event: Buffer) => {
10  return {
11    data: event.toString("utf8"),
12  };
13});
14const close$ = fromEvent(download, "close").pipe(
15  tap(() => {
16    this.logger.debug("download close fromEvent");
17  }),
18);
19
20const getFilename = spawn(getFilenameCommand, {
21  shell: true,
22});
23const _data$ = fromEvent(getFilename.stdout, "data", (event: Buffer) => {
24  this.logger.debug("getFilename stdout", event.toString("utf8"));
25  const filename = basename(event.toString("utf8"));
26  const downloadUrl = `https://api.powerfulyang.com/yt-dlp/${filename}`;
27  return {
28    type: "done",
29    data: downloadUrl,
30  };
31});
32const _error$ = fromEvent(getFilename.stderr, "data", (event: Buffer) => {
33  this.logger.debug("getFilename stderr", event.toString("utf8"));
34  return {
35    type: "error",
36    data: event.toString("utf8"),
37  };
38});
39const _close$ = fromEvent(getFilename, "close").pipe(
40  tap(() => {
41    this.logger.debug("getFilename close fromEvent");
42  }),
43);
44
45concat(
46  merge(data$, error$).pipe(takeUntil(close$)),
47  merge(_data$, _error$).pipe(takeUntil(_close$))
48).subscribe((event) => {
49  this.logger.debug("event", event.data);
50});

关于 concat 的几个关键点:

  1. 顺序执行concat 会等待当前的 Observable 完成后,才会开始订阅下一个 Observable。
  2. 完成是关键:如果前面的 Observable 没有完成(比如,它永远不发出 complete 通知),concat 将不会继续到下一个 Observable。
  3. 错误处理:如果任何一个 Observable 发生错误且未被适当处理(即没有 catchError),concat 会立即停止,并将错误传递给最终的观察者。

由于这个特点,当 downloadCommand 执行完成之后才会订阅 getFilenameCommand 但是存在 getFilenameCommand 先执行完的情况,这时再去订阅 getFilenameCommand fromEvent 不能监听到任何消息,但是使用 getFilenameCommand.on('<event>') 可以监听到消息,因为 fromEvent 需要订阅之后才会收到消息,但是该情况下执行完了之后才订阅所以没有消息。