Fetch API 기반 SSE로 실시간 이벤트 스트림 구현하기

2024년 12월 04일

실시간으로 서버에서 클라이언트로 데이터를 전달하는 방법은 여러 가지가 있지만, SSE(Server-Sent Events)는 단방향(서버 → 클라이언트) 통신을 위해 설계된 방식입니다. 클라이언트가 서버와 연결을 유지하면, 서버는 언제든 데이터를 클라이언트로 직접 푸시할 수 있습니다.

예를 들어, 이메일 발송 결과를 사용자에게 실시간으로 알리고 싶을 때 SSE가 유용합니다. 서버가 이메일을 성공적으로 발송하거나 실패했을 때, 그 결과를 클라이언트에 즉시 전달할 수 있기 때문입니다. 사용자 입장에서는 화면에서 바로 성공 여부를 확인할 수 있고, 서버는 필요할 때만 데이터를 보내기 때문에 불필요한 요청이 발생하지 않습니다.

브라우저에서는 기본적으로 EventSource API를 통해 SSE를 사용할 수 있습니다. 하지만 EventSource API에는 커스텀 헤더(Authorization) 설정이 불가능하다는 제약이 있기때문에, Fetch API를 사용하여 Authorization 헤더를 설정할 수 있는 SSE(Client-Sent Events) 커스텀 클래스를 구현해보겠습니다.


SSE 적용 이유

이메일 대량 발송은 보통 몇 초 이상 걸리는 비동기 프로세스로, 실제로는 서버가 모든 이메일을 보내는 데 상당한 시간이 소요될 수 있습니다. 예를 들어, 여러 명에게 이메일을 동시에 보내야 할 때, 일반적인 HTTP 요청-응답 패턴으로는 다음과 같은 문제가 생깁니다

  1. 언제 발송이 실제로 끝났는지 알 수 없다
    • 요청 직후 "발송 완료"라고 표시해도, 실제로는 서버가 아직 메일을 전송 중일 수도 있습니다.
    • 사용자는 진짜로 성공했는지, 어느 시점에 완료됐는지를 정확히 확인하기 어렵습니다.
  2. polling
    • "이메일 발송이 얼마나 진행됐는지" 알기 위해서 5초, 10초 간격으로 계속 "몇 %나 보냈나요?" 하는 식으로 서버에 물어볼 수는 있지만,
    • 이 방식은 요청이 쌓이면서 서버 부하가 늘어나고, 구현도 번거로워집니다.
  3. WebSocket을 이용해 양방향 실시간 통신을 구현할 수도 있습니다.
    • 하지만 WebSocket은 양방향 통신이 필요한 경우에 적합하며, 비교적 구현이 더 복잡해질 수 있습니다.
    • SSE는 단방향(서버 → 클라이언트) 요구사항에 더 잘 맞고, HTTP 기반이라 설정이 간단한 장점이 있어 이메일 발송 상태 표시나 간단한 알림 스트리밍에 적합합니다.

SSE는 서버가 이벤트를 직접 푸시하기 때문에, 클라이언트는 별도의 요청 없이도 "진행 상황", "성공", "실패" 등 상태 변화를 실시간으로 전달받을 수 있기 때문에, 사용자 경험이 향상되고, 서버 부하도 최소화할 수 있습니다.

EventSource API 이해

EventSource Web API를 사용하면 아래와 같이 몇 줄의 코드로 서버의 실시간 이벤트를 받을 수 있습니다.

const eventSource = new EventSource('https://api.example.com/email-progress');
 
eventSource.onmessage = (event) => {
  console.log('Server says:', event.data);
};
 
eventSource.onerror = (error) => {
  console.error('SSE error:', error);
};

EventSource 한계

하지만 EventSource는 브라우저의 보안 정책과 CORS 규칙 등 때문에 요청 헤더를 직접 수정하거나 추가할 수 없습니다.

만약 서버에서 인증 토큰(예: JWT나 Bearer 토큰)이 포함된 요청을 요구한다면, new EventSource(url)만으로는 필요한 Authorization 헤더를 전달할 수 없습니다.


Fetch API 기반의 Custom SSE 구현

EventSource의 한계를 극복하기 위해 Fetch API를 사용한 CustomSSE 클래스를 구현합니다.

요구사항

  1. 커스텀 헤더 설정
    • Fetch API를 사용해 요청을 보낼 때, Authorization 등 필요한 커스텀 헤더를 포함할 수 있습니다.
  2. 실시간 스트림 데이터 처리
    • 서버에서 전송된 텍스트 형식의 SSE 스트림을 읽어, 하나의 긴 텍스트 덩어리에서 여러 이벤트 단위로 분리하고 파싱해야 합니다.
  3. EventSource와 유사한 인터페이스 제공
    • 기존 EventSource에서 제공하는 onopen, onmessage, onerror와 같은 이벤트 핸들러를 지원해야 합니다.
  4. 연결 상태 관리 (readyState)
    • 연결 상태를 나타내는 readyState 속성을 도입하여, 현재 연결이 "연결 중", "연결됨", "연결 종료" 상태 중 어느 상태에 있는지 쉽게 파악할 수 있어야 합니다.

CustomSSE 구현하기

SSE(Server-Sent Events)로 서버에서 클라이언트로 데이터를 전달할 때, 서버는 텍스트 형식의 이벤트 스트림을 전송합니다. 이 스트림은 실시간으로 클라이언트에 전달되지만 JSON이나 객체 형식이 아닙니다.

SSE 데이터의 구조

event: message
data: Hello, world!

event: update
data: 50%

event : 이벤트의 타입을 지정합니다. 예를 들어, message, update 등 사용자 정의 이벤트 타입이 가능합니다.
data : 이벤트의 실제 데이터이며, 여러 줄에 걸쳐 전달될 수 있습니다.
id :이벤트 ID를 지정하여 클라이언트가 수신한 마지막 이벤트 ID를 기억할 수 있게 합니다.

SSE 이벤트 파싱 (parseSSEEvent 함수)

function parseSSEEvent(raw: string): SSEEvent | null {
  const lines = raw.split('\n');
  let eventType: string | undefined;
  let data = '';
  let id: string | undefined;
 
  for (const line of lines) {
    if (line.startsWith('event:')) {
      eventType = line.replace('event:', '').trim();
    } else if (line.startsWith('data:')) {
      // 여러 data: 줄이 있을 수 있으므로 줄바꿈으로 연결
      data += line.replace('data:', '').trim() + '\n';
    } else if (line.startsWith('id:')) {
      id = line.replace('id:', '').trim();
    }
  }
 
  if (data !== '') {
    return { event: eventType, data: data.trim(), id };
  }
  return null;
}
  • 서버에서 받은 원시 SSE 텍스트 데이터를 줄 단위로 분리하여, 이벤트 타입, 데이터, 아이디를 추출합니다.
  • 여러 줄에 걸친 data: 값을 하나의 문자열로 연결하며, 이벤트 단위는 \n\n(빈줄)으로 구분되어 파싱됩니다.

CustomSSE 클래스

CustomSSE 클래스는 SSE(Server-Sent Events) 서버와의 실시간 연결하고, 메시지를 수신하여 사용자에게 전달하는 기능을 제공합니다.

먼저, CustomSSE 객체가 생성되면 URL과 인증 토큰이 저장되고, 즉시 서버와 연결을 시도합니다.
이때 Fetch API가 사용되며, Authorization 헤더에 Bearer 토큰이 설정되어 인증된 사용자로 연결이 시도됩니다. Accept 헤더는 'text/event-stream'으로 지정되어 서버가 SSE 형식으로 응답을 제공하도록 명시합니다.
또한, AbortController가 함께 생성되어 연결을 중단할 수 있는 제어권을 확보합니다.

서버 연결이 성공하면 readyState가 1 (OPEN)으로 변경되며, 사용자 지정 onopen 콜백이 실행됩니다. 이는 연결이 성공했음을 사용자에게 알리거나 초기화 작업을 수행할 수 있도록 도와줍니다. 서버로부터 수신된 데이터는 스트리밍 방식으로 지속적으로 읽혀지며, TextDecoder를 사용하여 UTF-8 형식의 문자열로 변환됩니다. 이 문자열 데이터는 버퍼에 누적되어 SSE 표준에 따라 '\n\n' (빈 줄)로 구분된 개별 이벤트 메시지로 파싱됩니다.

파싱된 각 메시지는 사용자 지정 onmessage 콜백으로 전달됩니다. 여기서 각 메시지는 JSON 형식으로 변환되거나 사용자 정의 로직에 맞게 처리될 수 있습니다. 이를 통해 사용자 애플리케이션은 서버로부터 실시간으로 이벤트를 받아 처리할 수 있습니다.

만약 연결 중 오류가 발생하면, CustomSSE 클래스는 onerror 콜백을 통해 사용자에게 에러를 알립니다. 이때 콘솔에 에러 메시지도 출력되므로 디버깅이 용이합니다. 에러는 서버 연결 실패, 네트워크 문제 또는 잘못된 토큰으로 인해 발생할 수 있으며, 각 상황에서 적절한 대응이 가능합니다.

마지막으로, 사용자가 CustomSSE 인스턴스의 close() 메서드를 호출하면 연결이 안전하게 종료됩니다. 이때 readyState는 2 (CLOSED)로 변경되고, AbortController의 abort() 메서드를 통해 Fetch 요청이 강제로 취소됩니다. 이는 불필요한 리소스 사용을 방지하고 서버와의 연결을 즉시 종료할 수 있도록 합니다.

클래스 멤버 변수 및 생성자

export class CustomSSE {
  url: string;      // SSE 서버의 URL
  token: string;    // 인증 토큰 (예: Bearer 토큰)
  onopen: ((event: Event) => void) | null = null;     // 연결 성공 시 호출될 콜백
  onmessage: ((event: MessageEvent) => void) | null = null; // 메시지 수신 시 호출될 콜백
  onerror: ((event: Event) => void) | null = null;      // 에러 발생 시 호출될 콜백
  readyState: number = 0; // 0: connecting, 1: open, 2: closed
 
  private _abortController: AbortController; // 연결을 중단하기 위한 컨트롤러
 
  constructor(url: string, token: string) {
    this.url = url;
    this.token = token;
    this.readyState = 0; // 처음에는 연결 중 (CONNECTING)
    this._abortController = new AbortController();
    this.connect(); // 객체 생성 시 바로 연결을 시도합니다.
  }
}

connect, close 메서드

private async connect() {
  try {
    const response = await fetch(this.url, {
      headers: {
        Authorization: `Bearer ${this.token}`, // 인증 헤더 설정
        Accept: 'text/event-stream',            // SSE 스트림 요청
      },
      cache: 'no-store',
      credentials: 'include',
      signal: this._abortController.signal,      // 연결 취소를 위한 signal
    });
    if (!response.ok || !response.body) {
      throw new Error(`HTTP error! status: ${response.status}`);
    }
 
    // 연결 성공: readyState를 OPEN(1)으로 변경하고 onopen 콜백 호출
    this.readyState = 1;
    if (this.onopen) {
      this.onopen(new Event('open'));
    }
 
    // 서버로부터 데이터를 읽어오기 위한 리더 생성
    const reader = response.body.getReader();
    const decoder = new TextDecoder('utf-8');
    let buffer = '';
 
    // 스트림에서 데이터를 계속 읽어옵니다.
    while (true) {
      const { done, value } = await reader.read();
      if (done) break; // 더 이상 읽을 데이터가 없으면 종료
 
      // 받아온 바이트 데이터를 문자열로 변환하고, 버퍼에 누적합니다.
      buffer += decoder.decode(value, { stream: true });
 
      // 버퍼 내에서 "\n\n" 구분자로 이벤트 단위를 찾습니다.
      let delimiterIndex: number;
      while ((delimiterIndex = buffer.indexOf('\n\n')) !== -1) {
        const rawEvent = buffer.slice(0, delimiterIndex).trim();
        buffer = buffer.slice(delimiterIndex + 2);
        if (rawEvent) {
          // parseSSEEvent 함수를 통해 원시 이벤트 텍스트를 파싱
          const eventData = parseSSEEvent(rawEvent);
          if (eventData && this.onmessage) {
            // 파싱된 데이터를 MessageEvent 형태로 변환하여 onmessage 콜백에 전달
            const messageEvent = new MessageEvent(eventData.event || 'message', {
              data: eventData.data,
              lastEventId: eventData.id || '',
            });
            this.onmessage(messageEvent);
          }
        }
      }
    }
  } catch (error) {
    // 에러 발생 시, 연결 상태가 CLOSED가 아니라면 onerror 콜백 호출
    if (this.readyState !== 2 && this.onerror) {
      this.onerror(new Event('error'));
    }
    console.error('CustomSSE connection error:', error);
  }
}
 
close() {
  this.readyState = 2; // CLOSED 상태로 변경
  this._abortController.abort(); // fetch 요청을 취소하여 연결을 종료합니다.
}
  • 연결 설정:
    • 생성자에서 URL과 토큰을 저장한 후, _abortController를 생성하고 connect()를 호출하여 서버에 연결을 시도합니다.
  • 커스텀 헤더 포함:
    • Fetch API를 사용하여 요청 시 Authorization 헤더와 Accept: 'text/event-stream'를 포함합니다.
  • 데이터 스트림 처리:
    • 서버로부터 읽어들인 바이트 데이터를 TextDecoder를 통해 문자열로 변환합니다.
    • 버퍼에 누적된 데이터를 \n\n(빈줄) 구분자로 분리하여 이벤트 단위로 파싱합니다.
    • 파싱된 데이터가 있으면 onmessage 핸들러를 호출해 개발자가 정의한 로직을 실행합니다.
  • 연결 상태 관리:
    • EventSource API는 연결 상태를 0(연결 중), 1(연결됨), 2(연결 종료)로 관리합니다.
    • CustomSSE 클래스에서도 네이티브 EventSource와 동일한 방식으로 readyState를 설정합니다.
    • 이 상태 값은 네이티브 EventSource의 상태와 유사한 방식으로 정의되어 있습니다.
  • 에러 및 종료:
    • 에러 발생 시 onerror 핸들러를 호출하고, 사용자가 close() 메서드를 호출하면 AbortController를 통해 연결을 중단합니다.

마무리

위에서 구현한 CustomSSE 클래스는 기본 EventSource API의 한계를 보완하여, 실시간 이벤트 스트림을 확장하여 관리할 수 있게 해줍니다. 인증이 필요한 경우에도 Fetch API를 통해 필요한 헤더를 자유롭게 설정할 수 있으며, 실시간으로 이벤트 데이터를 파싱하여 사용자에게 즉시 전달할 수 있습니다.
필요에 따라 CustomSSE를 확장하여 자동 재연결, 사용자 정의 이벤트 핸들러 같은 기능을 추가할 수 있을 것 같습니다.