rxjs intro - 1
목표
deployment detail page에서 상단에 status가 있는데요, 실시간으로 업데이트 되어야 합니다. 현재 subscription API가 없는 상황에서 status field만 5초 마다 새로 polling해서 가져오는 형태로 구현해야합니다.
구현
Observable → Operator → Observer
- Observable: subscribe 가능한 rxjs의 원시타입이라고 합니다. 관찰되는 대상이자 producer입니다. 아래 코드에서
timer(0, pollingInterval)
에 해당하는 부분입니다. 0초 기다렸다가 한번 실행후 pollingInterval에 따라 interval을 돌립니다. - Operator: Observable에서 생성된 값들을 어떻게 consume할지 다듬는 역할입니다. 아래 코드에서는
pipe(...)
에 해당하는 내용 전부입니다. Observable의 Interval이 ping할 때마다 그 핑을 switchMap을 통해서 fetchStatus()라는 lazy query의 data로 대체하고 map을 통해서 staus만을 걸러냅니다. 이 과정을 staus가 stopped가 나올때까지 반복합니다
// deployment-status-observable.tsx
...
const obs$ = timer(0, pollingInterval).pipe(
switchMap(() => fetchStatus()),
map((d) => d.data?.aiDeployment?.status),
takeWhile((d) => d !== AiDeploymentStatusEnum.Stopped, true)
);
...
return <div>{children(obs$)}</div>;
Operator가 딸린 Observable을 render prop패턴으로 넘겨줍니다.
// detail-layout.tsx
...
<DeploymentStatusObservable
pollingInterval={5000}
deploymentId={deploymentId}
>
{(obs$) => (
<Header
...
deploymentStatusObservable$={obs$}
/>
)}
</DeploymentStatusObservable>
Observable를 넘겨받은 Header에서 이제 subscription을 통해서 consume을 하게 됩니다
//header.tsx
...
const [status, setStatus] = useState<DeploymentStatusEnum>();
...
useEffect(() => {
deploymentStatusObservable$?.subscribe((value) => {
setStatus(value as string as DeploymentStatusEnum);
});
function unSub() {
deploymentStatusObservable$?.subscribe().unsubscribe();
}
return () => unSub();
}, []);
data stream이기에 useEffect내에서 cleanup을 해주어야 합니다. 하지만 다른 예시에서는
useEffect(() => {
const sub = interval(10).subscribe()
return () => sub.unsubscribe()
})
와 같이 하지만 Observable을 할당하면 실행이 안되고 있어서 함수를 새로 선언하고 cleanup하고 있습니다
ref
- RxJS - Glossary - 용어 사전
- RxJS - Decision Tree - 어떤 operator를 사용할지 추천해주는 문답
- RxViz - Animated playground - Observable을 간단히 테스트해보고 결과를 marble로 보여주는 playground
- RxJS - Observable - DEV Community - 각 개념 안내서