# Event Sourcing과 CQRS: 프로덕션 환경 구현 가이드
Table of Contents
2025년, 마이크로서비스 아키텍처가 신규 엔터프라이즈 애플리케이션의 90% 이상을 차지하면서, 확장 가능하고 감사 가능한 시스템 구축에 필수적인 두 가지 패턴이 부상했습니다: Event Sourcing과 CQRS(Command Query Responsibility Segregation). 하지만 이러한 패턴을 프로덕션 환경에 구현하면 대부분의 튜토리얼에서 언급하지 않는 문제들이 드러납니다.
이 가이드는 기본 개념부터 실제 프로덕션 문제까지, 매일 수백만 개의 이벤트를 처리하는 기업들이 사용하는 실용적인 솔루션을 다룹니다.
Event Sourcing과 CQRS를 사용하는 이유
전통적인 방식의 문제점
전통적인 CRUD 시스템은 현재 상태만 저장합니다:
// 전통적인 CRUD - 현재 상태만 저장
const user = {
id: 123,
balance: 5000,
status: 'active',
lastUpdated: '2025-11-11T10:00:00Z'
}
// 손실된 정보:
// - 잔액이 왜 5000인가?
// - 어떤 거래가 발생했는가?
// - 상태는 언제 변경되었는가?
// - 누가 이러한 변경을 했는가?
이 접근 방식의 문제점:
- 감사 추적 없음
- 히스토리 재생 불가
- 비즈니스 인사이트 손실
- 디버깅 어려움
- 컴플라이언스 문제
Event Sourcing 솔루션
현재 상태를 저장하는 대신, 그 상태로 이어진 모든 이벤트를 저장합니다:
// Event Sourcing - 완전한 히스토리
const events = [
{
id: 'e1',
type: 'AccountCreated',
data: { userId: 123, initialBalance: 0 },
timestamp: '2025-01-01T00:00:00Z'
},
{
id: 'e2',
type: 'FundsDeposited',
data: { userId: 123, amount: 3000, source: 'bank_transfer' },
timestamp: '2025-01-15T10:30:00Z'
},
{
id: 'e3',
type: 'FundsDeposited',
data: { userId: 123, amount: 2500, source: 'payroll' },
timestamp: '2025-02-01T09:00:00Z'
},
{
id: 'e4',
type: 'FundsWithdrawn',
data: { userId: 123, amount: 500, reason: 'atm_withdrawal' },
timestamp: '2025-02-10T14:20:00Z'
},
{
id: 'e5',
type: 'AccountActivated',
data: { userId: 123, activatedBy: 'admin@example.com' },
timestamp: '2025-01-02T08:00:00Z'
}
]
// 현재 상태는 이벤트로부터 계산됨
function calculateBalance(events) {
return events.reduce((balance, event) => {
switch (event.type) {
case 'AccountCreated': return event.data.initialBalance
case 'FundsDeposited': return balance + event.data.amount
case 'FundsWithdrawn': return balance - event.data.amount
default: return balance
}
}, 0)
}
const currentBalance = calculateBalance(events) // 5000
장점:
- 완전한 감사 추적
- 시간 여행 디버깅
- 규제 준수 (금융, 헬스케어)
- 비즈니스 인텔리전스 (사용자 행동 분석)
- 새로운 프로젝션 추가 용이
CQRS: 읽기와 쓰기 분리
CQRS는 애플리케이션을 두 부분으로 분리합니다:
Command Side (쓰기):
- 비즈니스 규칙 검증
- 이벤트 생성
- 이벤트 스토어에 저장
Query Side (읽기):
- 최적화된 읽기 모델 (프로젝션)
- 빠른 쿼리
- 최종 일관성 (Eventually consistent)
// Command Side - 쓰기 작업
class BankAccountCommandHandler {
async depositFunds(accountId, amount, source) {
// 1. 이벤트 스토어에서 이벤트 로드
const events = await eventStore.getEvents(accountId)
// 2. 현재 상태 재구성 (aggregate)
const account = new BankAccount(events)
// 3. 명령 실행 (비즈니스 로직)
const newEvent = account.deposit(amount, source)
// 4. 검증
if (!newEvent) {
throw new Error('Deposit failed')
}
// 5. 새 이벤트 저장
await eventStore.appendEvent(accountId, newEvent)
return newEvent
}
}
// Query Side - 읽기 작업
class BankAccountQueryHandler {
async getAccountBalance(accountId) {
// 최적화된 프로젝션에서 읽기 (빠름!)
return await accountBalanceProjection.get(accountId)
}
async getTransactionHistory(accountId, limit = 50) {
// 비정규화된 테이블에서 읽기
return await transactionHistoryProjection.query(accountId, limit)
}
}
프로덕션 구현: 완전한 시스템
1. Event Store 설계
// event-store.js
const EventEmitter = require('events')
class EventStore extends EventEmitter {
constructor(database) {
super()
this.db = database
}
async appendEvent(aggregateId, event) {
// 낙관적 동시성 제어를 위한 현재 버전 가져오기
const currentVersion = await this.getAggregateVersion(aggregateId)
const eventRecord = {
id: this.generateEventId(),
aggregateId,
version: currentVersion + 1,
type: event.type,
data: event.data,
metadata: {
timestamp: new Date().toISOString(),
correlationId: event.correlationId,
causationId: event.causationId,
userId: event.userId
}
}
try {
// 버전 체크와 함께 원자적 삽입
await this.db.query(`
INSERT INTO events (
id, aggregate_id, version, type, data, metadata, created_at
)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
WHERE NOT EXISTS (
SELECT 1 FROM events
WHERE aggregate_id = $2 AND version >= $3
)
`, [
eventRecord.id,
eventRecord.aggregateId,
eventRecord.version,
eventRecord.type,
JSON.stringify(eventRecord.data),
JSON.stringify(eventRecord.metadata)
])
// 프로젝션을 위한 이벤트 발행
this.emit('eventAppended', eventRecord)
return eventRecord
} catch (error) {
if (error.code === '23505') { // 유니크 제약 조건 위반
throw new ConcurrencyError(
`Concurrency conflict for aggregate ${aggregateId}`
)
}
throw error
}
}
async getEvents(aggregateId, fromVersion = 0) {
const result = await this.db.query(`
SELECT id, aggregate_id, version, type, data, metadata, created_at
FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version ASC
`, [aggregateId, fromVersion])
return result.rows.map(row => ({
id: row.id,
aggregateId: row.aggregate_id,
version: row.version,
type: row.type,
data: JSON.parse(row.data),
metadata: JSON.parse(row.metadata),
timestamp: row.created_at
}))
}
async getAggregateVersion(aggregateId) {
const result = await this.db.query(`
SELECT COALESCE(MAX(version), 0) as version
FROM events
WHERE aggregate_id = $1
`, [aggregateId])
return result.rows[0].version
}
generateEventId() {
return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
}
}
class ConcurrencyError extends Error {
constructor(message) {
super(message)
this.name = 'ConcurrencyError'
}
}
module.exports = { EventStore, ConcurrencyError }
2. Aggregate Root 패턴
// bank-account-aggregate.js
class BankAccount {
constructor(events = []) {
this.id = null
this.balance = 0
this.status = 'pending'
this.version = 0
this.pendingEvents = []
// 이벤트로부터 상태 재구성
events.forEach(event => this.apply(event, false))
}
// Commands (비즈니스 로직)
create(accountId, initialBalance = 0) {
if (this.id) {
throw new Error('Account already exists')
}
const event = {
type: 'AccountCreated',
data: { accountId, initialBalance }
}
this.apply(event, true)
return event
}
deposit(amount, source) {
this.assertActive()
if (amount <= 0) {
throw new Error('Deposit amount must be positive')
}
if (amount > 1000000) {
throw new Error('Deposit amount exceeds limit')
}
const event = {
type: 'FundsDeposited',
data: { amount, source }
}
this.apply(event, true)
return event
}
withdraw(amount, reason) {
this.assertActive()
if (amount <= 0) {
throw new Error('Withdrawal amount must be positive')
}
if (this.balance < amount) {
throw new Error('Insufficient funds')
}
const event = {
type: 'FundsWithdrawn',
data: { amount, reason }
}
this.apply(event, true)
return event
}
activate() {
if (this.status === 'active') {
throw new Error('Account already active')
}
const event = {
type: 'AccountActivated',
data: {}
}
this.apply(event, true)
return event
}
// Event Handlers (상태 변경)
apply(event, isNew = false) {
switch (event.type) {
case 'AccountCreated':
this.id = event.data.accountId
this.balance = event.data.initialBalance
this.status = 'pending'
break
case 'FundsDeposited':
this.balance += event.data.amount
break
case 'FundsWithdrawn':
this.balance -= event.data.amount
break
case 'AccountActivated':
this.status = 'active'
break
default:
console.warn(`Unknown event type: ${event.type}`)
}
this.version++
if (isNew) {
this.pendingEvents.push(event)
}
}
assertActive() {
if (this.status !== 'active') {
throw new Error('Account is not active')
}
}
getPendingEvents() {
return this.pendingEvents
}
clearPendingEvents() {
this.pendingEvents = []
}
}
module.exports = BankAccount
3. Projection Builder (읽기 모델)
// projections/account-balance-projection.js
class AccountBalanceProjection {
constructor(database, eventStore) {
this.db = database
this.eventStore = eventStore
// 이벤트 구독
this.eventStore.on('eventAppended', (event) => {
this.handleEvent(event)
})
}
async handleEvent(event) {
switch (event.type) {
case 'AccountCreated':
await this.createAccount(event)
break
case 'FundsDeposited':
await this.updateBalance(event.aggregateId, event.data.amount)
break
case 'FundsWithdrawn':
await this.updateBalance(event.aggregateId, -event.data.amount)
break
case 'AccountActivated':
await this.activateAccount(event.aggregateId)
break
}
}
async createAccount(event) {
await this.db.query(`
INSERT INTO account_balances (
account_id, balance, status, created_at, updated_at
)
VALUES ($1, $2, 'pending', NOW(), NOW())
ON CONFLICT (account_id) DO NOTHING
`, [event.data.accountId, event.data.initialBalance])
}
async updateBalance(accountId, amount) {
await this.db.query(`
UPDATE account_balances
SET balance = balance + $2, updated_at = NOW()
WHERE account_id = $1
`, [accountId, amount])
}
async activateAccount(accountId) {
await this.db.query(`
UPDATE account_balances
SET status = 'active', updated_at = NOW()
WHERE account_id = $1
`, [accountId])
}
async get(accountId) {
const result = await this.db.query(`
SELECT account_id, balance, status, created_at, updated_at
FROM account_balances
WHERE account_id = $1
`, [accountId])
return result.rows[0] || null
}
async rebuild() {
console.log('프로젝션 재구성 중...')
// 현재 프로젝션 초기화
await this.db.query('TRUNCATE TABLE account_balances')
// 모든 이벤트 재생
const result = await this.db.query(`
SELECT id, aggregate_id, version, type, data, metadata, created_at
FROM events
ORDER BY created_at ASC, version ASC
`)
for (const row of result.rows) {
await this.handleEvent({
id: row.id,
aggregateId: row.aggregate_id,
version: row.version,
type: row.type,
data: JSON.parse(row.data),
metadata: JSON.parse(row.metadata),
timestamp: row.created_at
})
}
console.log(`프로젝션 재구성 완료: ${result.rows.length}개 이벤트 처리됨`)
}
}
module.exports = AccountBalanceProjection
프로덕션 문제와 해결책
문제 1: 이벤트 스키마 진화
문제: 시스템이 진화하면서 이벤트 구조가 변경됩니다.
해결책: Event Upcasting
class EventUpcaster {
constructor() {
this.upcasters = new Map()
// Upcaster 등록
this.registerUpcaster('FundsDeposited', 1, this.upcastFundsDepositedV1)
}
registerUpcaster(eventType, fromVersion, upcasterFn) {
const key = `${eventType}_v${fromVersion}`
this.upcasters.set(key, upcasterFn)
}
upcast(event) {
const version = event.metadata.schemaVersion || 1
const key = `${event.type}_v${version}`
if (this.upcasters.has(key)) {
const upcaster = this.upcasters.get(key)
return upcaster(event)
}
return event
}
// 예시: 이전 FundsDeposited 이벤트에는 'source' 필드가 없었음
upcastFundsDepositedV1(event) {
return {
...event,
data: {
...event.data,
source: event.data.source || 'unknown' // 누락된 필드 추가
},
metadata: {
...event.metadata,
schemaVersion: 2
}
}
}
}
문제 2: 대규모 이벤트 스트림 성능
문제: 하나의 aggregate에 대해 수천 개의 이벤트를 로드하는 것은 느립니다.
해결책: Snapshots
class SnapshotStore {
constructor(database) {
this.db = database
this.snapshotInterval = 100 // 100개 이벤트마다 스냅샷
}
async saveSnapshot(aggregateId, state, version) {
await this.db.query(`
INSERT INTO snapshots (aggregate_id, state, version, created_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (aggregate_id)
DO UPDATE SET state = $2, version = $3, created_at = NOW()
`, [aggregateId, JSON.stringify(state), version])
}
async getSnapshot(aggregateId) {
const result = await this.db.query(`
SELECT state, version
FROM snapshots
WHERE aggregate_id = $1
`, [aggregateId])
if (result.rows.length === 0) {
return null
}
return {
state: JSON.parse(result.rows[0].state),
version: result.rows[0].version
}
}
}
// 스냅샷을 사용한 이벤트 로딩
async function loadAggregate(aggregateId) {
// 스냅샷 먼저 로드 시도
const snapshot = await snapshotStore.getSnapshot(aggregateId)
let events
let aggregate
if (snapshot) {
// 스냅샷 이후의 이벤트만 로드
events = await eventStore.getEvents(aggregateId, snapshot.version)
aggregate = new BankAccount([])
aggregate.hydrateFromSnapshot(snapshot.state)
} else {
// 모든 이벤트 로드
events = await eventStore.getEvents(aggregateId)
aggregate = new BankAccount([])
}
// 나머지 이벤트 적용
events.forEach(event => aggregate.apply(event))
return aggregate
}
문제 3: 최종 일관성 (Eventual Consistency)
문제: 프로젝션이 이벤트보다 뒤처집니다.
해결책: Event Position Tracking
class ProjectionProgressTracker {
constructor(database) {
this.db = database
}
async getLastProcessedPosition(projectionName) {
const result = await this.db.query(`
SELECT last_position
FROM projection_progress
WHERE projection_name = $1
`, [projectionName])
return result.rows[0]?.last_position || 0
}
async updatePosition(projectionName, position) {
await this.db.query(`
INSERT INTO projection_progress (projection_name, last_position, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (projection_name)
DO UPDATE SET last_position = $2, updated_at = NOW()
`, [projectionName, position])
}
}
// 프로젝션을 위한 Catch-up Subscription
class CatchUpSubscription {
constructor(eventStore, projection, progressTracker) {
this.eventStore = eventStore
this.projection = projection
this.progressTracker = progressTracker
this.isRunning = false
}
async start() {
this.isRunning = true
const projectionName = this.projection.constructor.name
while (this.isRunning) {
const lastPosition = await this.progressTracker.getLastProcessedPosition(projectionName)
// 이벤트 배치 가져오기
const events = await this.eventStore.getEventsFromPosition(lastPosition, 100)
if (events.length === 0) {
// 따라잡음 - 새 이벤트 대기
await this.sleep(1000)
continue
}
// 배치 처리
for (const event of events) {
await this.projection.handleEvent(event)
await this.progressTracker.updatePosition(projectionName, event.globalPosition)
}
console.log(`${projectionName}에 대해 ${events.length}개 이벤트 처리됨`)
}
}
stop() {
this.isRunning = false
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
}
프로덕션 모범 사례
1. 이벤트 네이밍 규칙
// 좋은 예 - 과거형, 도메인 언어
'OrderPlaced'
'PaymentProcessed'
'CustomerRegistered'
'InventoryReserved'
// 나쁜 예 - 현재형, 기술적
'PlaceOrder'
'ProcessPayment'
'RegisterCustomer'
'ReserveInventory'
2. 멱등성 이벤트 핸들러
async function handlePaymentProcessed(event) {
// 이미 처리되었는지 확인
const exists = await db.query(`
SELECT 1 FROM processed_payments
WHERE event_id = $1
`, [event.id])
if (exists.rows.length > 0) {
console.log(`이벤트 ${event.id} 이미 처리됨, 건너뜀`)
return
}
// 이벤트 처리
await db.query(`
INSERT INTO processed_payments (event_id, payment_id, amount)
VALUES ($1, $2, $3)
`, [event.id, event.data.paymentId, event.data.amount])
}
3. Correlation 및 Causation ID
// 이벤트 관계 추적
const event = {
id: 'evt_123',
type: 'PaymentProcessed',
data: { amount: 100 },
metadata: {
correlationId: 'order_xyz', // 원래 요청 ID
causationId: 'evt_122', // 이 이벤트를 발생시킨 이벤트
userId: 'user_456'
}
}
4. 모니터링과 관찰성
const prometheus = require('prom-client')
const eventProcessedCounter = new prometheus.Counter({
name: 'events_processed_total',
help: 'Total events processed',
labelNames: ['event_type', 'status']
})
const projectionLagGauge = new prometheus.Gauge({
name: 'projection_lag_seconds',
help: 'Projection lag in seconds',
labelNames: ['projection_name']
})
// 메트릭 추적
eventProcessedCounter.inc({ event_type: 'FundsDeposited', status: 'success' })
const lag = (Date.now() - lastEventTimestamp) / 1000
projectionLagGauge.set({ projection_name: 'AccountBalance' }, lag)
Event Sourcing을 사용하지 말아야 할 때
Event Sourcing이 항상 올바른 선택은 아닙니다:
** 사용하지 말아야 할 때:**
- 단순한 CRUD 애플리케이션
- 낮은 컴플라이언스 요구사항
- 패턴에 익숙하지 않은 작은 팀
- 감사 추적 필요 없음
- 빠른 프로토타입
** 사용해야 할 때:**
- 감사 추적이 중요함 (금융, 헬스케어)
- 복잡한 비즈니스 로직
- 히스토리 데이터 분석 필요
- 시간적 쿼리 필요
- 이벤트 주도 시스템 구축
결론
Event Sourcing과 CQRS는 확장 가능하고 감사 가능한 시스템을 구축하는 강력한 기능을 제공하지만, 복잡성이 따릅니다. 성공하려면 패턴과 프로덕션 문제를 모두 이해해야 합니다.
핵심 요점:
- 간단하게 시작: 하나의 bounded context부터 시작
- 스냅샷은 필수: 수천 개의 이벤트를 로드하지 마세요
- 스키마 진화 처리: 이벤트 upcasting 계획
- 프로젝션 모니터링: 지연과 실패 추적
- 철저한 테스트: 특히 동시성 시나리오
이러한 패턴과 솔루션을 따르면, 데이터 무결성과 성능을 유지하면서 수백만 개의 이벤트로 확장되는 프로덕션 준비 이벤트 소싱 시스템을 구축할 수 있습니다.