RxJS๋ JavaScript์ ReactiveX ๊ตฌํ์ ๋๋ค. ReactiveX๋ ๊ด์ฐฐ ๊ฐ๋ฅํ ์คํธ๋ฆผ์ ์ฌ์ฉํ ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ์ ์ํ API์ ๋๋ค. Java์ฉ RxJava, Java์ฉ Rx.NET ๋ฑ ๋ค๋ฅธ ์ธ์ด๋ก ReactiveX๋ฅผ ๊ตฌํํ๋ ๋ฐฉ๋ฒ์ด ๋ ๋ง์ด ์์ต๋๋ค. C#, Swift์ฉ RxSwift ๋ฑ.
๋ถ์ธ ์ฑ๋ช ํ๊ตญ์ด ์ค๋ ฅ์ด ๋ถ์ ํ์ฌ ์ด ๊ธ์ด ๊ตฌ๊ธ ๋ฒ์ญ๊ธฐ๋ฅผ ์ฃผ๋ก ํ์ฉํ๊ธฐ ๋๋ฌธ์ ๋ถ์ ํํ ๋ฌธ๋ฒ๊ณผ ์ดํ๊ฐ ์์์ ์์ต๋๋ค. ์ด ์ ์ํด ๋ถํ๋๋ฆฌ๋ฉฐ, ์ถํ์ ๋ค์ ๊ฒํ ํ์ฌ ์์ ํ๋๋ก ํ๊ฒ ์ต๋๋ค.
๋ฐ์ดํฐ ์คํธ๋ฆผ์๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ด๋ฒคํธ, DOM ์ด๋ฒคํธ ๋ฐ ํ์ผ ์ ๋ก๋๊ฐ ํฌํจ๋ฉ๋๋ค.
์ค์น
RxJS๋ ์์ค์ ๋์ ์๋ ๋ ธ๋ ํจํค์ง ๊ด๋ฆฌ์๋ฅผ ์ฌ์ฉํ์ฌ ์ค์นํ ์ ์์ผ๋ฉฐ ์ฌ๊ธฐ์๋ Yarn์ ์ฌ์ฉํ์ฌ ๋ฐ๋ชจ๋ฅผ ์งํํฉ๋๋ค. ๋ํ ์๊ฒฉํ ์ ํ ์ง์ ์ ํตํด ์ฝ๋๋ฅผ ๋์ฑ ๊ฐ๋ ฅํ๊ณ ์ฝ๊ธฐ ์ฝ๊ฒ ๋ง๋ค ์ ์์ผ๋ฏ๋ก TypeScript๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
yarn add rxjs
yarn add -D typescript ts-loaderWebpack ๋๋ ๊ธฐํ JavaScript ๋ฒ๋ค๋ฌ๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ ๋ฒ๋ค๋ฌ๊ฐ ์ ์ ํ๊ฒ ์คํ๋๋๋ก ๊ตฌ์ฑํ๊ณ package.json ํ์ผ์ ์์ ์คํฌ๋ฆฝํธ๊ฐ ์๋์ง ํ์ธํ์ธ์.
{
"scripts": {
"start": "webpack-dev-server --mode development"
}
}Angular
Angular๋ Google์์ ๊ฐ๋ฐํ JavaScript ํ๋ ์์ํฌ์ ๋๋ค. RxJS๋ ๊ธฐ๋ณธ์ ์ผ๋ก Angular์ ๊ตฌ์์ ธ ์์ผ๋ฏ๋ก ์ฒ์๋ถํฐ ์ฌ์ฉํ ์ ์์ผ๋ฏ๋ก ๋ณ๋๋ก ์ค์นํ ํ์๊ฐ ์์ต๋๋ค. ์์ ํ ์ ํ๋ก์ ํธ๋ฅผ ๋ง๋๋ ๋ฐ ํ์ํ ๊ฒ์ Angular CLI๋ฟ์ ๋๋ค.
ng new <your-project-name>์ข ์์ฑ์ด ์ค์น๋ ํ ํ๋ก์ ํธ๋ฅผ ์์ํ์ญ์์ค.
์ต์ ๋ฒ๋ธ
๊ตฌ๋ ํ ์ ์๋ ๋ฐ์ดํฐ ์กฐ๊ฐ์ ๋๋ฌ์ผ ๋ํผ์ ๋๋ค. ๊ทธ๋ฌ๋ฉด ๋ฐ์ดํฐ ์์ฒด์ ๋ณ๊ฒฝ ์ฌํญ์ด ์์ ๋ ํด๋น ๋ฐ์ดํฐ์ ๊ตฌ๋ ์์๊ฒ ์๋ฆผ์ด ์ ์ก๋ฉ๋๋ค.
Observable์ ๋ง ๊ทธ๋๋ก "๊ด์ฐฐํ ์ ์๋ ๊ฒ"์ ์๋ฏธํฉ๋๋ค. ๋ฐ์ดํฐ ํ์ดํ๋ผ๊ณ ๋ ์๊ฐํ ์ ์์ต๋๋ค.
Observable์ ์์ฑํ๋ ๋ค์ ์ฝ๋๋ ๋ฐ๋ชจ ๋ชฉ์ ์ผ๋ก๋ง ์ฌ์ฉ๋ฉ๋๋ค. Observable์ RxJS ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์์ฒด์์ ์ ๊ณตํ๋ ์ผ๋ถ ์ฐ์ฐ์๋ฅผ ํตํด์๋ง ์ ์ฉํ ๋ฐฉ์์ผ๋ก ์์ฑ๋ ์ ์์ต๋๋ค.
์ด ์ฝ๋๋ ๊ตฌ๋
์ hello ํ
์คํธ๋ฅผ ๋ณด๋ด๋ Observable์ ์์ฑํฉ๋๋ค.
import { Observable } from 'rxjs'
var observable = Observable.create((observer) => {
observer.next('hello')
observer.next('hello')
})๊ด์ฐฐ์๋ฅผ ๊ตฌ๋
ํ๋ ค๋ฉด subscribe ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๊ณ ํ๋์ ํ์ ์ฝ๋ฐฑ๊ณผ ๋ ๊ฐ์ ์ ํ์ ์ฝ๋ฐฑ์ ์ธ์๋ก ๋ฐ์ต๋๋ค.
var observer = observable.subscribe(
(x) => console.log('onSuccess: ', x),
(err) => console.error('onError', err),
() => console.log('onComplete')
)๊ตฌ๋
์ Observable์ ํ์ฑํํ๊ณ onSuccess: hello ๋ ์ค์ด ๋ธ๋ผ์ฐ์ ๊ฐ๋ฐ ๋๊ตฌ์ ๋ํ๋์ผ ํฉ๋๋ค.
๊ด์ฐฐ์๊ฐ ์๋ฃ๋ก ํ์๋๋ฉด ๋นํ์ฑํ๋๊ณ ๋ ์ด์ ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ผ ์ ์์ต๋๋ค.
var observable = Observable.create((observer) => {
observer.next('hey')
observer.next('hey')
observer.complete()
observer.next('hey') // ์ ์ก๋์ง ์์
})์ต์ ๋ฒ๋ธ ๋ง๋ค๊ธฐ
์์์ ์ธ๊ธํ๋ฏ์ด Observable์ RxJS๊ฐ ๊ณต์์ ์ผ๋ก ์น์ธํ ๋ฐฉ์์ผ๋ก ์์ฑ๋์ด์ผ ํฉ๋๋ค. Observable์ ์์ฑํ๋ ๋ช ๊ฐ์ง ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
import { Observable, of, from, interval, fromEvent } from 'rxjs'Observable ๋ด๋ถ์ ์์ ๊ฐ์ ๋ํํ๋ ค๋ฉด of๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์ด๋ ํ ๋ฒ ๋ํ๋ ๊ฐ๋ง ๋ด๋ณด๋ด๋ฏ๋ก ์ํํธ์จ์ด ํ
์คํธ์ ์ ์ฉํฉ๋๋ค. ๊ทธ๋ฌ๋ of๊ฐ ํ๋ก๋์
์ฝ๋์์๋ ์ ์ฉํ ์ ์๋ ๊ฒฝ์ฐ๊ฐ ์์ต๋๋ค.
const hello$ = of('hello')
hello$.subscribe((x) => console.log(x)) // hello๋ค์์ผ๋ก from ์ฐ์ฐ์๋ ๋ฐ๋ณต ๊ฐ๋ฅํ ํญ๋ชฉ์ ๊ฐ์ ธ์ ํ๋์ฉ ๋ด๋ณด๋
๋๋ค.
const hello$ = from('hello')
hello$.subscribe((x) => console.log(x)) // h, e, l, l, o๋ค์์ผ๋ก fromEvent ์ฐ์ฐ์๋ DOM์ ์ด๋ฒคํธ๋ฅผ ๊ด์ฐฐ ๊ฐ๋ฅ ํญ๋ชฉ์ผ๋ก ๊ตฌ์ฑํ๋ ๋ฐ ์ ์ฉํฉ๋๋ค. 'fromEvent'๋ DOM ์์๋ฅผ ์ฒซ ๋ฒ์งธ ๋งค๊ฐ๋ณ์๋ก, ์์ ํ ์ด๋ฒคํธ๋ฅผ ๋ ๋ฒ์งธ ๋งค๊ฐ๋ณ์๋ก ์ฌ์ฉํฉ๋๋ค.
const event$ = fromEvent(document, 'click')
event$.subscribe((x) => console.log(x))๋ ๋ค๋ฅธ ๊ด์ฐฐ์ ์์ฑ ๋ฐฉ๋ฒ์ 'interval'๋ก, ๋ฐ๋ฆฌ์ด ๋จ์์ ์๊ฐ ๊ฐ๊ฒฉ์ ์ทจํ๊ณ 0๋ถํฐ ์์ํ์ฌ 1์ฉ ์ ์๋ฅผ ์ง์์ ์ผ๋ก ์ฆ๊ฐ์ํต๋๋ค.
const periodic$ = interval(1000)
// 5์ด๊ฐ ์ง๋ฌ๋ค
periodic$.subscribe((x) => console.log(x)) // 0, 1, 2, 3, 4๋๊ธฐ์ ๋ฐ ๋น๋๊ธฐ์
RxJS๋ ๋๊ธฐ์๊ณผ ๋น๋๊ธฐ์์ด ๋ชจ๋ ๊ฐ๋ฅํฉ๋๋ค.
const hello$ = of('hello')
hello$.subscribe((x) => console.log(x))
console.log('world')์์ ์ฝ๋๋ ๋ฉ์ธ ์ค๋ ๋ ๋ด์์ ์ฝ๋๊ฐ ์์์ ์๋๋ก ๋ชจ๋ ๋๊ธฐ์ ์ผ๋ก ์คํ๋๊ธฐ ๋๋ฌธ์ hello๋ฅผ ๋จผ์ ์์ฑํ ๋ค์ world์ ๊ฒฐ๊ณผ๋ฅผ ์์ฑํฉ๋๋ค.
๋น๋๊ธฐ์์ผ๋ก ๋ง๋ค๋ ค๋ฉด asyncScheduler๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค.
import { asyncScheduler } from 'rxjs'
const hello$ = of('hello', asyncScheduler)
hello$.subscribe((x) => console.log(x))
console.log('world')๊ตฌ๋
์ ๋น๋๊ธฐ ์ด๋ฒคํธ ๋ฃจํ์ ๋ ๋ฒ์งธ ๋ฐ๋ณต์์๋ง ๋ฐ์ํ๋ ๋ฐ๋ฉด world๋ฅผ ์ธ์ํ๋ ์ค์ ์ฒซ ๋ฒ์งธ ์ด๋ฒคํธ ๋ฃจํ์์ ์ด๋ฏธ ์๋ฃ๋์๊ธฐ ๋๋ฌธ์ ์ถ๋ ฅ์ world ๋ค์ hello์
๋๋ค.
๋จ๊ฒ๊ณ ์ฐจ๊ฐ์ด ์ต์ ๋ฒ๋ธ
๋ฐ์ดํฐ๊ฐ Observable ์์ฒด์ ์ํด ์์ฑ๋๋ฉด ์ด๋ฅผ ์ฝ๋ Observable์ด๋ผ๊ณ ๋ถ๋ฆ ๋๋ค. ๋ฐ์ดํฐ๊ฐ Observable ์ธ๋ถ์์ ์์ฑ๋๋ฉด ์ด๋ฅผ ํซ Observable์ด๋ผ๊ณ ๋ถ๋ฆ ๋๋ค. Hot Observable์ ์ฌ๋ฌ ๊ตฌ๋ ์ ๊ฐ์ง ์ ์๋ ๋ฐ๋ฉด Cold Observable์ ๊ตฌ๋ ์ ํ๋๋ง ๊ฐ์ง ์ ์์ต๋๋ค. ์ฝ๋ Observable์ ๋ํ ๊ตฌ๋ ์ด ๋ ๊ฐ ์ด์์ธ ๊ฒฝ์ฐ ์ป์ ๋ฐ์ดํฐ๊ฐ ๋ค๋ฅผ ์ ์์ต๋๋ค.
Cold Observable์ ๊ฒ์ผ๋ฅด๋ค. ๊ตฌ๋ ํ๊ธฐ ์ ๊น์ง๋ ๊ฐ์ ์์ฑํ์ง ์์ต๋๋ค. ๋ค์์ Cold Observable์ ์์ ๋๋ค.
const cold$ = Observable.create((observer) => observer.next(Math.random()))
cold$.subscribe(console.log) // 0.5
cold$.subscribe(console.log) // 0.89๊ทธ๋ฌ๋ ์ด๋ ์ค์ ์๋๋ฆฌ์ค์์๋ ์ ์ฉํ์ง ์์ ์ ์์ผ๋ฉฐ ๋ฐ์ดํฐ๊ฐ ์ผ๊ด๋๊ธฐ๋ฅผ ์ํฉ๋๋ค. ์ด๋ฅผ ๋ฌ์ฑํ๋ ค๋ฉด ์ฐจ๊ฐ์ด Observable์ Hot Observable๋ก ๋ณํํด์ผ ํฉ๋๋ค.
์ฒซ ๋ฒ์งธ ๋ฐฉ๋ฒ์ ๋ฐ์ดํฐ ์์ฑ์ ๊ด์ฐฐ ๊ฐ๋ฅ ์ธ๋ถ๋ก ์ด๋ํ๋ ๊ฒ์ ๋๋ค.
const random = Math.random()
const hot$ = Observable.create((observer) => observer.next(random))
hot$.subscribe(console.log) // 0.5
hot$.subscribe(console.log) // ๊ฐ์ด ์์๋ ๋ฒ์งธ ๊ตฌ๋ ์๋ ์ฒซ ๋ฒ์งธ ๊ด์ฐฐ์๊ฐ ๊ตฌ๋ ํ ๋ ๋ฐ์ดํฐ๊ฐ ์ด๋ฏธ ๋ฐฉ์ถ๋์๊ธฐ ๋๋ฌธ์ ์๋ฌด ๊ฐ๋ ๋ฐ์ง ๋ชปํฉ๋๋ค.
Cold Observable์ Hot Observable๋ก ๋ณํํ๋ ๋ค๋ฅธ ๋ฐฉ๋ฒ์ share ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์
๋๋ค.
const cold$ = Observable.create((observer) => observer.next(Math.random()))
const hot$ = cold$.pipe(share())
hot$.subscribe(console.log) // 0.5
hot$.subscribe(console.log) // ๊ฐ์ด ์์๋ ๋ฒ์งธ ๊ตฌ๋
์๊ฐ ๋ง์ง๋ง์ผ๋ก ๋ด๋ณด๋ธ ๊ฐ์ ์์ ํ๋๋ก ํ๋ ค๋ฉด share ์ฐ์ฐ์๋ฅผ ๋์ฒดํ์ฌ shareReplay๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค.
const cold$ = Observable.create((observer) => observer.next(Math.random()))
const hot$ = cold$.pipe(shareReplay())
hot$.subscribe(console.log) // 0.5
hot$.subscribe(console.log) // 0.5Subject
Subject๋ ์์ฑ ํ ํ๋ก๊ทธ๋๋ฐ ๋ฐฉ์์ผ๋ก ๊ฐ์ ํธ์ํ ์ ์๋ ๋ค๋ฅธ ์ ํ์ ๊ด์ฐฐ ๊ฐ๋ฅ ํญ๋ชฉ์ ๋๋ค.
import { Subject } from 'rxjs'
var subject = new Subject()
subject.subscribe(console.log)
subject.next('The first thing has been sent')
var observer = subject.subscribe(console.log)
subject.next('The second thing has been sent')
observer.unsubscribe()
subject.next('The third thing has been sent')Behaviour Subject
Behaviour subject๋ ์ ๊ท ๊ตฌ๋ ์ ๋ง์ง๋ง์ผ๋ก ์บ์๋ ๊ฐ์ ๋ด๋ณด๋ ๋๋ค.
var subject = new BehaviorSubject('First')
subject.subscribe((data) => addItem('observer 1 ', data))Replay Subject
Behaviour Subject๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฆ๊ฒ ์จ ์ฌ๋์ ๋ง์ง๋ง์ผ๋ก ๋ฐฉ์ถ๋ ์์ดํ
๋ง ๋ฐ์ ์ ์์ต๋๋ค. ํ์ง๋ง Replay Subject๋ฅผ ์ด์ฉํ๋ฉด ํ๋ฐ์ฃผ์๋ ๊ตฌ๋
์
var subject = new ReplaySubject(3)
subject.next(1)
subject.next(2)
subject.subscribe(console.log) // 1, 2
subject.next(3) // 3
subject.next(4) // 4
subject.subscribe(console.log) // 2, 3, 4Async Subject
๊ฐ์ฅ ๊ฐ๋จํ ์ฃผ์ ์ ๋๋ค. ์๋ฃ ์ ๋ง์ง๋ง ๊ฐ๋ง ๋ด๋ณด๋ ๋๋ค.
var subject = new AsyncSubject()
subject.next(1)
subject.subscribe(console.log)
subject.complete() // 1์ฐ์ฐ์
- ์ ์ ์ฐ์ฐ์: ์ด ์ฐ์ฐ์๋ ์ผ๋ฐ์ ์ผ๋ก ๊ด์ฐฐ ๊ฐ๋ฅ ํญ๋ชฉ์ ๋ง๋๋ ๋ฐ ์ฌ์ฉ๋ฉ๋๋ค.
- ์ธ์คํด์ค ์ฐ์ฐ์: ๊ด์ฐฐ ๊ฐ๋ฅํ ์ธ์คํด์ค์ ๋ํ ์ด๋ฌํ ๋ฉ์๋(๋๋ถ๋ถ์ RxJS)
์์ ์ ์ฐ์ฐ์
์ด๋ฌํ ์ฐ์ฐ์๋ ๊ธฐ์กด ๊ฐ์ ๋ณํํ๊ณ ๋ฐ์ดํฐ ํ๋ฆ์ ์์ ํฉ๋๋ค.
import { map, filter, take, scan } from 'rxjs/operators'
const source$ = from([1, 2, 3, 4, 5])
const modified$ = source$.pipe(
map((x) => x + 1), // 2, 3, 4, 5, 6
scan((acc, val) => acc + val), // 2, 5, 9, 14, 20
filter((x) => x > 10), // 14, 20
take(1) // 14
)Pluck
๊ฐ์ฒด ๋ฐฐ์ด์์ ํน์ ํค๋ง ์ ํํ๊ธฐ ์ํ map์ฉ ํฉ์ฑ ์คํ์
๋๋ค.
const list$ = of([
{
name: 'Shino',
age: 20,
address: 'Tokyo',
},
{
name: 'Anthony',
age: 21,
address: 'Berkeley',
},
])
const names$ = list$.pipe(pluck('name'))
names$.subscribe(console.log) // 'Shino', 'Anthony'Tap
์ด ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๋ฉด ํ์ดํ ๋ด์์ ๋ถ์์ฉ์ด ํธ๋ฆฌ๊ฑฐ๋ ์ ์์ต๋๋ค.
source$.pipe(
tap(console.log),
map((x) => x.toUpperCase()),
tap(async (x) => {
await Promise.resolve()
alert(x)
})
)๋ฐฐ์ ์ฒ๋ฆฌ
๋ฐฐ์์ ์ค์ ๋ก ํ์ํ ๊ฒ๋ณด๋ค ์๋์ ์ผ๋ก ๋ง์ ์์ ๊ฐ์ ๋ฐฉ์ถํ๋ ๊ด์ฐฐ ๊ฐ๋ฅ ํญ๋ชฉ์ ๋๋ค. ๋ง์ฐ์ค ์์ง์์ ์ํด ํธ๋ฆฌ๊ฑฐ๋๋ DOM ์ด๋ฒคํธ์ ์ ์ ์ด ๊ทธ ์ ํ์ด ๋ ๊ฒ์ ๋๋ค.
์ด๋ฅผ ์ฒ๋ฆฌํ๋ ์ฒซ ๋ฒ์งธ ์ ๋ต์ ์ด๋ฒคํธ๋ฅผ ๋๋ฐ์ด์ฑํ๋ ๊ฒ์ ๋๋ค. Debounce๋ ์ผ์ ๊ธฐ๊ฐ ๋์ ์์ ์ด ์ค์ง๋ ๋๊น์ง ์ด๋ฒคํธ๋ฅผ ์์ฑํ์ง ์์ผ๋ฉฐ ์ด๋ ์ฌ์ฉ์๊ฐ ์ ๋ ฅ ํ๋๋ฅผ ์ฑ์ธ ๋ ์๋ ์์ฑ์ ์ ์ฉํ ์ ์์ผ๋ฉฐ ์ ํจ์ฑ ๊ฒ์ฌ๋ ์ ๋ ฅ์ ์๋ฃํ ํ์๋ง ํธ๋ฆฌ๊ฑฐ๋ฉ๋๋ค.
import { fromEvent } from 'rxjs'
import { debounceTime } from 'rxjs/operators'
const event$ = fromEvent(document, 'mousemove')
const debounced$ = event$.pipe(debounceTime(1000))
debounced$.subscribe(console.log)์ง์ ๋ ์๊ฐ ๊ฐ๊ฒฉ์ ๋ฐ๋ผ ์ด๋ฒคํธ ์๊ฐ ํฌ๊ฒ ์ค์ด๋ค๊ธฐ ๋๋ฌธ์ ์ด๋ฒคํธ๋ฅผ ์กฐ์ ํ๋ ๊ฒ๋ ์ ์ฉํ ์ ์์ต๋๋ค. ์กฐ์ ์ ์๋ ์ ํ์ผ๋ก ์๊ฐํ ์ ์์ต๋๋ค.
import { throttleTime } from 'rxjs/operators'
const event$ = fromEvent(document, 'mousemove')
const throttled$ = event$.pipe(throttleTime(1000))
throttled$.subscribe(console.log)๋ฐ๋ฉด์ ๋ฒํผ ์๋ ๋ชจ๋ ์ด๋ฒคํธ๋ฅผ ๋ฐฐ์ด๋ก ์ ์งํ๊ณ ๋ฒํผ ์ฉ๋์ ๋๋ฌํ๋ฉด ๋ชจ๋ ์ด๋ฒคํธ๋ฅผ ํ ๋ฒ์ ๋ด๋ณด๋ ๋๋ค.
import { bufferCount } from 'rxjs/operators'
const event$ = fromEvent(document, 'mousemove')
const buffered$ = event$.pipe(bufferCount(10))
buffered$.subscribe(console.log)Switch Map
Switch Map์ ์ฌ์ฉํ๋ฉด ๋ ๊ฐ์ ๊ด๊ณํ ๊ด์ฐฐ ๊ฐ๋ฅ ํญ๋ชฉ์ด ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ๋ฅผ ์ํด ์ํธ ์ด์ฉ๋ ์ ์์ต๋๋ค.
const user$ = of({ uid: Math.random() })
const fetchOrders$ = (userId: number) => of(`${userId}'s order data'`)๋จผ์ ์ฃผ๋ฌธ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๊ธฐ ์ ์ ์ฌ์ฉ์ ID๊ฐ ํ์ํฉ๋๋ค. ์ด๋ฅผ ์ํํ๋ ์ง๊ด์ ์ธ ๋ฐฉ๋ฒ์ ๊ตฌ๋ ์ ์ค์ฒฉํ๋ ๊ฒ์ ๋๋ค.
user$.subscribe({ uid } => {
fetchOrders$(uid).subscribe(console.log)
})๊ด๊ณํ ํธ์ถ์ ์ํํ๋ ๋ ์ข์ ๋ฐฉ๋ฒ์ ์ค์์น ๋งต์ ์ฌ์ฉํ๋ ๊ฒ์ ๋๋ค.
const orders$ = user$.pipe(switchMap((user) => fetchOrders$(user.uid)))
orders$.subscribe(console.log)์กฐํฉ ์ฐ์ฐ์
Observable์ ๊ฒฐํฉํ๋ ๋ฐฉ๋ฒ์๋ ์ฌ๋ฌ ๊ฐ์ง๊ฐ ์์ต๋๋ค. ์ต์ ๋ฒ์ ๊ฒฐํฉ์ Observable ๋ฐฐ์ด์ ๊ฐ์ ธ์ค๊ณ ๊ฐ ๋ ๋ฆฝ Observable์ ๋ชจ๋ ๊ฐ์ด ํด๋น ๊ฐ์ ํ์ธํ๊ธฐ๋ฅผ ๊ธฐ๋ค๋ฆฌ๊ณ ๋ชจ๋ ๊ฐ๋ง ๋ฐฐ์ด๋ก ํจ๊ป ๋ด๋ณด๋ ๋๋ค.
import { combineLatest } from 'rxjs'
import { delay } from 'rxjs/operators'
const randSync$ = Observable.create((o) => o.next(Math.random()))
const randAsync$ = randSync$.pipe(delay(1000))
const combined$ = combineLatest([randSync$, randAsync$])
combined$.subscribe(console.log) // [0.5, 0.8]๋ณํฉ ๋ฐ๋ฉด์ ๋ ๊ฐ์ Observable์ ํ๋๋ก ์ตํฉํ์ฌ ์ผ๋ฐ์ ์ธ Observable์ ์์ฑํฉ๋๋ค.
import { merge } from 'rxjs'
import { delay } from 'rxjs/operators'
const randSync$ = Observable.create((o) => o.next(Math.random()))
const randAsync$ = randSync$.pipe(delay(1000))
const merged$ = merge([randSync$, randAsync$])
merged$.subscribe(console.log) // 0.5, 0.8Skip Until์ ์ฌ์ฉํ๋ฉด ๋ ๋ฒ์งธ Observable์ด ๊ฐ์ ๋ฐฉ์ถํ ๋๊น์ง ์์ค Observable์ ๋ฌด์ํ ์ ์์ต๋๋ค.
var skipped$ = observable1$.skipUntil(observable2$)์ค๋ฅ ์ฒ๋ฆฌ
ํ์ดํ ๋ด๋ถ์์ ๊ด์ฐฐ ๊ฐ๋ฅํ ํญ๋ชฉ์ ๋ํด ์ค๋ฅ ์ฒ๋ฆฌ๋ฅผ ์ํํ ์ ์์ต๋๋ค. ์ฌ์๋ ๋ฉ์ปค๋์ฆ์ retry ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ์ฌ ๊ตฌํํ ์๋ ์์ต๋๋ค.
import { catchError, retry } from 'rxjs/operators'
someObservable$.pipe(
catchError((err) => of('default value')),
retry(2)
)๋ฉ๋ชจ๋ฆฌ ๋์
์ฅ๊ธฐ ์คํ Observable์ ๊ตฌ๋ ์ทจ์ํ๋ ๊ฒ์ ์์ง ๋ง์ธ์.
const source$ = interval(100)
const subscription = source.subscribe((x) => {
console.log(x)
if (x > 10) {
subscription.unsubscribe()
}
})์ด๋ฅผ ์ฒ๋ฆฌํ๋ ๋ ์ข์ ๋ฐฉ๋ฒ์ ์กฐ๊ฑด์ด ๋ ์ด์ ์ถฉ์กฑ๋์ง ์์ ๋ ๊ฐ ๋ฐฉ์ถ์ ์ค์งํ๋ takeWhile์ ์ฌ์ฉํ๋ ๊ฒ์
๋๋ค.
source$.pipe(takeWhile((x) => x <= 10))๋์ ๋ค๋ฅธ Observable์ ์์กดํ์ฌ ๊ฐ ๋ฐฉ์ถ์ ์ค์งํ๋ ค๋ฉด ๋ค๋ฅธ Observable์ด ๊ฐ์ ๋ฐฉ์ถํ๋ฉด ํ์ฌ Observable์ ๋ํ ๊ตฌ๋
์ด ์๋์ผ๋ก ์ทจ์๋๋ฏ๋ก takeUntil์ ์ฌ์ฉํ ์ ์์ต๋๋ค.
source$.pipe(takeUntil(of('something')))