Elasticsearch CRUD: 문서 색인과 Bulk API 최적화
100만 건 데이터, 어떻게 넣을까?
MySQL에서 Elasticsearch로 100만 개의 상품 데이터를 마이그레이션해야 했다. 단건 API로 하나씩 넣으면?
# 나쁜 예: 단건 처리
for product in products: # 100만 건
es.index(index="products", document=product)
# 각 요청마다 HTTP 오버헤드 + refresh 대기
예상 소요 시간: 약 28시간 (초당 10건 기준)
Bulk API를 사용하면?
# 좋은 예: 벌크 처리
from elasticsearch.helpers import bulk
bulk(es, products, index="products", chunk_size=5000)
예상 소요 시간: 약 15분 (초당 1,100건 기준)
이 글에서는 Elasticsearch의 CRUD 기본 동작과 대량 데이터 처리 최적화 전략을 알아본다.
문서(Document) CRUD 기본
Create: 문서 생성
자동 ID 생성:
curl -X POST "localhost:9200/products/_doc" -H "Content-Type: application/json" -d'
{
"name": "블루투스 이어폰",
"price": 49900,
"category": "전자기기"
}'
{
"_index": "products",
"_id": "abc123xyz", // 자동 생성된 ID
"_version": 1,
"result": "created",
"_primary_term": 1,
"_seq_no": 0
}
지정 ID 사용:
curl -X PUT "localhost:9200/products/_doc/prod-001" -H "Content-Type: application/json" -d'
{
"name": "블루투스 이어폰",
"price": 49900,
"category": "전자기기"
}'
중복 방지 (create only):
# op_type=create: 이미 존재하면 오류
curl -X PUT "localhost:9200/products/_doc/prod-001?op_type=create" \
-H "Content-Type: application/json" -d'
{
"name": "블루투스 이어폰"
}'
# 또는 _create 엔드포인트 사용
curl -X PUT "localhost:9200/products/_create/prod-001" \
-H "Content-Type: application/json" -d'
{
"name": "블루투스 이어폰"
}'
Read: 문서 조회
단건 조회:
curl -X GET "localhost:9200/products/_doc/prod-001"
{
"_index": "products",
"_id": "prod-001",
"_version": 1,
"found": true,
"_source": {
"name": "블루투스 이어폰",
"price": 49900,
"category": "전자기기"
}
}
특정 필드만 조회:
curl -X GET "localhost:9200/products/_doc/prod-001?_source=name,price"
# 또는 _source_includes / _source_excludes
curl -X GET "localhost:9200/products/_doc/prod-001?_source_includes=name,price"
존재 여부만 확인:
curl -I "localhost:9200/products/_doc/prod-001"
# 200 OK = 존재, 404 = 없음
다건 조회 (mget):
curl -X GET "localhost:9200/products/_mget" -H "Content-Type: application/json" -d'
{
"ids": ["prod-001", "prod-002", "prod-003"]
}'
Update: 문서 수정
전체 교체 (PUT):
curl -X PUT "localhost:9200/products/_doc/prod-001" -H "Content-Type: application/json" -d'
{
"name": "블루투스 이어폰 프로",
"price": 69900,
"category": "전자기기"
}'
_version이 증가한다.
부분 업데이트 (POST _update):
curl -X POST "localhost:9200/products/_update/prod-001" -H "Content-Type: application/json" -d'
{
"doc": {
"price": 59900,
"on_sale": true
}
}'
스크립트 업데이트:
# 가격 10% 할인
curl -X POST "localhost:9200/products/_update/prod-001" -H "Content-Type: application/json" -d'
{
"script": {
"source": "ctx._source.price = (int)(ctx._source.price * 0.9)"
}
}'
# 재고 감소 (조건부)
curl -X POST "localhost:9200/products/_update/prod-001" -H "Content-Type: application/json" -d'
{
"script": {
"source": """
if (ctx._source.stock > 0) {
ctx._source.stock -= params.quantity;
} else {
ctx.op = 'noop';
}
""",
"params": { "quantity": 1 }
}
}'
Upsert (없으면 생성, 있으면 수정):
curl -X POST "localhost:9200/products/_update/prod-999" -H "Content-Type: application/json" -d'
{
"doc": {
"view_count": 1
},
"upsert": {
"name": "새 상품",
"price": 10000,
"view_count": 1
}
}'
Delete: 문서 삭제
단건 삭제:
curl -X DELETE "localhost:9200/products/_doc/prod-001"
쿼리로 삭제 (delete by query):
# 비활성 상품 모두 삭제
curl -X POST "localhost:9200/products/_delete_by_query" -H "Content-Type: application/json" -d'
{
"query": {
"term": { "is_active": false }
}
}'
인덱스 전체 삭제:
curl -X DELETE "localhost:9200/products"
Bulk API: 대량 처리의 핵심
기본 형식
curl -X POST "localhost:9200/_bulk" -H "Content-Type: application/x-ndjson" -d'
{"index":{"_index":"products","_id":"1"}}
{"name":"상품1","price":10000}
{"index":{"_index":"products","_id":"2"}}
{"name":"상품2","price":20000}
{"update":{"_index":"products","_id":"1"}}
{"doc":{"price":15000}}
{"delete":{"_index":"products","_id":"2"}}
'
주의:
- 각 줄은 JSON 한 줄 (NDJSON 형식)
- 마지막에 빈 줄(newline) 필수
- Content-Type:
application/x-ndjson
지원되는 액션
| 액션 | 설명 | 본문 필요 |
|---|---|---|
index | 생성 또는 교체 | ✅ |
create | 생성만 (있으면 오류) | ✅ |
update | 부분 업데이트 | ✅ (doc 또는 script) |
delete | 삭제 | ❌ |
Python 클라이언트 활용
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, streaming_bulk
es = Elasticsearch(["http://localhost:9200"])
# 방법 1: bulk helper
def generate_actions():
for i, product in enumerate(products):
yield {
"_index": "products",
"_id": product["id"],
"_source": product
}
success, failed = bulk(es, generate_actions(), chunk_size=5000)
print(f"성공: {success}, 실패: {len(failed)}")
# 방법 2: streaming_bulk (메모리 효율)
for ok, result in streaming_bulk(es, generate_actions(), chunk_size=5000):
if not ok:
print(f"실패: {result}")
Bulk 응답 처리
{
"took": 30,
"errors": true,
"items": [
{
"index": {
"_index": "products",
"_id": "1",
"status": 201,
"result": "created"
}
},
{
"index": {
"_index": "products",
"_id": "2",
"status": 429,
"error": {
"type": "es_rejected_execution_exception",
"reason": "rejected execution..."
}
}
}
]
}
errors: true면 개별 항목 확인 필요!
# 실패 항목 재시도
response = es.bulk(body=bulk_body)
if response["errors"]:
for item in response["items"]:
if "error" in item.get("index", item.get("update", {})):
# 재시도 로직
pass
Bulk API 최적화
최적 Bulk 크기
| 요소 | 권장값 |
|---|---|
| 문서 수 | 1,000 ~ 10,000개/요청 |
| 페이로드 크기 | 5MB ~ 15MB/요청 |
| 동시 요청 | 2 ~ 4개 (클러스터 크기에 따라) |
# 최적 크기 찾기 테스트
import time
for chunk_size in [100, 500, 1000, 5000, 10000]:
start = time.time()
bulk(es, generate_actions(), chunk_size=chunk_size)
elapsed = time.time() - start
print(f"chunk_size={chunk_size}: {elapsed:.2f}s")
refresh 전략
기본적으로 Elasticsearch는 1초마다 refresh하여 검색 가능 상태로 만든다.
대량 색인 시 refresh 비활성화:
# 색인 전
curl -X PUT "localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
"index": { "refresh_interval": "-1" }
}'
# 대량 색인 수행...
# 색인 후 복원 및 수동 refresh
curl -X PUT "localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
"index": { "refresh_interval": "1s" }
}'
curl -X POST "localhost:9200/products/_refresh"
Bulk 요청의 refresh 옵션:
# 즉시 refresh (느림, 개별 요청용)
curl -X POST "localhost:9200/_bulk?refresh=true" ...
# wait_for: refresh 될 때까지 대기
curl -X POST "localhost:9200/_bulk?refresh=wait_for" ...
# false (기본): refresh 안 기다림
curl -X POST "localhost:9200/_bulk?refresh=false" ...
replica 일시 비활성화
대량 색인 시 replica 동기화 비용을 줄일 수 있다.
# replica 0으로 변경
curl -X PUT "localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
"index": { "number_of_replicas": 0 }
}'
# 대량 색인 수행...
# replica 복원
curl -X PUT "localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
"index": { "number_of_replicas": 1 }
}'
스레드풀 모니터링
curl -X GET "localhost:9200/_cat/thread_pool/write?v"
node_name name active queue rejected
node-1 write 4 0 0
node-2 write 3 50 10 # 큐 적체, 거부 발생!
rejected가 발생하면:
- Bulk 크기 줄이기
- 동시 요청 수 줄이기
- 클러스터 확장 고려
색인 성능 최적화 설정
curl -X PUT "localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
"index": {
"refresh_interval": "30s",
"number_of_replicas": 0,
"translog": {
"durability": "async",
"sync_interval": "30s"
}
}
}'
| 설정 | 효과 | 트레이드오프 |
|---|---|---|
refresh_interval: 30s | refresh 빈도 감소 | 검색 지연 |
number_of_replicas: 0 | 복제 비용 제거 | 가용성 감소 |
translog.durability: async | fsync 비용 감소 | 장애 시 데이터 유실 가능 |
색인 완료 후 반드시 복원!
버전 관리와 동시성
낙관적 동시성 제어
# 현재 문서 조회
curl -X GET "localhost:9200/products/_doc/prod-001"
# _seq_no: 5, _primary_term: 1
# 버전 기반 업데이트
curl -X PUT "localhost:9200/products/_doc/prod-001?if_seq_no=5&if_primary_term=1" \
-H "Content-Type: application/json" -d'
{
"name": "블루투스 이어폰",
"price": 59900
}'
다른 프로세스가 먼저 수정했다면 409 Conflict 발생.
재시도 패턴
from elasticsearch import ConflictError
def update_with_retry(es, index, doc_id, update_fn, max_retries=3):
for attempt in range(max_retries):
try:
doc = es.get(index=index, id=doc_id)
updated = update_fn(doc["_source"])
es.index(
index=index,
id=doc_id,
document=updated,
if_seq_no=doc["_seq_no"],
if_primary_term=doc["_primary_term"]
)
return True
except ConflictError:
if attempt == max_retries - 1:
raise
continue
return False
# 사용
def increase_price(doc):
doc["price"] = int(doc["price"] * 1.1)
return doc
update_with_retry(es, "products", "prod-001", increase_price)
라우팅(Routing)
기본 라우팅
문서 ID의 해시값으로 샤드 결정:
shard = hash(_routing) % number_of_shards
기본값: _routing = _id
커스텀 라우팅
# 같은 사용자의 주문을 같은 샤드에
curl -X PUT "localhost:9200/orders/_doc/order-001?routing=user-123" \
-H "Content-Type: application/json" -d'
{
"user_id": "user-123",
"product": "블루투스 이어폰",
"amount": 49900
}'
# 조회 시에도 routing 필요
curl -X GET "localhost:9200/orders/_doc/order-001?routing=user-123"
# 검색 시 특정 샤드만 조회 (성능 향상)
curl -X GET "localhost:9200/orders/_search?routing=user-123" \
-H "Content-Type: application/json" -d'
{
"query": { "term": { "user_id": "user-123" } }
}'
라우팅 필수 설정
curl -X PUT "localhost:9200/orders" -H "Content-Type: application/json" -d'
{
"mappings": {
"_routing": {
"required": true
}
}
}'
routing 없이 요청하면 오류 발생.
실전: 대량 마이그레이션 스크립트
import time
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
import psycopg2
# 설정
ES_HOST = "http://localhost:9200"
PG_CONN = "postgresql://user:pass@localhost/mydb"
INDEX_NAME = "products"
BATCH_SIZE = 5000
def create_index(es):
"""인덱스 생성 (최적화 설정)"""
es.indices.create(
index=INDEX_NAME,
body={
"settings": {
"number_of_shards": 3,
"number_of_replicas": 0, # 색인 중 비활성화
"refresh_interval": "-1" # 색인 중 비활성화
},
"mappings": {
"properties": {
"name": {"type": "text", "analyzer": "korean"},
"price": {"type": "integer"},
"category": {"type": "keyword"},
"created_at": {"type": "date"}
}
}
},
ignore=400 # 이미 존재해도 무시
)
def generate_docs():
"""PostgreSQL에서 데이터 스트리밍"""
conn = psycopg2.connect(PG_CONN)
cursor = conn.cursor(name="products_cursor") # 서버 사이드 커서
cursor.execute("SELECT id, name, price, category, created_at FROM products")
for row in cursor:
yield {
"_index": INDEX_NAME,
"_id": str(row[0]),
"_source": {
"name": row[1],
"price": row[2],
"category": row[3],
"created_at": row[4].isoformat() if row[4] else None
}
}
cursor.close()
conn.close()
def optimize_after_indexing(es):
"""색인 후 최적화"""
# refresh 활성화
es.indices.put_settings(
index=INDEX_NAME,
body={"index": {"refresh_interval": "1s"}}
)
# 수동 refresh
es.indices.refresh(index=INDEX_NAME)
# replica 활성화
es.indices.put_settings(
index=INDEX_NAME,
body={"index": {"number_of_replicas": 1}}
)
# force merge (세그먼트 병합)
es.indices.forcemerge(index=INDEX_NAME, max_num_segments=1)
def main():
es = Elasticsearch([ES_HOST])
# 1. 인덱스 생성
create_index(es)
print("인덱스 생성 완료")
# 2. 대량 색인
start = time.time()
success = 0
failed = 0
for ok, result in streaming_bulk(
es,
generate_docs(),
chunk_size=BATCH_SIZE,
raise_on_error=False
):
if ok:
success += 1
else:
failed += 1
print(f"실패: {result}")
if (success + failed) % 10000 == 0:
print(f"진행: {success + failed}건 처리됨")
elapsed = time.time() - start
print(f"색인 완료: 성공 {success}, 실패 {failed}")
print(f"소요 시간: {elapsed:.2f}초 ({success/elapsed:.0f}건/초)")
# 3. 최적화
optimize_after_indexing(es)
print("최적화 완료")
if __name__ == "__main__":
main()
OpenSearch 차이점
CRUD와 Bulk API는 Elasticsearch와 OpenSearch에서 동일하게 동작한다.
주요 동일점
- 동일한 API 엔드포인트
- 동일한 Bulk 형식 (NDJSON)
- 동일한 버전 관리 방식
차이점
| 기능 | Elasticsearch | OpenSearch |
|---|---|---|
| 기본 인증 | 8.x부터 필수 | 기본 활성화 |
| Bulk 스레드풀 | write | bulk (이름만 다름) |
체크리스트
단건 처리
- 적절한 ID 전략 선택 (자동 vs 지정)
- 중복 방지 필요 시
op_type=create - 부분 업데이트는
_update사용
대량 처리
- Bulk API 사용 (단건 API 금지)
- 최적 chunk size 테스트 (1,000 ~ 10,000)
- refresh_interval 비활성화
- replica 일시 비활성화
- 색인 후 최적화 (refresh, replica, force merge)
동시성
- 버전 충돌 처리 (if_seq_no, if_primary_term)
- 재시도 로직 구현
마무리
이 글에서 다룬 핵심 내용:
- CRUD 기본: index, get, update, delete API
- Bulk API: NDJSON 형식, 액션 타입
- 최적화: refresh, replica, translog 설정
- 동시성: 낙관적 잠금, 버전 관리
- 라우팅: 커스텀 라우팅으로 성능 향상
Part 1 “기초 및 아키텍처”가 완료되었다. 다음 Part 2에서는 분석기와 텍스트 처리를 다룬다. 한국어 검색을 위한 Nori 분석기, 자동완성을 위한 edge_ngram 등 검색 품질을 높이는 핵심 기술을 알아본다.