본문으로 건너뛰기
Event Sourcing과 CQRS 프로덕션 구현 - 분산 시스템 아키텍처
Table of Contents

2025년, 마이크로서비스 아키텍처가 신규 엔터프라이즈 애플리케이션의 90% 이상을 차지하면서, 확장 가능하고 감사 가능한 시스템 구축에 필수적인 두 가지 패턴이 부상했습니다: Event Sourcing과 CQRS(Command Query Responsibility Segregation). 하지만 이러한 패턴을 프로덕션 환경에 구현하면 대부분의 튜토리얼에서 언급하지 않는 문제들이 드러납니다.

이 가이드는 기본 개념부터 실제 프로덕션 문제까지, 매일 수백만 개의 이벤트를 처리하는 기업들이 사용하는 실용적인 솔루션을 다룹니다.

Event Sourcing과 CQRS를 사용하는 이유

전통적인 방식의 문제점

전통적인 CRUD 시스템은 현재 상태만 저장합니다:

// 전통적인 CRUD - 현재 상태만 저장
const user = {
 id: 123,
 balance: 5000,
 status: 'active',
 lastUpdated: '2025-11-11T10:00:00Z'
}

// 손실된 정보:
// - 잔액이 왜 5000인가?
// - 어떤 거래가 발생했는가?
// - 상태는 언제 변경되었는가?
// - 누가 이러한 변경을 했는가?

이 접근 방식의 문제점:

  • 감사 추적 없음
  • 히스토리 재생 불가
  • 비즈니스 인사이트 손실
  • 디버깅 어려움
  • 컴플라이언스 문제

Event Sourcing 솔루션

현재 상태를 저장하는 대신, 그 상태로 이어진 모든 이벤트를 저장합니다:

// Event Sourcing - 완전한 히스토리
const events = [
 {
 id: 'e1',
 type: 'AccountCreated',
 data: { userId: 123, initialBalance: 0 },
 timestamp: '2025-01-01T00:00:00Z'
 },
 {
 id: 'e2',
 type: 'FundsDeposited',
 data: { userId: 123, amount: 3000, source: 'bank_transfer' },
 timestamp: '2025-01-15T10:30:00Z'
 },
 {
 id: 'e3',
 type: 'FundsDeposited',
 data: { userId: 123, amount: 2500, source: 'payroll' },
 timestamp: '2025-02-01T09:00:00Z'
 },
 {
 id: 'e4',
 type: 'FundsWithdrawn',
 data: { userId: 123, amount: 500, reason: 'atm_withdrawal' },
 timestamp: '2025-02-10T14:20:00Z'
 },
 {
 id: 'e5',
 type: 'AccountActivated',
 data: { userId: 123, activatedBy: 'admin@example.com' },
 timestamp: '2025-01-02T08:00:00Z'
 }
]

// 현재 상태는 이벤트로부터 계산됨
function calculateBalance(events) {
 return events.reduce((balance, event) => {
 switch (event.type) {
 case 'AccountCreated': return event.data.initialBalance
 case 'FundsDeposited': return balance + event.data.amount
 case 'FundsWithdrawn': return balance - event.data.amount
 default: return balance
 }
 }, 0)
}

const currentBalance = calculateBalance(events) // 5000

장점:

  • 완전한 감사 추적
  • 시간 여행 디버깅
  • 규제 준수 (금융, 헬스케어)
  • 비즈니스 인텔리전스 (사용자 행동 분석)
  • 새로운 프로젝션 추가 용이

CQRS: 읽기와 쓰기 분리

CQRS는 애플리케이션을 두 부분으로 분리합니다:

Command Side (쓰기):

  • 비즈니스 규칙 검증
  • 이벤트 생성
  • 이벤트 스토어에 저장

Query Side (읽기):

  • 최적화된 읽기 모델 (프로젝션)
  • 빠른 쿼리
  • 최종 일관성 (Eventually consistent)
// Command Side - 쓰기 작업
class BankAccountCommandHandler {
 async depositFunds(accountId, amount, source) {
 // 1. 이벤트 스토어에서 이벤트 로드
 const events = await eventStore.getEvents(accountId)

 // 2. 현재 상태 재구성 (aggregate)
 const account = new BankAccount(events)

 // 3. 명령 실행 (비즈니스 로직)
 const newEvent = account.deposit(amount, source)

 // 4. 검증
 if (!newEvent) {
 throw new Error('Deposit failed')
 }

 // 5. 새 이벤트 저장
 await eventStore.appendEvent(accountId, newEvent)

 return newEvent
 }
}

// Query Side - 읽기 작업
class BankAccountQueryHandler {
 async getAccountBalance(accountId) {
 // 최적화된 프로젝션에서 읽기 (빠름!)
 return await accountBalanceProjection.get(accountId)
 }

 async getTransactionHistory(accountId, limit = 50) {
 // 비정규화된 테이블에서 읽기
 return await transactionHistoryProjection.query(accountId, limit)
 }
}

프로덕션 구현: 완전한 시스템

1. Event Store 설계

// event-store.js
const EventEmitter = require('events')

class EventStore extends EventEmitter {
 constructor(database) {
 super()
 this.db = database
 }

 async appendEvent(aggregateId, event) {
 // 낙관적 동시성 제어를 위한 현재 버전 가져오기
 const currentVersion = await this.getAggregateVersion(aggregateId)

 const eventRecord = {
 id: this.generateEventId(),
 aggregateId,
 version: currentVersion + 1,
 type: event.type,
 data: event.data,
 metadata: {
 timestamp: new Date().toISOString(),
 correlationId: event.correlationId,
 causationId: event.causationId,
 userId: event.userId
 }
 }

 try {
 // 버전 체크와 함께 원자적 삽입
 await this.db.query(`
 INSERT INTO events (
 id, aggregate_id, version, type, data, metadata, created_at
 )
 VALUES ($1, $2, $3, $4, $5, $6, NOW())
 WHERE NOT EXISTS (
 SELECT 1 FROM events
 WHERE aggregate_id = $2 AND version >= $3
 )
 `, [
 eventRecord.id,
 eventRecord.aggregateId,
 eventRecord.version,
 eventRecord.type,
 JSON.stringify(eventRecord.data),
 JSON.stringify(eventRecord.metadata)
 ])

 // 프로젝션을 위한 이벤트 발행
 this.emit('eventAppended', eventRecord)

 return eventRecord

 } catch (error) {
 if (error.code === '23505') { // 유니크 제약 조건 위반
 throw new ConcurrencyError(
 `Concurrency conflict for aggregate ${aggregateId}`
 )
 }
 throw error
 }
 }

 async getEvents(aggregateId, fromVersion = 0) {
 const result = await this.db.query(`
 SELECT id, aggregate_id, version, type, data, metadata, created_at
 FROM events
 WHERE aggregate_id = $1 AND version > $2
 ORDER BY version ASC
 `, [aggregateId, fromVersion])

 return result.rows.map(row => ({
 id: row.id,
 aggregateId: row.aggregate_id,
 version: row.version,
 type: row.type,
 data: JSON.parse(row.data),
 metadata: JSON.parse(row.metadata),
 timestamp: row.created_at
 }))
 }

 async getAggregateVersion(aggregateId) {
 const result = await this.db.query(`
 SELECT COALESCE(MAX(version), 0) as version
 FROM events
 WHERE aggregate_id = $1
 `, [aggregateId])

 return result.rows[0].version
 }

 generateEventId() {
 return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
 }
}

class ConcurrencyError extends Error {
 constructor(message) {
 super(message)
 this.name = 'ConcurrencyError'
 }
}

module.exports = { EventStore, ConcurrencyError }

2. Aggregate Root 패턴

// bank-account-aggregate.js
class BankAccount {
 constructor(events = []) {
 this.id = null
 this.balance = 0
 this.status = 'pending'
 this.version = 0
 this.pendingEvents = []

 // 이벤트로부터 상태 재구성
 events.forEach(event => this.apply(event, false))
 }

 // Commands (비즈니스 로직)

 create(accountId, initialBalance = 0) {
 if (this.id) {
 throw new Error('Account already exists')
 }

 const event = {
 type: 'AccountCreated',
 data: { accountId, initialBalance }
 }

 this.apply(event, true)
 return event
 }

 deposit(amount, source) {
 this.assertActive()

 if (amount <= 0) {
 throw new Error('Deposit amount must be positive')
 }

 if (amount > 1000000) {
 throw new Error('Deposit amount exceeds limit')
 }

 const event = {
 type: 'FundsDeposited',
 data: { amount, source }
 }

 this.apply(event, true)
 return event
 }

 withdraw(amount, reason) {
 this.assertActive()

 if (amount <= 0) {
 throw new Error('Withdrawal amount must be positive')
 }

 if (this.balance < amount) {
 throw new Error('Insufficient funds')
 }

 const event = {
 type: 'FundsWithdrawn',
 data: { amount, reason }
 }

 this.apply(event, true)
 return event
 }

 activate() {
 if (this.status === 'active') {
 throw new Error('Account already active')
 }

 const event = {
 type: 'AccountActivated',
 data: {}
 }

 this.apply(event, true)
 return event
 }

 // Event Handlers (상태 변경)

 apply(event, isNew = false) {
 switch (event.type) {
 case 'AccountCreated':
 this.id = event.data.accountId
 this.balance = event.data.initialBalance
 this.status = 'pending'
 break

 case 'FundsDeposited':
 this.balance += event.data.amount
 break

 case 'FundsWithdrawn':
 this.balance -= event.data.amount
 break

 case 'AccountActivated':
 this.status = 'active'
 break

 default:
 console.warn(`Unknown event type: ${event.type}`)
 }

 this.version++

 if (isNew) {
 this.pendingEvents.push(event)
 }
 }

 assertActive() {
 if (this.status !== 'active') {
 throw new Error('Account is not active')
 }
 }

 getPendingEvents() {
 return this.pendingEvents
 }

 clearPendingEvents() {
 this.pendingEvents = []
 }
}

module.exports = BankAccount

3. Projection Builder (읽기 모델)

// projections/account-balance-projection.js
class AccountBalanceProjection {
 constructor(database, eventStore) {
 this.db = database
 this.eventStore = eventStore

 // 이벤트 구독
 this.eventStore.on('eventAppended', (event) => {
 this.handleEvent(event)
 })
 }

 async handleEvent(event) {
 switch (event.type) {
 case 'AccountCreated':
 await this.createAccount(event)
 break

 case 'FundsDeposited':
 await this.updateBalance(event.aggregateId, event.data.amount)
 break

 case 'FundsWithdrawn':
 await this.updateBalance(event.aggregateId, -event.data.amount)
 break

 case 'AccountActivated':
 await this.activateAccount(event.aggregateId)
 break
 }
 }

 async createAccount(event) {
 await this.db.query(`
 INSERT INTO account_balances (
 account_id, balance, status, created_at, updated_at
 )
 VALUES ($1, $2, 'pending', NOW(), NOW())
 ON CONFLICT (account_id) DO NOTHING
 `, [event.data.accountId, event.data.initialBalance])
 }

 async updateBalance(accountId, amount) {
 await this.db.query(`
 UPDATE account_balances
 SET balance = balance + $2, updated_at = NOW()
 WHERE account_id = $1
 `, [accountId, amount])
 }

 async activateAccount(accountId) {
 await this.db.query(`
 UPDATE account_balances
 SET status = 'active', updated_at = NOW()
 WHERE account_id = $1
 `, [accountId])
 }

 async get(accountId) {
 const result = await this.db.query(`
 SELECT account_id, balance, status, created_at, updated_at
 FROM account_balances
 WHERE account_id = $1
 `, [accountId])

 return result.rows[0] || null
 }

 async rebuild() {
 console.log('프로젝션 재구성 중...')

 // 현재 프로젝션 초기화
 await this.db.query('TRUNCATE TABLE account_balances')

 // 모든 이벤트 재생
 const result = await this.db.query(`
 SELECT id, aggregate_id, version, type, data, metadata, created_at
 FROM events
 ORDER BY created_at ASC, version ASC
 `)

 for (const row of result.rows) {
 await this.handleEvent({
 id: row.id,
 aggregateId: row.aggregate_id,
 version: row.version,
 type: row.type,
 data: JSON.parse(row.data),
 metadata: JSON.parse(row.metadata),
 timestamp: row.created_at
 })
 }

 console.log(`프로젝션 재구성 완료: ${result.rows.length}개 이벤트 처리됨`)
 }
}

module.exports = AccountBalanceProjection

프로덕션 문제와 해결책

문제 1: 이벤트 스키마 진화

문제: 시스템이 진화하면서 이벤트 구조가 변경됩니다.

해결책: Event Upcasting

class EventUpcaster {
 constructor() {
 this.upcasters = new Map()

 // Upcaster 등록
 this.registerUpcaster('FundsDeposited', 1, this.upcastFundsDepositedV1)
 }

 registerUpcaster(eventType, fromVersion, upcasterFn) {
 const key = `${eventType}_v${fromVersion}`
 this.upcasters.set(key, upcasterFn)
 }

 upcast(event) {
 const version = event.metadata.schemaVersion || 1
 const key = `${event.type}_v${version}`

 if (this.upcasters.has(key)) {
 const upcaster = this.upcasters.get(key)
 return upcaster(event)
 }

 return event
 }

 // 예시: 이전 FundsDeposited 이벤트에는 'source' 필드가 없었음
 upcastFundsDepositedV1(event) {
 return {
...event,
 data: {
...event.data,
 source: event.data.source || 'unknown' // 누락된 필드 추가
 },
 metadata: {
...event.metadata,
 schemaVersion: 2
 }
 }
 }
}

문제 2: 대규모 이벤트 스트림 성능

문제: 하나의 aggregate에 대해 수천 개의 이벤트를 로드하는 것은 느립니다.

해결책: Snapshots

class SnapshotStore {
 constructor(database) {
 this.db = database
 this.snapshotInterval = 100 // 100개 이벤트마다 스냅샷
 }

 async saveSnapshot(aggregateId, state, version) {
 await this.db.query(`
 INSERT INTO snapshots (aggregate_id, state, version, created_at)
 VALUES ($1, $2, $3, NOW())
 ON CONFLICT (aggregate_id)
 DO UPDATE SET state = $2, version = $3, created_at = NOW()
 `, [aggregateId, JSON.stringify(state), version])
 }

 async getSnapshot(aggregateId) {
 const result = await this.db.query(`
 SELECT state, version
 FROM snapshots
 WHERE aggregate_id = $1
 `, [aggregateId])

 if (result.rows.length === 0) {
 return null
 }

 return {
 state: JSON.parse(result.rows[0].state),
 version: result.rows[0].version
 }
 }
}

// 스냅샷을 사용한 이벤트 로딩
async function loadAggregate(aggregateId) {
 // 스냅샷 먼저 로드 시도
 const snapshot = await snapshotStore.getSnapshot(aggregateId)

 let events
 let aggregate

 if (snapshot) {
 // 스냅샷 이후의 이벤트만 로드
 events = await eventStore.getEvents(aggregateId, snapshot.version)
 aggregate = new BankAccount([])
 aggregate.hydrateFromSnapshot(snapshot.state)
 } else {
 // 모든 이벤트 로드
 events = await eventStore.getEvents(aggregateId)
 aggregate = new BankAccount([])
 }

 // 나머지 이벤트 적용
 events.forEach(event => aggregate.apply(event))

 return aggregate
}

문제 3: 최종 일관성 (Eventual Consistency)

문제: 프로젝션이 이벤트보다 뒤처집니다.

해결책: Event Position Tracking

class ProjectionProgressTracker {
 constructor(database) {
 this.db = database
 }

 async getLastProcessedPosition(projectionName) {
 const result = await this.db.query(`
 SELECT last_position
 FROM projection_progress
 WHERE projection_name = $1
 `, [projectionName])

 return result.rows[0]?.last_position || 0
 }

 async updatePosition(projectionName, position) {
 await this.db.query(`
 INSERT INTO projection_progress (projection_name, last_position, updated_at)
 VALUES ($1, $2, NOW())
 ON CONFLICT (projection_name)
 DO UPDATE SET last_position = $2, updated_at = NOW()
 `, [projectionName, position])
 }
}

// 프로젝션을 위한 Catch-up Subscription
class CatchUpSubscription {
 constructor(eventStore, projection, progressTracker) {
 this.eventStore = eventStore
 this.projection = projection
 this.progressTracker = progressTracker
 this.isRunning = false
 }

 async start() {
 this.isRunning = true
 const projectionName = this.projection.constructor.name

 while (this.isRunning) {
 const lastPosition = await this.progressTracker.getLastProcessedPosition(projectionName)

 // 이벤트 배치 가져오기
 const events = await this.eventStore.getEventsFromPosition(lastPosition, 100)

 if (events.length === 0) {
 // 따라잡음 - 새 이벤트 대기
 await this.sleep(1000)
 continue
 }

 // 배치 처리
 for (const event of events) {
 await this.projection.handleEvent(event)
 await this.progressTracker.updatePosition(projectionName, event.globalPosition)
 }

 console.log(`${projectionName}에 대해 ${events.length}개 이벤트 처리됨`)
 }
 }

 stop() {
 this.isRunning = false
 }

 sleep(ms) {
 return new Promise(resolve => setTimeout(resolve, ms))
 }
}

프로덕션 모범 사례

1. 이벤트 네이밍 규칙

// 좋은 예 - 과거형, 도메인 언어
'OrderPlaced'
'PaymentProcessed'
'CustomerRegistered'
'InventoryReserved'

// 나쁜 예 - 현재형, 기술적
'PlaceOrder'
'ProcessPayment'
'RegisterCustomer'
'ReserveInventory'

2. 멱등성 이벤트 핸들러

async function handlePaymentProcessed(event) {
 // 이미 처리되었는지 확인
 const exists = await db.query(`
 SELECT 1 FROM processed_payments
 WHERE event_id = $1
 `, [event.id])

 if (exists.rows.length > 0) {
 console.log(`이벤트 ${event.id} 이미 처리됨, 건너뜀`)
 return
 }

 // 이벤트 처리
 await db.query(`
 INSERT INTO processed_payments (event_id, payment_id, amount)
 VALUES ($1, $2, $3)
 `, [event.id, event.data.paymentId, event.data.amount])
}

3. Correlation 및 Causation ID

// 이벤트 관계 추적
const event = {
 id: 'evt_123',
 type: 'PaymentProcessed',
 data: { amount: 100 },
 metadata: {
 correlationId: 'order_xyz', // 원래 요청 ID
 causationId: 'evt_122', // 이 이벤트를 발생시킨 이벤트
 userId: 'user_456'
 }
}

4. 모니터링과 관찰성

const prometheus = require('prom-client')

const eventProcessedCounter = new prometheus.Counter({
 name: 'events_processed_total',
 help: 'Total events processed',
 labelNames: ['event_type', 'status']
})

const projectionLagGauge = new prometheus.Gauge({
 name: 'projection_lag_seconds',
 help: 'Projection lag in seconds',
 labelNames: ['projection_name']
})

// 메트릭 추적
eventProcessedCounter.inc({ event_type: 'FundsDeposited', status: 'success' })

const lag = (Date.now() - lastEventTimestamp) / 1000
projectionLagGauge.set({ projection_name: 'AccountBalance' }, lag)

Event Sourcing을 사용하지 말아야 할 때

Event Sourcing이 항상 올바른 선택은 아닙니다:

** 사용하지 말아야 할 때:**

  • 단순한 CRUD 애플리케이션
  • 낮은 컴플라이언스 요구사항
  • 패턴에 익숙하지 않은 작은 팀
  • 감사 추적 필요 없음
  • 빠른 프로토타입

** 사용해야 할 때:**

  • 감사 추적이 중요함 (금융, 헬스케어)
  • 복잡한 비즈니스 로직
  • 히스토리 데이터 분석 필요
  • 시간적 쿼리 필요
  • 이벤트 주도 시스템 구축

결론

Event Sourcing과 CQRS는 확장 가능하고 감사 가능한 시스템을 구축하는 강력한 기능을 제공하지만, 복잡성이 따릅니다. 성공하려면 패턴과 프로덕션 문제를 모두 이해해야 합니다.

핵심 요점:

  1. 간단하게 시작: 하나의 bounded context부터 시작
  2. 스냅샷은 필수: 수천 개의 이벤트를 로드하지 마세요
  3. 스키마 진화 처리: 이벤트 upcasting 계획
  4. 프로젝션 모니터링: 지연과 실패 추적
  5. 철저한 테스트: 특히 동시성 시나리오

이러한 패턴과 솔루션을 따르면, 데이터 무결성과 성능을 유지하면서 수백만 개의 이벤트로 확장되는 프로덕션 준비 이벤트 소싱 시스템을 구축할 수 있습니다.

추가 자료

이 글 공유하기:
My avatar

글을 마치며

이 글이 도움이 되었기를 바랍니다. 궁금한 점이나 의견이 있다면 댓글로 남겨주세요.

더 많은 기술 인사이트와 개발 경험을 공유하고 있으니, 다른 포스트도 확인해보세요.

유럽살며 여행하며 코딩하는 노마드의 여정을 함께 나누며, 함께 성장하는 개발자 커뮤니티를 만들어가요! 🚀


관련 포스트

# PostgreSQL 느린 쿼리 최적화 완벽 가이드: EXPLAIN ANALYZE로 프로덕션 성능 10배 개선하기

게시:

PostgreSQL 느린 쿼리 최적화와 EXPLAIN ANALYZE 완벽 가이드입니다. Sequential Scan → Index Scan 전환, 복합 인덱스 설계, VACUUM ANALYZE, auto_explain, 그리고 2025년 최신 도구(pev2, pganalyze)까지 실전 예제와 함께 설명합니다.

읽기