JavaScript

RxJS

์œ ์šฉํ•œ RxJs ์—ฐ์‚ฐ์ž ๋ชฉ๋ก

  • #reactive
  • #rxjs
  • #cheatsheet

RxJS๋Š” JavaScript์˜ ReactiveX ๊ตฌํ˜„์ž…๋‹ˆ๋‹ค. ReactiveX๋Š” ๊ด€์ฐฐ ๊ฐ€๋Šฅํ•œ ์ŠคํŠธ๋ฆผ์„ ์‚ฌ์šฉํ•œ ๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ์œ„ํ•œ API์ž…๋‹ˆ๋‹ค. Java์šฉ RxJava, Java์šฉ Rx.NET ๋“ฑ ๋‹ค๋ฅธ ์–ธ์–ด๋กœ ReactiveX๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ๋ฐฉ๋ฒ•์ด ๋” ๋งŽ์ด ์žˆ์Šต๋‹ˆ๋‹ค. C#, Swift์šฉ RxSwift ๋“ฑ.

๋ถ€์ธ ์„ฑ๋ช…

ํ•œ๊ตญ์–ด ์‹ค๋ ฅ์ด ๋ถ€์ ํ•˜์—ฌ ์ด ๊ธ€์ด ๊ตฌ๊ธ€ ๋ฒˆ์—ญ๊ธฐ๋ฅผ ์ฃผ๋กœ ํ™œ์šฉํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— ๋ถ€์ •ํ™•ํ•œ ๋ฌธ๋ฒ•๊ณผ ์–ดํœ˜๊ฐ€ ์žˆ์„์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ์  ์–‘ํ•ด ๋ถ€ํƒ๋“œ๋ฆฌ๋ฉฐ, ์ถ”ํ›„์— ๋‹ค์‹œ ๊ฒ€ํ† ํ•˜์—ฌ ์ˆ˜์ •ํ•˜๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์—๋Š” ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์ด๋ฒคํŠธ, DOM ์ด๋ฒคํŠธ ๋ฐ ํŒŒ์ผ ์—…๋กœ๋“œ๊ฐ€ ํฌํ•จ๋ฉ๋‹ˆ๋‹ค.

์„ค์น˜

RxJS๋Š” ์‹œ์ค‘์— ๋‚˜์™€ ์žˆ๋Š” ๋…ธ๋“œ ํŒจํ‚ค์ง€ ๊ด€๋ฆฌ์ž๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์„ค์น˜ํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ ์—ฌ๊ธฐ์„œ๋Š” Yarn์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ๋ชจ๋ฅผ ์ง„ํ–‰ํ•ฉ๋‹ˆ๋‹ค. ๋˜ํ•œ ์—„๊ฒฉํ•œ ์œ ํ˜• ์ง€์ •์„ ํ†ตํ•ด ์ฝ”๋“œ๋ฅผ ๋”์šฑ ๊ฐ•๋ ฅํ•˜๊ณ  ์ฝ๊ธฐ ์‰ฝ๊ฒŒ ๋งŒ๋“ค ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ TypeScript๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์ข‹์Šต๋‹ˆ๋‹ค.

yarn add rxjs
yarn add -D typescript ts-loader

Webpack ๋˜๋Š” ๊ธฐํƒ€ JavaScript ๋ฒˆ๋“ค๋Ÿฌ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ ๋ฒˆ๋“ค๋Ÿฌ๊ฐ€ ์ ์ ˆํ•˜๊ฒŒ ์‹คํ–‰๋˜๋„๋ก ๊ตฌ์„ฑํ•˜๊ณ  package.json ํŒŒ์ผ์— ์‹œ์ž‘ ์Šคํฌ๋ฆฝํŠธ๊ฐ€ ์žˆ๋Š”์ง€ ํ™•์ธํ•˜์„ธ์š”.

package.json
{
    "scripts": {
        "start": "webpack-dev-server --mode development"
    }
}

Angular

Angular๋Š” Google์—์„œ ๊ฐœ๋ฐœํ•œ JavaScript ํ”„๋ ˆ์ž„์›Œํฌ์ž…๋‹ˆ๋‹ค. RxJS๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ Angular์— ๊ตฌ์›Œ์ ธ ์žˆ์œผ๋ฏ€๋กœ ์ฒ˜์Œ๋ถ€ํ„ฐ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ ๋ณ„๋„๋กœ ์„ค์น˜ํ•  ํ•„์š”๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค. ์ž‘์—…ํ•  ์ƒˆ ํ”„๋กœ์ ํŠธ๋ฅผ ๋งŒ๋“œ๋Š” ๋ฐ ํ•„์š”ํ•œ ๊ฒƒ์€ Angular CLI๋ฟ์ž…๋‹ˆ๋‹ค.

ng new <your-project-name>

์ข…์†์„ฑ์ด ์„ค์น˜๋œ ํ›„ ํ”„๋กœ์ ํŠธ๋ฅผ ์‹œ์ž‘ํ•˜์‹ญ์‹œ์˜ค.

์˜ต์ €๋ฒ„๋ธ”

๊ตฌ๋…ํ•  ์ˆ˜ ์žˆ๋Š” ๋ฐ์ดํ„ฐ ์กฐ๊ฐ์„ ๋‘˜๋Ÿฌ์‹ผ ๋ž˜ํผ์ž…๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋ฉด ๋ฐ์ดํ„ฐ ์ž์ฒด์— ๋ณ€๊ฒฝ ์‚ฌํ•ญ์ด ์žˆ์„ ๋•Œ ํ•ด๋‹น ๋ฐ์ดํ„ฐ์˜ ๊ตฌ๋…์ž์—๊ฒŒ ์•Œ๋ฆผ์ด ์ „์†ก๋ฉ๋‹ˆ๋‹ค.

Observable์€ ๋ง ๊ทธ๋Œ€๋กœ "๊ด€์ฐฐํ•  ์ˆ˜ ์žˆ๋Š” ๊ฒƒ"์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ๊ณ ๋„ ์ƒ๊ฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Observable์„ ์ƒ์„ฑํ•˜๋Š” ๋‹ค์Œ ์ฝ”๋“œ๋Š” ๋ฐ๋ชจ ๋ชฉ์ ์œผ๋กœ๋งŒ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค. Observable์€ RxJS ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ์ž์ฒด์—์„œ ์ œ๊ณตํ•˜๋Š” ์ผ๋ถ€ ์—ฐ์‚ฐ์ž๋ฅผ ํ†ตํ•ด์„œ๋งŒ ์œ ์šฉํ•œ ๋ฐฉ์‹์œผ๋กœ ์ƒ์„ฑ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ด ์ฝ”๋“œ๋Š” ๊ตฌ๋… ์‹œ hello ํ…์ŠคํŠธ๋ฅผ ๋ณด๋‚ด๋Š” Observable์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

import { Observable } from 'rxjs'

var observable = Observable.create((observer) => {
  observer.next('hello')
  observer.next('hello')
})

๊ด€์ฐฐ์ž๋ฅผ ๊ตฌ๋…ํ•˜๋ ค๋ฉด subscribe ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ํ•˜๋‚˜์˜ ํ•„์ˆ˜ ์ฝœ๋ฐฑ๊ณผ ๋‘ ๊ฐœ์˜ ์„ ํƒ์  ์ฝœ๋ฐฑ์„ ์ธ์ˆ˜๋กœ ๋ฐ›์Šต๋‹ˆ๋‹ค.

var observer = observable.subscribe(
  (x) => console.log('onSuccess: ', x),
  (err) => console.error('onError', err),
  () => console.log('onComplete')
)

๊ตฌ๋…์€ Observable์„ ํ™œ์„ฑํ™”ํ•˜๊ณ  onSuccess: hello ๋‘ ์ค„์ด ๋ธŒ๋ผ์šฐ์ € ๊ฐœ๋ฐœ ๋„๊ตฌ์— ๋‚˜ํƒ€๋‚˜์•ผ ํ•ฉ๋‹ˆ๋‹ค.

๊ด€์ฐฐ์ž๊ฐ€ ์™„๋ฃŒ๋กœ ํ‘œ์‹œ๋˜๋ฉด ๋น„ํ™œ์„ฑํ™”๋˜๊ณ  ๋” ์ด์ƒ ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋‚ผ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.

var observable = Observable.create((observer) => {
  observer.next('hey')
  observer.next('hey')
  observer.complete()
  observer.next('hey') // ์ „์†ก๋˜์ง€ ์•Š์Œ
})

์˜ต์ €๋ฒ„๋ธ” ๋งŒ๋“ค๊ธฐ

์œ„์—์„œ ์–ธ๊ธ‰ํ–ˆ๋“ฏ์ด Observable์€ RxJS๊ฐ€ ๊ณต์‹์ ์œผ๋กœ ์Šน์ธํ•œ ๋ฐฉ์‹์œผ๋กœ ์ƒ์„ฑ๋˜์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. Observable์„ ์ƒ์„ฑํ•˜๋Š” ๋ช‡ ๊ฐ€์ง€ ๋ฐฉ๋ฒ•์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

import { Observable, of, from, interval, fromEvent } from 'rxjs'

Observable ๋‚ด๋ถ€์— ์›์‹œ ๊ฐ’์„ ๋ž˜ํ•‘ํ•˜๋ ค๋ฉด of๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Š” ํ•œ ๋ฒˆ ๋ž˜ํ•‘๋œ ๊ฐ’๋งŒ ๋‚ด๋ณด๋‚ด๋ฏ€๋กœ ์†Œํ”„ํŠธ์›จ์–ด ํ…Œ์ŠคํŠธ์— ์œ ์šฉํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ of๊ฐ€ ํ”„๋กœ๋•์…˜ ์ฝ”๋“œ์—์„œ๋„ ์œ ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ๊ฒฝ์šฐ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

const hello$ = of('hello')

hello$.subscribe((x) => console.log(x)) // hello

๋‹ค์Œ์œผ๋กœ from ์—ฐ์‚ฐ์ž๋Š” ๋ฐ˜๋ณต ๊ฐ€๋Šฅํ•œ ํ•ญ๋ชฉ์„ ๊ฐ€์ ธ์™€ ํ•˜๋‚˜์”ฉ ๋‚ด๋ณด๋ƒ…๋‹ˆ๋‹ค.

const hello$ = from('hello')

hello$.subscribe((x) => console.log(x)) // h, e, l, l, o

๋‹ค์Œ์œผ๋กœ fromEvent ์—ฐ์‚ฐ์ž๋Š” DOM์˜ ์ด๋ฒคํŠธ๋ฅผ ๊ด€์ฐฐ ๊ฐ€๋Šฅ ํ•ญ๋ชฉ์œผ๋กœ ๊ตฌ์„ฑํ•˜๋Š” ๋ฐ ์œ ์šฉํ•ฉ๋‹ˆ๋‹ค. 'fromEvent'๋Š” DOM ์š”์†Œ๋ฅผ ์ฒซ ๋ฒˆ์งธ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ, ์ˆ˜์‹ ํ•  ์ด๋ฒคํŠธ๋ฅผ ๋‘ ๋ฒˆ์งธ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

const event$ = fromEvent(document, 'click')
event$.subscribe((x) => console.log(x))

๋˜ ๋‹ค๋ฅธ ๊ด€์ฐฐ์ž ์ƒ์„ฑ ๋ฐฉ๋ฒ•์€ 'interval'๋กœ, ๋ฐ€๋ฆฌ์ดˆ ๋‹จ์œ„์˜ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์„ ์ทจํ•˜๊ณ  0๋ถ€ํ„ฐ ์‹œ์ž‘ํ•˜์—ฌ 1์”ฉ ์ •์ˆ˜๋ฅผ ์ง€์†์ ์œผ๋กœ ์ฆ๊ฐ€์‹œํ‚ต๋‹ˆ๋‹ค.

const periodic$ = interval(1000)

// 5์ดˆ๊ฐ€ ์ง€๋‚ฌ๋‹ค
periodic$.subscribe((x) => console.log(x)) // 0, 1, 2, 3, 4

๋™๊ธฐ์‹ ๋ฐ ๋น„๋™๊ธฐ์‹

RxJS๋Š” ๋™๊ธฐ์‹๊ณผ ๋น„๋™๊ธฐ์‹์ด ๋ชจ๋‘ ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค.

const hello$ = of('hello')
hello$.subscribe((x) => console.log(x))
console.log('world')

์œ„์˜ ์ฝ”๋“œ๋Š” ๋ฉ”์ธ ์Šค๋ ˆ๋“œ ๋‚ด์—์„œ ์ฝ”๋“œ๊ฐ€ ์œ„์—์„œ ์•„๋ž˜๋กœ ๋ชจ๋‘ ๋™๊ธฐ์ ์œผ๋กœ ์‹คํ–‰๋˜๊ธฐ ๋•Œ๋ฌธ์— hello๋ฅผ ๋จผ์ € ์ƒ์„ฑํ•œ ๋‹ค์Œ world์˜ ๊ฒฐ๊ณผ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

๋น„๋™๊ธฐ์‹์œผ๋กœ ๋งŒ๋“ค๋ ค๋ฉด asyncScheduler๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

import { asyncScheduler } from 'rxjs'

const hello$ = of('hello', asyncScheduler)
hello$.subscribe((x) => console.log(x))
console.log('world')

๊ตฌ๋…์€ ๋น„๋™๊ธฐ ์ด๋ฒคํŠธ ๋ฃจํ”„์˜ ๋‘ ๋ฒˆ์งธ ๋ฐ˜๋ณต์—์„œ๋งŒ ๋ฐœ์ƒํ•˜๋Š” ๋ฐ˜๋ฉด world๋ฅผ ์ธ์‡„ํ•˜๋Š” ์ค„์€ ์ฒซ ๋ฒˆ์งธ ์ด๋ฒคํŠธ ๋ฃจํ”„์—์„œ ์ด๋ฏธ ์™„๋ฃŒ๋˜์—ˆ๊ธฐ ๋•Œ๋ฌธ์— ์ถœ๋ ฅ์€ world ๋’ค์— hello์ž…๋‹ˆ๋‹ค.

๋œจ๊ฒ๊ณ  ์ฐจ๊ฐ€์šด ์˜ต์ €๋ฒ„๋ธ”

๋ฐ์ดํ„ฐ๊ฐ€ Observable ์ž์ฒด์— ์˜ํ•ด ์ƒ์„ฑ๋˜๋ฉด ์ด๋ฅผ ์ฝœ๋“œ Observable์ด๋ผ๊ณ  ๋ถ€๋ฆ…๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ๊ฐ€ Observable ์™ธ๋ถ€์—์„œ ์ƒ์„ฑ๋˜๋ฉด ์ด๋ฅผ ํ•ซ Observable์ด๋ผ๊ณ  ๋ถ€๋ฆ…๋‹ˆ๋‹ค. Hot Observable์€ ์—ฌ๋Ÿฌ ๊ตฌ๋…์„ ๊ฐ€์งˆ ์ˆ˜ ์žˆ๋Š” ๋ฐ˜๋ฉด Cold Observable์€ ๊ตฌ๋…์„ ํ•˜๋‚˜๋งŒ ๊ฐ€์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ฝœ๋“œ Observable์— ๋Œ€ํ•œ ๊ตฌ๋…์ด ๋‘ ๊ฐœ ์ด์ƒ์ธ ๊ฒฝ์šฐ ์–ป์€ ๋ฐ์ดํ„ฐ๊ฐ€ ๋‹ค๋ฅผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Cold Observable์€ ๊ฒŒ์œผ๋ฅด๋‹ค. ๊ตฌ๋…ํ•˜๊ธฐ ์ „๊นŒ์ง€๋Š” ๊ฐ’์„ ์ƒ์„ฑํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ๋‹ค์Œ์€ Cold Observable์˜ ์˜ˆ์ž…๋‹ˆ๋‹ค.

const cold$ = Observable.create((observer) => observer.next(Math.random()))

cold$.subscribe(console.log) // 0.5
cold$.subscribe(console.log) // 0.89

๊ทธ๋Ÿฌ๋‚˜ ์ด๋Š” ์‹ค์ œ ์‹œ๋‚˜๋ฆฌ์˜ค์—์„œ๋Š” ์œ ์šฉํ•˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ์œผ๋ฉฐ ๋ฐ์ดํ„ฐ๊ฐ€ ์ผ๊ด€๋˜๊ธฐ๋ฅผ ์›ํ•ฉ๋‹ˆ๋‹ค. ์ด๋ฅผ ๋‹ฌ์„ฑํ•˜๋ ค๋ฉด ์ฐจ๊ฐ€์šด Observable์„ Hot Observable๋กœ ๋ณ€ํ™˜ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

์ฒซ ๋ฒˆ์งธ ๋ฐฉ๋ฒ•์€ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ์„ ๊ด€์ฐฐ ๊ฐ€๋Šฅ ์™ธ๋ถ€๋กœ ์ด๋™ํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

const random = Math.random()

const hot$ = Observable.create((observer) => observer.next(random))

hot$.subscribe(console.log) // 0.5
hot$.subscribe(console.log) // ๊ฐ’์ด ์—†์Œ

๋‘ ๋ฒˆ์งธ ๊ตฌ๋…์ž๋Š” ์ฒซ ๋ฒˆ์งธ ๊ด€์ฐฐ์ž๊ฐ€ ๊ตฌ๋…ํ•  ๋•Œ ๋ฐ์ดํ„ฐ๊ฐ€ ์ด๋ฏธ ๋ฐฉ์ถœ๋˜์—ˆ๊ธฐ ๋•Œ๋ฌธ์— ์•„๋ฌด ๊ฐ’๋„ ๋ฐ›์ง€ ๋ชปํ•ฉ๋‹ˆ๋‹ค.

Cold Observable์„ Hot Observable๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ๋‹ค๋ฅธ ๋ฐฉ๋ฒ•์€ share ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

const cold$ = Observable.create((observer) => observer.next(Math.random()))

const hot$ = cold$.pipe(share())

hot$.subscribe(console.log) // 0.5
hot$.subscribe(console.log) // ๊ฐ’์ด ์—†์Œ

๋‘ ๋ฒˆ์งธ ๊ตฌ๋…์ž๊ฐ€ ๋งˆ์ง€๋ง‰์œผ๋กœ ๋‚ด๋ณด๋‚ธ ๊ฐ’์„ ์ˆ˜์‹ ํ•˜๋„๋ก ํ•˜๋ ค๋ฉด share ์—ฐ์‚ฐ์ž๋ฅผ ๋Œ€์ฒดํ•˜์—ฌ shareReplay๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

const cold$ = Observable.create((observer) => observer.next(Math.random()))

const hot$ = cold$.pipe(shareReplay())

hot$.subscribe(console.log) // 0.5
hot$.subscribe(console.log) // 0.5

Subject

Subject๋Š” ์ƒ์„ฑ ํ›„ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ฐฉ์‹์œผ๋กœ ๊ฐ’์„ ํ‘ธ์‹œํ•  ์ˆ˜ ์žˆ๋Š” ๋‹ค๋ฅธ ์œ ํ˜•์˜ ๊ด€์ฐฐ ๊ฐ€๋Šฅ ํ•ญ๋ชฉ์ž…๋‹ˆ๋‹ค.

import { Subject } from 'rxjs'

var subject = new Subject()
subject.subscribe(console.log)
subject.next('The first thing has been sent')

var observer = subject.subscribe(console.log)
subject.next('The second thing has been sent')
observer.unsubscribe()
subject.next('The third thing has been sent')

Behaviour Subject

Behaviour subject๋Š” ์‹ ๊ทœ ๊ตฌ๋… ์‹œ ๋งˆ์ง€๋ง‰์œผ๋กœ ์บ์‹œ๋œ ๊ฐ’์„ ๋‚ด๋ณด๋ƒ…๋‹ˆ๋‹ค.

var subject = new BehaviorSubject('First')

subject.subscribe((data) => addItem('observer 1 ', data))

Replay Subject

Behaviour Subject๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋Šฆ๊ฒŒ ์˜จ ์‚ฌ๋žŒ์€ ๋งˆ์ง€๋ง‰์œผ๋กœ ๋ฐฉ์ถœ๋œ ์•„์ดํ…œ๋งŒ ๋ฐ›์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ Replay Subject๋ฅผ ์ด์šฉํ•˜๋ฉด ํ›„๋ฐœ์ฃผ์ž๋„ ๊ตฌ๋… ์‹œ ์ƒ๋‹น์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

var subject = new ReplaySubject(3)

subject.next(1)
subject.next(2)
subject.subscribe(console.log) // 1, 2
subject.next(3) // 3
subject.next(4) // 4
subject.subscribe(console.log) // 2, 3, 4

Async Subject

๊ฐ€์žฅ ๊ฐ„๋‹จํ•œ ์ฃผ์ œ์ž…๋‹ˆ๋‹ค. ์™„๋ฃŒ ์‹œ ๋งˆ์ง€๋ง‰ ๊ฐ’๋งŒ ๋‚ด๋ณด๋ƒ…๋‹ˆ๋‹ค.

var subject = new AsyncSubject()

subject.next(1)
subject.subscribe(console.log)
subject.complete() // 1

์—ฐ์‚ฐ์ž

  • ์ •์  ์—ฐ์‚ฐ์ž: ์ด ์—ฐ์‚ฐ์ž๋Š” ์ผ๋ฐ˜์ ์œผ๋กœ ๊ด€์ฐฐ ๊ฐ€๋Šฅ ํ•ญ๋ชฉ์„ ๋งŒ๋“œ๋Š” ๋ฐ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค.
  • ์ธ์Šคํ„ด์Šค ์—ฐ์‚ฐ์ž: ๊ด€์ฐฐ ๊ฐ€๋Šฅํ•œ ์ธ์Šคํ„ด์Šค์— ๋Œ€ํ•œ ์ด๋Ÿฌํ•œ ๋ฉ”์„œ๋“œ(๋Œ€๋ถ€๋ถ„์˜ RxJS)

์ˆ˜์ •์ž ์—ฐ์‚ฐ์ž

์ด๋Ÿฌํ•œ ์—ฐ์‚ฐ์ž๋Š” ๊ธฐ์กด ๊ฐ’์„ ๋ณ€ํ™˜ํ•˜๊ณ  ๋ฐ์ดํ„ฐ ํ๋ฆ„์„ ์ˆ˜์ •ํ•ฉ๋‹ˆ๋‹ค.

import { map, filter, take, scan } from 'rxjs/operators'

const source$ = from([1, 2, 3, 4, 5])
const modified$ = source$.pipe(
  map((x) => x + 1), // 2, 3, 4, 5, 6
  scan((acc, val) => acc + val), // 2, 5, 9, 14, 20
  filter((x) => x > 10), // 14, 20
  take(1) // 14
)

Pluck

๊ฐ์ฒด ๋ฐฐ์—ด์—์„œ ํŠน์ • ํ‚ค๋งŒ ์„ ํƒํ•˜๊ธฐ ์œ„ํ•œ map์šฉ ํ•ฉ์„ฑ ์„คํƒ•์ž…๋‹ˆ๋‹ค.

const list$ = of([
  {
    name: 'Shino',
    age: 20,
    address: 'Tokyo',
  },
  {
    name: 'Anthony',
    age: 21,
    address: 'Berkeley',
  },
])

const names$ = list$.pipe(pluck('name'))

names$.subscribe(console.log) // 'Shino', 'Anthony'

Tap

์ด ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ํŒŒ์ดํ”„ ๋‚ด์—์„œ ๋ถ€์ž‘์šฉ์ด ํŠธ๋ฆฌ๊ฑฐ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

source$.pipe(
  tap(console.log),
  map((x) => x.toUpperCase()),
  tap(async (x) => {
    await Promise.resolve()
    alert(x)
  })
)

๋ฐฐ์•• ์ฒ˜๋ฆฌ

๋ฐฐ์••์€ ์‹ค์ œ๋กœ ํ•„์š”ํ•œ ๊ฒƒ๋ณด๋‹ค ์••๋„์ ์œผ๋กœ ๋งŽ์€ ์–‘์˜ ๊ฐ’์„ ๋ฐฉ์ถœํ•˜๋Š” ๊ด€์ฐฐ ๊ฐ€๋Šฅ ํ•ญ๋ชฉ์ž…๋‹ˆ๋‹ค. ๋งˆ์šฐ์Šค ์›€์ง์ž„์— ์˜ํ•ด ํŠธ๋ฆฌ๊ฑฐ๋˜๋Š” DOM ์ด๋ฒคํŠธ์˜ ์œ ์ž…์ด ๊ทธ ์ „ํ˜•์ด ๋  ๊ฒƒ์ž…๋‹ˆ๋‹ค.

์ด๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ์ฒซ ๋ฒˆ์งธ ์ „๋žต์€ ์ด๋ฒคํŠธ๋ฅผ ๋””๋ฐ”์šด์‹ฑํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค. Debounce๋Š” ์ผ์ • ๊ธฐ๊ฐ„ ๋™์•ˆ ์ž‘์—…์ด ์ค‘์ง€๋  ๋•Œ๊นŒ์ง€ ์ด๋ฒคํŠธ๋ฅผ ์ƒ์„ฑํ•˜์ง€ ์•Š์œผ๋ฉฐ ์ด๋Š” ์‚ฌ์šฉ์ž๊ฐ€ ์ž…๋ ฅ ํ•„๋“œ๋ฅผ ์ฑ„์šธ ๋•Œ ์ž๋™ ์™„์„ฑ์— ์œ ์šฉํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ ์œ ํšจ์„ฑ ๊ฒ€์‚ฌ๋Š” ์ž…๋ ฅ์„ ์™„๋ฃŒํ•œ ํ›„์—๋งŒ ํŠธ๋ฆฌ๊ฑฐ๋ฉ๋‹ˆ๋‹ค.

import { fromEvent } from 'rxjs'
import { debounceTime } from 'rxjs/operators'

const event$ = fromEvent(document, 'mousemove')

const debounced$ = event$.pipe(debounceTime(1000))
debounced$.subscribe(console.log)

์ง€์ •๋œ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์— ๋”ฐ๋ผ ์ด๋ฒคํŠธ ์ˆ˜๊ฐ€ ํฌ๊ฒŒ ์ค„์–ด๋“ค๊ธฐ ๋•Œ๋ฌธ์— ์ด๋ฒคํŠธ๋ฅผ ์กฐ์ ˆํ•˜๋Š” ๊ฒƒ๋„ ์œ ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์กฐ์ ˆ์€ ์†๋„ ์ œํ•œ์œผ๋กœ ์ƒ๊ฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

import { throttleTime } from 'rxjs/operators'

const event$ = fromEvent(document, 'mousemove')

const throttled$ = event$.pipe(throttleTime(1000))
throttled$.subscribe(console.log)

๋ฐ˜๋ฉด์— ๋ฒ„ํผ ์ˆ˜๋Š” ๋ชจ๋“  ์ด๋ฒคํŠธ๋ฅผ ๋ฐฐ์—ด๋กœ ์œ ์ง€ํ•˜๊ณ  ๋ฒ„ํผ ์šฉ๋Ÿ‰์— ๋„๋‹ฌํ•˜๋ฉด ๋ชจ๋“  ์ด๋ฒคํŠธ๋ฅผ ํ•œ ๋ฒˆ์— ๋‚ด๋ณด๋ƒ…๋‹ˆ๋‹ค.

import { bufferCount } from 'rxjs/operators'

const event$ = fromEvent(document, 'mousemove')

const buffered$ = event$.pipe(bufferCount(10))
buffered$.subscribe(console.log)

Switch Map

Switch Map์„ ์‚ฌ์šฉํ•˜๋ฉด ๋‘ ๊ฐœ์˜ ๊ด€๊ณ„ํ˜• ๊ด€์ฐฐ ๊ฐ€๋Šฅ ํ•ญ๋ชฉ์ด ๋ฐ์ดํ„ฐ ๊ฐ€์ ธ์˜ค๊ธฐ๋ฅผ ์œ„ํ•ด ์ƒํ˜ธ ์šด์šฉ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

const user$ = of({ uid: Math.random() })
const fetchOrders$ = (userId: number) => of(`${userId}'s order data'`)

๋จผ์ € ์ฃผ๋ฌธ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ค๊ธฐ ์ „์— ์‚ฌ์šฉ์ž ID๊ฐ€ ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ์ด๋ฅผ ์ˆ˜ํ–‰ํ•˜๋Š” ์ง๊ด€์ ์ธ ๋ฐฉ๋ฒ•์€ ๊ตฌ๋…์„ ์ค‘์ฒฉํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

user$.subscribe({ uid } => {
    fetchOrders$(uid).subscribe(console.log)
})

๊ด€๊ณ„ํ˜• ํ˜ธ์ถœ์„ ์ˆ˜ํ–‰ํ•˜๋Š” ๋” ์ข‹์€ ๋ฐฉ๋ฒ•์€ ์Šค์œ„์น˜ ๋งต์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

const orders$ = user$.pipe(switchMap((user) => fetchOrders$(user.uid)))

orders$.subscribe(console.log)

์กฐํ•ฉ ์—ฐ์‚ฐ์ž

Observable์„ ๊ฒฐํ•ฉํ•˜๋Š” ๋ฐฉ๋ฒ•์—๋Š” ์—ฌ๋Ÿฌ ๊ฐ€์ง€๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ์ตœ์‹  ๋ฒ„์ „ ๊ฒฐํ•ฉ์€ Observable ๋ฐฐ์—ด์„ ๊ฐ€์ ธ์˜ค๊ณ  ๊ฐ ๋…๋ฆฝ Observable์˜ ๋ชจ๋“  ๊ฐ’์ด ํ•ด๋‹น ๊ฐ’์„ ํ™•์ธํ•˜๊ธฐ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๊ณ  ๋ชจ๋“  ๊ฐ’๋งŒ ๋ฐฐ์—ด๋กœ ํ•จ๊ป˜ ๋‚ด๋ณด๋ƒ…๋‹ˆ๋‹ค.

import { combineLatest } from 'rxjs'
import { delay } from 'rxjs/operators'

const randSync$ = Observable.create((o) => o.next(Math.random()))
const randAsync$ = randSync$.pipe(delay(1000))

const combined$ = combineLatest([randSync$, randAsync$])

combined$.subscribe(console.log) // [0.5, 0.8]

๋ณ‘ํ•ฉ ๋ฐ˜๋ฉด์— ๋‘ ๊ฐœ์˜ Observable์„ ํ•˜๋‚˜๋กœ ์œตํ•ฉํ•˜์—ฌ ์ผ๋ฐ˜์ ์ธ Observable์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

import { merge } from 'rxjs'
import { delay } from 'rxjs/operators'

const randSync$ = Observable.create((o) => o.next(Math.random()))
const randAsync$ = randSync$.pipe(delay(1000))

const merged$ = merge([randSync$, randAsync$])

merged$.subscribe(console.log) // 0.5, 0.8

Skip Until์„ ์‚ฌ์šฉํ•˜๋ฉด ๋‘ ๋ฒˆ์งธ Observable์ด ๊ฐ’์„ ๋ฐฉ์ถœํ•  ๋•Œ๊นŒ์ง€ ์†Œ์Šค Observable์„ ๋ฌด์‹œํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

var skipped$ = observable1$.skipUntil(observable2$)

์˜ค๋ฅ˜ ์ฒ˜๋ฆฌ

ํŒŒ์ดํ”„ ๋‚ด๋ถ€์—์„œ ๊ด€์ฐฐ ๊ฐ€๋Šฅํ•œ ํ•ญ๋ชฉ์— ๋Œ€ํ•ด ์˜ค๋ฅ˜ ์ฒ˜๋ฆฌ๋ฅผ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์žฌ์‹œ๋„ ๋ฉ”์ปค๋‹ˆ์ฆ˜์€ retry ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๊ตฌํ˜„ํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

import { catchError, retry } from 'rxjs/operators'

someObservable$.pipe(
  catchError((err) => of('default value')),
  retry(2)
)

๋ฉ”๋ชจ๋ฆฌ ๋ˆ„์ˆ˜

์žฅ๊ธฐ ์‹คํ–‰ Observable์„ ๊ตฌ๋… ์ทจ์†Œํ•˜๋Š” ๊ฒƒ์„ ์žŠ์ง€ ๋งˆ์„ธ์š”.

const source$ = interval(100)

const subscription = source.subscribe((x) => {
  console.log(x)
  if (x > 10) {
    subscription.unsubscribe()
  }
})

์ด๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋” ์ข‹์€ ๋ฐฉ๋ฒ•์€ ์กฐ๊ฑด์ด ๋” ์ด์ƒ ์ถฉ์กฑ๋˜์ง€ ์•Š์„ ๋•Œ ๊ฐ’ ๋ฐฉ์ถœ์„ ์ค‘์ง€ํ•˜๋Š” takeWhile์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

source$.pipe(takeWhile((x) => x <= 10))

๋Œ€์‹  ๋‹ค๋ฅธ Observable์— ์˜์กดํ•˜์—ฌ ๊ฐ’ ๋ฐฉ์ถœ์„ ์ค‘์ง€ํ•˜๋ ค๋ฉด ๋‹ค๋ฅธ Observable์ด ๊ฐ’์„ ๋ฐฉ์ถœํ•˜๋ฉด ํ˜„์žฌ Observable์— ๋Œ€ํ•œ ๊ตฌ๋…์ด ์ž๋™์œผ๋กœ ์ทจ์†Œ๋˜๋ฏ€๋กœ takeUntil์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

source$.pipe(takeUntil(of('something')))

์ฐธ๊ณ 

ReactiveX. Retrieved 2024, March 24 from https://reactivex.io/
RxJS. Retrieved 2024, March 24 from https://rxjs.dev/
RxJS Primer. Retrieved 2024, March 24 from https://www.learnrxjs.io/learn-rxjs/concepts/rxjs-primer
RxJS Overview. Retrieved 2024, March 24 from https://rxjs-dev.firebaseapp.com/guide/overview
Gruijs, L. Understanding hot vs cold Observables.https://luukgruijs.medium.com/understanding-hot-vs-cold-observables-62d04cf92e03
Delaney, J. RxJS Top Ten - Code This, Not That.https://www.youtube.com/watch?v=ewcoEYS85Co