rxjs intro - 1

Table of Contents

목표

deployment detail page에서 상단에 status가 있는데요, 실시간으로 업데이트 되어야 합니다. 현재 subscription API가 없는 상황에서 status field만 5초 마다 새로 polling해서 가져오는 형태로 구현해야합니다.

구현

Observable → Operator → Observer

  1. Observable: subscribe 가능한 rxjs의 원시타입이라고 합니다. 관찰되는 대상이자 producer입니다. 아래 코드에서 timer(0, pollingInterval) 에 해당하는 부분입니다. 0초 기다렸다가 한번 실행후 pollingInterval에 따라 interval을 돌립니다.
  2. Operator: Observable에서 생성된 값들을 어떻게 consume할지 다듬는 역할입니다. 아래 코드에서는 pipe(...) 에 해당하는 내용 전부입니다. Observable의 Interval이 ping할 때마다 그 핑을 switchMap을 통해서 fetchStatus()라는 lazy query의 data로 대체하고 map을 통해서 staus만을 걸러냅니다. 이 과정을 staus가 stopped가 나올때까지 반복합니다
// deployment-status-observable.tsx
...

const obs$ = timer(0, pollingInterval).pipe(
    switchMap(() => fetchStatus()),
    map((d) => d.data?.aiDeployment?.status),
    takeWhile((d) => d !== AiDeploymentStatusEnum.Stopped, true)
);

...

return <div>{children(obs$)}</div>;

Operator가 딸린 Observable을 render prop패턴으로 넘겨줍니다.

// detail-layout.tsx
...
<DeploymentStatusObservable
    pollingInterval={5000}
    deploymentId={deploymentId}
  >
    {(obs$) => (
        <Header
            ...
            deploymentStatusObservable$={obs$}
        />
    )}
</DeploymentStatusObservable>

Observable를 넘겨받은 Header에서 이제 subscription을 통해서 consume을 하게 됩니다

//header.tsx

...
const [status, setStatus] = useState<DeploymentStatusEnum>();

...
useEffect(() => {
    deploymentStatusObservable$?.subscribe((value) => {
      setStatus(value as string as DeploymentStatusEnum);
    });

    function unSub() {
      deploymentStatusObservable$?.subscribe().unsubscribe();
    }
    return () => unSub();
}, []);

data stream이기에 useEffect내에서 cleanup을 해주어야 합니다. 하지만 다른 예시에서는

useEffect(() => {
  const sub = interval(10).subscribe()
  return () => sub.unsubscribe()
})

와 같이 하지만 Observable을 할당하면 실행이 안되고 있어서 함수를 새로 선언하고 cleanup하고 있습니다


ref