主题
Rxjs Marble Diagrams
我們用 - 來表達一小段時間,這些 - 串起就代表一個 observable。
----------------
X (大寫 X)則代表有錯誤發生
---------------X
| 則代表 observable 結束
----------------|
1.of
observable 是同步送值的時候, 小括號代表著同步發生。
javascript
var source = Rx.Observable.of(1, 2, 3, 4);
(1234)|
2. interval
javascript
var source = Rx.Observable.interval(1000);
-----0-----1-----2-----3--...
3. map
javascript
var source = Rx.Observable.interval(1000);
var newest = source.map((x) => x + 2);
newest.subscribe(console.log);
// 2
// 3
// 4
// 5..
source: -----0-----1-----2-----3--...
map(x => x + 1)
newest: -----1-----2-----3-----4--...
4. mapTo
javascript
var source = Rx.Observable.interval(1000);
var newest = source.mapTo(2);
newest.subscribe(console.log);
// 2
// 2
// 2
// 2..
source: -----0-----1-----2-----3--...
mapTo(2)
newest: -----2-----2-----2-----2--...
5. filter
javascript
var source = Rx.Observable.interval(1000);
var newest = source.filter((x) => x % 2 === 0);
newest.subscribe(console.log);
// 0
// 2
// 4
// 6..
source: -----0-----1-----2-----3-----4-...
filter(x => x % 2 === 0)
newest: -----0-----------2-----------4-...
6. take
javascript
var source = Rx.Observable.interval(1000);
var example = source.take(3);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0
// 1
// 2
// complete
source : -----0-----1-----2-----3--..
take(3)
example: -----0-----1-----2|
7. first
和 take(1)一致
javascript
var source = Rx.Observable.interval(1000);
var example = source.first();
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0
// complete
source : -----0-----1-----2-----3--..
first()
example: -----0|
8. takeLast
takeLast 必須等到整個 observable 完成(complete),才能知道最後的元素有哪些,並且同步送出
javascript
var source = Rx.Observable.interval(1000).take(6);
var example = source.takeLast(2);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 4
// 5
// complete
source : ----0----1----2----3----4----5|
takeLast(2)
example: ------------------------------(45)|
9. last
相当于 takeLast(1),用來取得最後一個元素。
javascript
var source = Rx.Observable.interval(1000).take(6);
var example = source.last();
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 5
// complete
source : ----0----1----2----3----4----5|
last()
example: ------------------------------(5)|
10. takeUntil
當 takeUntil 傳入的 observable 發送值時,原本的 observable 就會直接進入完成(complete)的狀態,並且發送完成訊息。
javascript
var source = Rx.Observable.interval(1000);
var click = Rx.Observable.fromEvent(document.body, "click");
var example = source.takeUntil(click);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0
// 1
// 2
// 3
// complete (點擊body了)
source : -----0-----1-----2------3--
click : ----------------------c----
takeUntil(click)
example: -----0-----1-----2----|
11. concat
concat 可以把多個 observable 實例合併成一個
javascript
var source = Rx.Observable.interval(1000).take(3);
var source2 = Rx.Observable.of(3);
var source3 = Rx.Observable.of(4, 5, 6);
var example = source.concat(source2, source3);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// complete
source : ----0----1----2|
source2: (3)|
source3: (456)|
concat()
example: ----0----1----2(3456)|
另外 concat 還可以當作靜態方法使用
javascript
var source = Rx.Observable.interval(1000).take(3);
var source2 = Rx.Observable.of(3);
var source3 = Rx.Observable.of(4, 5, 6);
var example = Rx.Observable.concat(source, source2, source3);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
12. concatAll
Observable 送出的元素又是一個 observable,就像是二維陣列,陣列裡面的元素是陣列,可以用 concatAll 把它攤平成一維陣列. 把二维阵列变成维.
javascript
var click = Rx.Observable.fromEvent(document.body, "click");
var source = click.map((e) => Rx.Observable.of(1, 2, 3));
var example = source.concatAll();
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
click : ------c------------c--------
map(e => Rx.Observable.of(1,2,3))
source : ------o------------o--------
\ \
(123)| (123)|
concatAll()
example: ------(123)--------(123)------------
這裡需要注意的是 concatAll 會處理 source 先發出來的 observable,必須等到這個 observable 結束,才會再處理下一個 source 發出來的 observable。
javascript
var obs1 = Rx.Observable.interval(1000).take(5);
var obs2 = Rx.Observable.interval(500).take(2);
var obs3 = Rx.Observable.interval(2000).take(1);
var source = Rx.Observable.of(obs1, obs2, obs3);
var example = source.concatAll();
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0
// 1
// 2
// 3
// 4
// 0
// 1
// 0
// complete
source : (o1 o2 o3)|
\ \ \
--0--1--2--3--4| -0-1| ----0|
concatAll()
example: --0--1--2--3--4-0-1----0|
13. skip
原本從 0 開始的就會變成從 3 開始,但是記得原本元素的等待時間仍然存在,也就是說此範例第一個取得的元素需要等 4 秒,用 Marble Diagram 表示如下。
javascript
var source = Rx.Observable.interval(1000);
var example = source.skip(3);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 3
// 4
// 5...
source : ----0----1----2----3----4----5--....
skip(3)
example: -------------------3----4----5--...
14. startWith
startWith 的值是一開始就同步發出的,這個 operator 很常被用來保存程式的起始狀態! 下面的例子,不会过 1 秒才出现 0,而是马上出现 0
javascript
var source = Rx.Observable.interval(1000);
var example = source.startWith(0);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0 马上发出
// 0 过1秒
// 1
// 2
// 3...
source : ----0----1----2----3--...
startWith(0)
example: (0)----0----1----2----3--...
15. merge
merge 把多個 observable 同時處理,這跟 concat 一次處理一個 observable 是完全不一樣的.
javascript
var source = Rx.Observable.interval(500).take(3);
var source2 = Rx.Observable.interval(300).take(6);
var example = source.merge(source2);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0
// 0
// 1
// 2
// 1
// 3
// 2
// 4
// 5
// complete
source : ----0----1----2|
source2: --0--1--2--3--4--5|
merge()
example: --0-01--21-3--(24)--5|
這很常用在一個以上的按鈕具有部分相同的行為。 例如一個影片播放器有兩個按鈕,一個是暫停(II),另一個是結束播放(口)。這兩個按鈕都具有相同的行為就是影片會被停止,只是結束播放會讓影片回到 00 秒,這時我們就可以把這兩個按鈕的事件 merge 起來處理影片暫停這件事。
javascript
var stopVideo = Rx.Observable.merge(stopButton, endButton);
stopVideo.subscribe(() => {
// 暫停播放影片
});
非同步最難的地方在於,當有多個非同步行為同時觸發且相互依賴,這時候我們要處理的邏輯跟狀態就會變得極其複雜. 它們都是在多個元素送進來時,只輸出一個新元素.
16. combineLatest
它會取得各個 observable 最後送出的值,再輸出成一個值
描述:
- newest 和 source 其中一个送出值的时候
- 和另外一个最新送出的值
- 传入 callback
- 当 newest 和 source 都结束了, 才会 complete
例子流程:
- newest 送出了
0
,source 没有送出,不會執行 callback - source 送出了
0
,newest 送出0
,callback 等于0
- newest 送出了
1
,source 送出0
,callback 等于1
........................ - newest 和 source 都結束了,complete
javascript
var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);
var example = source.combineLatest(newest, (x, y) => x + y);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete
source : ----0----1----2|
newest : --0--1--2--3--4--5|
combineLatest(newest, (x, y) => x + y);
example: ----01--23-4--(56)--7|
应用: combineLatest 很常用在運算多個因子的結果; A 和 B 的值求出 C, A 变化, 则 C 变化; B 变化, 则 C 也变化
17. zip
描述:
- newest 和 source 其中一个送出值的时候
- 若当前另 1 个同位置有值
- 传入 callback
- 任意 1 个结束, 就会 complete
例子流程:
- newest 送出了 0,source 没有送出,不會執行 callback
- source 送出了 0,newest 送出 0,callback 等于 0
- ….. 一直不执行 callback
- source 送出了 1,newest 虽然现在最新的是 2,但是之前送出的是 1,callback 等于 2(1+1)
- ….. 一直不执行 callback
- source 送出了 2,newest 之前送出的是 2, callback 等于 4
- source 結束 example 就結束,因為 source 跟 newest 不會再有對應位置的值
javascript
var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);
var example = source.zip(newest, (x, y) => x + y);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0
// 2
// 4
// complete
source : ----0----1----2|
newest : --0--1--2--3--4--5|
zip(newest, (x, y) => x + y)
example: ----0----2----4|
应用: zip 适用原本只能同步送出的資料變成了非同步的, 常拿來做 demo 使用,比如:
javascript
var source = Rx.Observable.from("hello");
var source2 = Rx.Observable.interval(100);
var example = source.zip(source2, (x, y) => x);
source : (hello)|
source2: -0-1-2-3-4-...
zip(source2, (x, y) => x)
example: -h-e-l-l-o|
18. withLatestFrom
withLatestFrom 運作方式跟 combineLatest 有點像,只是他有主從的關係,只有在主要的 observable 送出新的值時,才會執行 callback,附隨的 observable 只是在背景下運作
描述:
- main 送出值的时候
- 和另外一个最新送出的值
- 传入 callback
- main 结束时, 就会 complete
例子流程:
- main 送出了 h,some 上一次送出的值為 0,callback 得到 h。
- main 送出了 e,some 上一次送出的值為 0,callback 得到 e。
- main 送出了 l,some 上一次送出的值為 0,callback 得到 l。
- main 送出了 l,some 上一次送出的值為 1,callback 得到 L。
- main 送出了 o,some 上一次送出的值為 1,callback 得到 O。
javascript
var main = Rx.Observable.from("hello").zip(Rx.Observable.interval(500), (x, y) => x);
var some = Rx.Observable.from([0, 1, 0, 0, 0, 1]).zip(Rx.Observable.interval(300), (x, y) => x);
var example = main.withLatestFrom(some, (x, y) => {
return y === 1 ? x.toUpperCase() : x;
});
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
main : ----h----e----l----l----o|
some : --0--1--0--0--0--1|
withLatestFrom(some, (x, y) => y === 1 ? x.toUpperCase() : x);
example: ----h----e----l----L----O|
应用: withLatestFrom 很常用在一些 checkbox 型的功能,例如說一個編輯器,我們開啟粗體後,打出來的字就都要變粗體,粗體就像是 some observable,而我們打字就是 main observable。
19. scan
javascript
var source = Rx.Observable.from("hello").zip(Rx.Observable.interval(600), (x, y) => x);
var example = source.scan((origin, next) => origin + next, "");
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// h
// he
// hel
// hell
// hello
// complete
source : ----h----e----l----l----o|
scan((origin, next) => origin + next, '')
example: ----h----(he)----(hel)----(hell)----(hello)|
scan 很常用在狀態的計算處理,最簡單的就是對一個數字的加減,我們可以綁定一個 button 的 click 事件,並用 map 把 click event 轉成 1,之後送處 scan 計算值再做顯示。
javascript
const addButton = document.getElementById("addButton");
const minusButton = document.getElementById("minusButton");
const state = document.getElementById("state");
const addClick = Rx.Observable.fromEvent(addButton, "click").mapTo(1);
const minusClick = Rx.Observable.fromEvent(minusButton, "click").mapTo(-1);
const numberState = Rx.Observable.empty()
.startWith(0)
.merge(addClick, minusClick)
.scan((origin, next) => origin + next, 0);
numberState.subscribe({
next: (value) => {
state.innerHTML = value;
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
20. buffer
5 个相关的 operator
- buffer
- bufferCount
- bufferTime
- bufferToggle
- bufferWhen
buffer 要傳入一個 observable(source2),它會把原本的 observable (source)送出的元素緩存在陣列中,等到傳入的 observable(source2) 送出元素時,就會觸發把緩存的元素送出。
javascript
var source = Rx.Observable.interval(300);
/*
相当于
var source2 = source.bufferTime(1000);
*/
var source2 = Rx.Observable.interval(1000);
var example = source.buffer(source2);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...
source : --0--1--2--3--4--5--6--7..
source2: ---------0---------1--------...
buffer(source2)
example: ---------([0,1,2])---------([3,4,5])
除了用時間來作緩存外,我們更常用數量來做緩存,範例如下:
javascript
var source = Rx.Observable.interval(300);
var example = source.bufferCount(3);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...
可以用 buffer 來做某個事件的過濾,例如像是滑鼠連點才能真的執行, 這裡我們只有在 500 毫秒內連點兩下,才能成功印出 ‘success’, 這個功能在某些特殊的需求中非常的好用,也能用在批次處理來降低 request 傳送的次數! 代码如下:
javascript
const button = document.getElementById("demo");
const click = Rx.Observable.fromEvent(button, "click");
const example = click.bufferTime(500).filter((arr) => arr.length >= 2);
example.subscribe({
next: (value) => {
console.log("success");
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
21. delay
javascript
var source = Rx.Observable.interval(300).take(5);
var example = source.delay(500);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0
// 1
// 2
// 3
// 4
source : --0--1--2--3--4|
delay(500)
example: -------0--1--2--3--4|
delay 除了可以傳入毫秒以外,也可以傳入 Date 型別的資料
javascript
var source = Rx.Observable.interval(300).take(5);
var example = source.delay(new Date(new Date().getTime() + 1000));
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
22. delayWhen
delayWhen 可以影響每個元素,而且需要傳一個 callback 並回傳一個 observable
javascript
var source = Rx.Observable.interval(300).take(5);
var example = source.delayWhen((x) => Rx.Observable.empty().delay(100 * x * x));
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
source : --0--1--2--3--4|
.delayWhen(x => Rx.Observable.empty().delay(100 * x * x));
example: --0---1----2-----3-----4|
23. debounce
debounce 跟 debounceTime 一個是傳入 observable 另一個則是傳入毫秒,比較常用到的是 debounceTime 描述:
- 每次收到元素,等待 1000 毫秒
- 若 1000 毫秒内没有元素送出, 则把最新收到的元素送出; 若 1000 毫秒内收到新的元素, 重新等待 1000 毫秒.
- 注意若是最后一个元素, 则不需要等待 1000 毫秒, 直接送出
javascript
var source = Rx.Observable.interval(300).take(5);
var example = source.debounceTime(1000);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 4
// complete
source : --0--1--2--3--4|
debounceTime(1000)
example: --------------4|
具体例子, 自動傳送使用者打的字到後端:
javascript
const searchInput = document.getElementById("searchInput");
const theRequestValue = document.getElementById("theRequestValue");
Rx.Observable.fromEvent(searchInput, "input")
.debounceTime(300)
.map((e) => e.target.value)
.subscribe((value) => {
theRequestValue.textContent = value;
// 在這裡發 request
});
24. throttle
描述:
- 第 1 次收到元素先送出,
- 1000 毫秒以内处于关闭状态, 1000 毫秒后, 若收到元素, 则送出
- 最后 1 个元素不一定会送出
javascript
var source = Rx.Observable.interval(400).take(8);
var example = source.throttleTime(1000);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
// 0
// 3
// 6
// complete
source : ---0---1---2---3---4---5---6---7|
throttleTime(1000)
example: ---0-----------3-----------6----|
下面要说 3 个处理 Higher Order Observable. 所謂的 Higher Order Observable 就是指一個 Observable 送出的元素還是一個 Observable,就像是二維陣列一樣,一個陣列中的每個元素都是陣列。 一共有 3 个方法:
- concatAll
- mergeAll
- switch
25. concatAll
concatAll 會一個一個處理,一定是等前一個 observable 完成(complete)才會處理下一個 observable
javascript
var click = Rx.Observable.fromEvent(document.body, "click");
var source = click.map((e) => Rx.Observable.interval(1000).take(3));
var example = source.concatAll();
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
click : ---------c-c------------------c--..
map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
\ \ \
\ ----0----1----2| ----0----1----2|
----0----1----2|
concatAll()
example: ----------------0----1----2----0----1----2--..
26. switch
当有新的 observable 来的时候, 就会把旧的退訂, 永远只处理新的 observable. 下面的例子, 当第 2 次点击的时候,由于第 2 个 observable 和第 1 个时间小于 1 秒,所以第 1 个就被退订了
javascript
var click = Rx.Observable.fromEvent(document.body, "click");
var source = click.map((e) => Rx.Observable.interval(1000));
var example = source.switch();
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
click : ---------c-c------------------c--..
map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
\ \ \----0----1--...
\ ----0----1----2----3----4--...
----0----1----2----3----4--...
switch()
example: -----------------0----1----2--------0----1--...
27. mergeAll
它會把二維的 observable 轉成一維的,並且能夠同時處理所有的 observable。 若传入的参数是 1, 则和 concatAll 是一摸一样的
javascript
var click = Rx.Observable.fromEvent(document.body, "click");
var source = click.map((e) => Rx.Observable.interval(1000));
var example = source.mergeAll();
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
click : ---------c-c------------------c--..
map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
\ \ \----0----1--...
\ ----0----1----2----3----4--...
----0----1----2----3----4--...
switch()
example: ----------------00---11---22---33---(04)4--...
另外 mergeAll 可以傳入一個數值,這個數值代表他可以同時處理的 observable 數量
javascript
var click = Rx.Observable.fromEvent(document.body, "click");
var source = click.map((e) => Rx.Observable.interval(1000).take(3));
var example = source.mergeAll(2);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
click : ---------c-c----------o----------..
map(e => Rx.Observable.interval(1000))
source : ---------o-o----------c----------..
\ \ \----0----1----2|
\ ----0----1----2|
----0----1----2|
mergeAll(2)
example: ----------------00---11---22---0----1----2--..
28. concatMap
concatMap= map+concatAll
javascript
var source = Rx.Observable.fromEvent(document.body, "click");
var example = source.map((e) => Rx.Observable.interval(1000).take(3)).concatAll();
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
相当于
javascript
var source = Rx.Observable.fromEvent(document.body, "click");
var example = source.concatMap((e) => Rx.Observable.interval(100).take(3));
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
Marble Diagram:
source : -----------c--c------------------...
concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0-1-2-0-1-2---------...
常用于發送 request 如下, 每一個 request 會等前一個 request 完成才做處理。
javascript
function getPostData() {
return fetch("https://jsonplaceholder.typicode.com/posts/1").then((res) => res.json());
}
var source = Rx.Observable.fromEvent(document.body, "click");
var example = source.concatMap((e) => Rx.Observable.from(getPostData()));
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
concatMap 還有第二個參數是一個 selector callback,這個 callback 會傳入四個參數,分別是:
- 外部 observable 送出的元素
- 內部 observable 送出的元素
- 外部 observable 送出元素的 index
- 內部 observable 送出元素的 index
javascript
function getPostData() {
return fetch("https://jsonplaceholder.typicode.com/posts/1").then((res) => res.json());
}
var source = Rx.Observable.fromEvent(document.body, "click");
var example = source.concatMap(
(e) => Rx.Observable.from(getPostData()),
(e, res, eIndex, resIndex) => res.title
);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
29. switchMap
switchMap = map+switch switchMap 跟 concatMap 一樣有第二個參數 selector callback 可用來回傳我們要的值
javascript
var source = Rx.Observable.fromEvent(document.body, "click");
var example = source.map((e) => Rx.Observable.interval(1000).take(3)).switch();
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
相当于:
javascript
var source = Rx.Observable.fromEvent(document.body, "click");
var example = source.switchMap((e) => Rx.Observable.interval(100).take(3));
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
Marble Diagram:
source : -----------c--c-----------------...
concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0--0-1-2-----------...
switchMap 用在 HTTP request, 雖然我們發送了多個 request 但最後真正印出來的 log 只會有一個
javascript
function getPostData() {
return fetch("https://jsonplaceholder.typicode.com/posts/1").then((res) => res.json());
}
var source = Rx.Observable.fromEvent(document.body, "click");
var example = source.switchMap((e) => Rx.Observable.from(getPostData()));
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
30. mergeMap
mergeMap = map + mergeAll
javascript
var source = Rx.Observable.fromEvent(document.body, "click");
var example = source.map((e) => Rx.Observable.interval(1000).take(3)).mergeAll();
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
相当于
javascript
var source = Rx.Observable.fromEvent(document.body, "click");
var example = source.mergeMap((e) => Rx.Observable.interval(100).take(3));
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
Marble Diagram:
source : -----------c-c------------------...
concatMap(c => Rx.Observable.interval(100).take(3))
example: -------------0-(10)-(21)-2----------...
mergeMap 也能傳入第 2 個參數和 concatMap 是一样的,但 mergeMap 传入第 3 个参数限制并行数量 (限制但是不是阻止的意思) 下面的例子,若连续点击 4 下,第 4 个 HTTP request 需要等第 1 个结束以后才可以发送
javascript
function getPostData() {
return fetch("https://jsonplaceholder.typicode.com/posts/1").then((res) => res.json());
}
var source = Rx.Observable.fromEvent(document.body, "click");
var example = source.mergeMap(
(e) => Rx.Observable.from(getPostData()),
(e, res, eIndex, resIndex) => res.title,
3
);
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});
switchMap, mergeMap, concatMap 相同和不同的地方 共同的特性: 可以把第一個參數所回傳的 promise 物件直接轉成 observable
javascript
function getPersonData() {
return fetch("https://jsonplaceholder.typicode.com/posts/1").then((res) => res.json());
}
var source = Rx.Observable.fromEvent(document.body, "click");
// 不需要写成
// var example = source.concatMap(e => Rx.Observable.from(getPersonData()));
var example = source.concatMap((e) => getPersonData());
//直接回傳 promise 物件
example.subscribe({
next: (value) => {
console.log(value);
},
error: (err) => {
console.log("Error: " + err);
},
complete: () => {
console.log("complete");
}
});