Fetch API 기반 SSE로 실시간 이벤트 스트림 구현하기

2024년 12월 04일

실시간으로 서버에서 클라이언트로 데이터를 전달하는 방법은 여러 가지가 있지만, SSE(Server-Sent Events)는 단방향(서버 → 클라이언트) 통신을 위해 설계된 방식입니다. 클라이언트가 서버와 연결을 유지하면, 서버는 언제든 데이터를 클라이언트로 직접 푸시할 수 있습니다.

예를 들어, 이메일 발송 결과를 사용자에게 실시간으로 알리고 싶을 때 SSE가 유용합니다. 서버가 이메일을 성공적으로 발송하거나 실패했을 때, 그 결과를 클라이언트에 즉시 전달할 수 있기 때문입니다. 사용자 입장에서는 화면에서 바로 성공 여부를 확인할 수 있고, 서버는 필요할 때만 데이터를 보내기 때문에 불필요한 요청이 발생하지 않습니다.

브라우저에서는 EventSource API로 SSE를 사용할 수 있지만, 커스텀 헤더(Authorization) 설정이 안 된다는 제약이 있습니다. 이 글에서는 Fetch API를 사용해 Authorization 헤더를 설정할 수 있는 커스텀 SSE 클래스를 구현해보겠습니다.


이메일 발송에 SSE를 적용한 이유

이메일 대량 발송을 예로 들어보겠습니다. 이메일 발송은 보통 몇 초 이상 걸리는 비동기 작업인데, 일반적인 HTTP 요청-응답 패턴으로는 다음과 같은 문제가 생깁니다.

  1. 언제 발송이 실제로 끝났는지 알 수 없다
    • 요청 직후 "발송 완료"라고 표시해도, 실제로는 서버가 아직 메일을 전송 중일 수도 있습니다.
    • 사용자는 진짜로 성공했는지, 어느 시점에 완료됐는지를 정확히 확인하기 어렵습니다.
  2. polling
    • "이메일 발송이 얼마나 진행됐는지" 알기 위해서 5초, 10초 간격으로 계속 "몇 %나 보냈나요?" 하는 식으로 서버에 물어볼 수는 있지만,
    • 이 방식은 요청이 쌓이면서 서버 부하가 늘어나고, 구현도 번거로워집니다.
  3. WebSocket
    • 양방향 실시간 통신이 가능하지만, 단순히 서버 → 클라이언트 알림만 필요한 경우에는 과합니다.
    • SSE는 HTTP 기반이라 설정이 간단하고, 이메일 발송 상태 같은 단방향 스트리밍에 적합합니다.

SSE는 서버가 이벤트를 직접 푸시하기 때문에, 클라이언트는 별도의 요청 없이 "진행 상황", "성공", "실패" 같은 상태 변화를 실시간으로 받을 수 있습니다.

EventSource API 이해

EventSource API를 사용하면 몇 줄만으로 서버의 실시간 이벤트를 받을 수 있습니다.

const eventSource = new EventSource('https://api.example.com/email-progress');
 
eventSource.onmessage = (event) => {
  console.log('Server says:', event.data);
};
 
eventSource.onerror = (error) => {
  console.error('SSE error:', error);
};

EventSource 한계

하지만 EventSource는 요청 헤더를 직접 설정할 수 없습니다. 서버에서 인증 토큰(JWT, Bearer 등)을 요구한다면, new EventSource(url)만으로는 Authorization 헤더를 전달할 방법이 없습니다.


Fetch API 기반의 Custom SSE 구현

EventSource의 한계를 해결하기 위해 Fetch API 기반의 CustomSSE 클래스를 구현합니다.

요구사항

  1. 커스텀 헤더 설정 — Authorization 등 필요한 헤더를 포함해 요청
  2. 실시간 스트림 데이터 처리 — SSE 스트림을 이벤트 단위로 분리하고 파싱
  3. EventSource와 유사한 인터페이스onopen, onmessage, onerror 핸들러 지원
  4. 연결 상태 관리readyState로 연결 중(0), 연결됨(1), 종료(2) 상태 관리

CustomSSE 구현하기

SSE 스트림은 JSON이 아닌 텍스트 형식(text/event-stream)으로 전달됩니다. 먼저 이 텍스트를 파싱하는 방법부터 살펴보겠습니다.

SSE 데이터의 구조

event: message
data: Hello, world!

event: update
data: 50%
  • event : 이벤트 타입 (message, update 등)
  • data : 실제 데이터 (여러 줄 가능)
  • id : 이벤트 ID (클라이언트가 마지막 수신 ID를 기억하는 데 사용)

SSE 이벤트 파싱 (parseSSEEvent 함수)

function parseSSEEvent(raw: string): SSEEvent | null {
  const lines = raw.split('\n');
  let eventType: string | undefined;
  let data = '';
  let id: string | undefined;
 
  for (const line of lines) {
    if (line.startsWith('event:')) {
      eventType = line.replace('event:', '').trim();
    } else if (line.startsWith('data:')) {
      // 여러 data: 줄이 있을 수 있으므로 줄바꿈으로 연결
      data += line.replace('data:', '').trim() + '\n';
    } else if (line.startsWith('id:')) {
      id = line.replace('id:', '').trim();
    }
  }
 
  if (data !== '') {
    return { event: eventType, data: data.trim(), id };
  }
  return null;
}

원시 SSE 텍스트를 줄 단위로 분리해서 event, data, id를 추출합니다. 여러 줄에 걸친 data: 값은 하나의 문자열로 합치고, 이벤트 단위는 \n\n(빈줄)으로 구분합니다.

CustomSSE 클래스

CustomSSE 클래스는 Fetch API로 SSE 서버에 연결하고, 스트림 데이터를 파싱해서 콜백으로 전달합니다. 생성 시 바로 연결을 시도하고, close()로 연결을 종료할 수 있습니다.

클래스 멤버 변수 및 생성자

export class CustomSSE {
  url: string;
  onopen: ((event: Event) => void) | null = null;
  onmessage: ((event: MessageEvent) => void) | null = null;
  onerror: ((event: Event) => void) | null = null;
  readyState: number = 0; // 0: connecting, 1: open, 2: closed
 
  private _abortController: AbortController;
 
  constructor(url: string) {
    this.url = url;
    this._abortController = new AbortController();
  }
 
  private getToken(): string {
    return useAuthStore.getState().accessToken;
  }
}

connect, close 메서드

async connect() {
  try {
    const response = await fetch(this.url, {
      headers: {
        Authorization: `Bearer ${this.getToken()}`,
        Accept: 'text/event-stream',            // SSE 스트림 요청
      },
      cache: 'no-store',
      credentials: 'include',
      signal: this._abortController.signal,      // 연결 취소를 위한 signal
    });
    if (!response.ok || !response.body) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }
 
    // 연결 성공: readyState를 OPEN(1)으로 변경하고 onopen 콜백 호출
    this.readyState = 1;
    if (this.onopen) {
      this.onopen(new Event('open'));
    }
 
    // 서버로부터 데이터를 읽어오기 위한 리더 생성
    const reader = response.body.getReader();
    const decoder = new TextDecoder('utf-8');
    let buffer = '';
 
    // 스트림에서 데이터를 계속 읽어옵니다.
    while (true) {
      const { done, value } = await reader.read();
      if (done) break; // 더 이상 읽을 데이터가 없으면 종료
 
      // 받아온 바이트 데이터를 문자열로 변환하고, 버퍼에 누적합니다.
      buffer += decoder.decode(value, { stream: true });
 
      // 버퍼 내에서 "\n\n" 구분자로 이벤트 단위를 찾습니다.
      let delimiterIndex: number;
      while ((delimiterIndex = buffer.indexOf('\n\n')) !== -1) {
        const rawEvent = buffer.slice(0, delimiterIndex).trim();
        buffer = buffer.slice(delimiterIndex + 2);
        if (rawEvent) {
          // parseSSEEvent 함수를 통해 원시 이벤트 텍스트를 파싱
          const eventData = parseSSEEvent(rawEvent);
          if (eventData && this.onmessage) {
            // 파싱된 데이터를 MessageEvent 형태로 변환하여 onmessage 콜백에 전달
            const messageEvent = new MessageEvent(eventData.event || 'message', {
              data: eventData.data,
              lastEventId: eventData.id || '',
            });
            this.onmessage(messageEvent);
          }
        }
      }
    }
  } catch (error) {
    // 에러 발생 시, 연결 상태가 CLOSED가 아니라면 onerror 콜백 호출
    if (this.readyState !== 2 && this.onerror) {
      this.onerror(new Event('error'));
    }
    console.error('CustomSSE connection error:', error);
  }
}
 
close() {
  this.readyState = 2; // CLOSED 상태로 변경
  this._abortController.abort(); // fetch 요청을 취소하여 연결을 종료합니다.
}
  • connect(): Fetch API로 SSE 서버에 연결하고, 스트림 데이터를 TextDecoder로 변환한 뒤 \n\n 구분자로 이벤트를 파싱합니다. 파싱된 이벤트는 onmessage 콜백으로 전달됩니다.
  • close(): AbortController.abort()로 연결을 종료하고 readyState를 2(CLOSED)로 변경합니다.
  • readyState: 네이티브 EventSource와 동일하게 0(연결 중), 1(연결됨), 2(종료)로 관리합니다.

사용 예시

실제 이메일 대량 발송의 진행 상황을 실시간으로 보여주는 컴포넌트입니다.

function EmailCampaignProgress({ campaignId }: { campaignId: string }) {
  const [progress, setProgress] = useState({ sent: 0, failed: 0, total: 0 });
  const [status, setStatus] = useState<'connecting' | 'sending' | 'done' | 'error'>('connecting');
 
  useEffect(() => {
    const sse = new CustomSSE(`/api/campaigns/${campaignId}/progress`);
 
    sse.onopen = () => {
      setStatus('sending');
    };
 
    sse.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setProgress({ sent: data.sent, failed: data.failed, total: data.total });
 
      if (data.sent + data.failed >= data.total) {
        setStatus('done');
        sse.close();
      }
    };
 
    sse.onerror = () => {
      setStatus('error');
    };
 
    sse.connect();
 
    return () => sse.close();
  }, [campaignId]);
 
  return (
    <div>
      <p>{status === 'sending' && `발송 중... ${progress.sent}/${progress.total}`}</p>
      <p>{status === 'done' && `완료 (성공 ${progress.sent}, 실패 ${progress.failed})`}</p>
      <p>{status === 'error' && '연결이 끊어졌습니다.'}</p>
    </div>
  );
}

마무리

EventSource는 간편하지만 커스텀 헤더를 설정할 수 없어서, 인증이 필요한 환경에서는 사용하기 어렵습니다. 이 글에서는 Fetch API로 이 제약을 해결하면서도 onopen/onmessage/onerror 같은 익숙한 인터페이스를 유지하는 CustomSSE 클래스를 구현해봤습니다.