본문으로 건너뛰기
Elasticsearch CRUD와 Bulk API 가이드
검색 엔진 / · PT3M read

Elasticsearch CRUD: 문서 색인과 Bulk API 최적화

100만 건 데이터, 어떻게 넣을까?

MySQL에서 Elasticsearch로 100만 개의 상품 데이터를 마이그레이션해야 했다. 단건 API로 하나씩 넣으면?

# 나쁜 예: 단건 처리
for product in products:  # 100만 건
    es.index(index="products", document=product)
    # 각 요청마다 HTTP 오버헤드 + refresh 대기

예상 소요 시간: 약 28시간 (초당 10건 기준)

Bulk API를 사용하면?

# 좋은 예: 벌크 처리
from elasticsearch.helpers import bulk
bulk(es, products, index="products", chunk_size=5000)

예상 소요 시간: 약 15분 (초당 1,100건 기준)

이 글에서는 Elasticsearch의 CRUD 기본 동작과 대량 데이터 처리 최적화 전략을 알아본다.

문서(Document) CRUD 기본

Create: 문서 생성

자동 ID 생성:

curl -X POST "localhost:9200/products/_doc" -H "Content-Type: application/json" -d'
{
  "name": "블루투스 이어폰",
  "price": 49900,
  "category": "전자기기"
}'
{
  "_index": "products",
  "_id": "abc123xyz",  // 자동 생성된 ID
  "_version": 1,
  "result": "created",
  "_primary_term": 1,
  "_seq_no": 0
}

지정 ID 사용:

curl -X PUT "localhost:9200/products/_doc/prod-001" -H "Content-Type: application/json" -d'
{
  "name": "블루투스 이어폰",
  "price": 49900,
  "category": "전자기기"
}'

중복 방지 (create only):

# op_type=create: 이미 존재하면 오류
curl -X PUT "localhost:9200/products/_doc/prod-001?op_type=create" \
  -H "Content-Type: application/json" -d'
{
  "name": "블루투스 이어폰"
}'

# 또는 _create 엔드포인트 사용
curl -X PUT "localhost:9200/products/_create/prod-001" \
  -H "Content-Type: application/json" -d'
{
  "name": "블루투스 이어폰"
}'

Read: 문서 조회

단건 조회:

curl -X GET "localhost:9200/products/_doc/prod-001"
{
  "_index": "products",
  "_id": "prod-001",
  "_version": 1,
  "found": true,
  "_source": {
    "name": "블루투스 이어폰",
    "price": 49900,
    "category": "전자기기"
  }
}

특정 필드만 조회:

curl -X GET "localhost:9200/products/_doc/prod-001?_source=name,price"

# 또는 _source_includes / _source_excludes
curl -X GET "localhost:9200/products/_doc/prod-001?_source_includes=name,price"

존재 여부만 확인:

curl -I "localhost:9200/products/_doc/prod-001"
# 200 OK = 존재, 404 = 없음

다건 조회 (mget):

curl -X GET "localhost:9200/products/_mget" -H "Content-Type: application/json" -d'
{
  "ids": ["prod-001", "prod-002", "prod-003"]
}'

Update: 문서 수정

전체 교체 (PUT):

curl -X PUT "localhost:9200/products/_doc/prod-001" -H "Content-Type: application/json" -d'
{
  "name": "블루투스 이어폰 프로",
  "price": 69900,
  "category": "전자기기"
}'

_version이 증가한다.

부분 업데이트 (POST _update):

curl -X POST "localhost:9200/products/_update/prod-001" -H "Content-Type: application/json" -d'
{
  "doc": {
    "price": 59900,
    "on_sale": true
  }
}'

스크립트 업데이트:

# 가격 10% 할인
curl -X POST "localhost:9200/products/_update/prod-001" -H "Content-Type: application/json" -d'
{
  "script": {
    "source": "ctx._source.price = (int)(ctx._source.price * 0.9)"
  }
}'

# 재고 감소 (조건부)
curl -X POST "localhost:9200/products/_update/prod-001" -H "Content-Type: application/json" -d'
{
  "script": {
    "source": """
      if (ctx._source.stock > 0) {
        ctx._source.stock -= params.quantity;
      } else {
        ctx.op = 'noop';
      }
    """,
    "params": { "quantity": 1 }
  }
}'

Upsert (없으면 생성, 있으면 수정):

curl -X POST "localhost:9200/products/_update/prod-999" -H "Content-Type: application/json" -d'
{
  "doc": {
    "view_count": 1
  },
  "upsert": {
    "name": "새 상품",
    "price": 10000,
    "view_count": 1
  }
}'

Delete: 문서 삭제

단건 삭제:

curl -X DELETE "localhost:9200/products/_doc/prod-001"

쿼리로 삭제 (delete by query):

# 비활성 상품 모두 삭제
curl -X POST "localhost:9200/products/_delete_by_query" -H "Content-Type: application/json" -d'
{
  "query": {
    "term": { "is_active": false }
  }
}'

인덱스 전체 삭제:

curl -X DELETE "localhost:9200/products"

Bulk API: 대량 처리의 핵심

기본 형식

curl -X POST "localhost:9200/_bulk" -H "Content-Type: application/x-ndjson" -d'
{"index":{"_index":"products","_id":"1"}}
{"name":"상품1","price":10000}
{"index":{"_index":"products","_id":"2"}}
{"name":"상품2","price":20000}
{"update":{"_index":"products","_id":"1"}}
{"doc":{"price":15000}}
{"delete":{"_index":"products","_id":"2"}}
'

주의:

  • 각 줄은 JSON 한 줄 (NDJSON 형식)
  • 마지막에 빈 줄(newline) 필수
  • Content-Type: application/x-ndjson

지원되는 액션

액션설명본문 필요
index생성 또는 교체
create생성만 (있으면 오류)
update부분 업데이트✅ (doc 또는 script)
delete삭제

Python 클라이언트 활용

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, streaming_bulk

es = Elasticsearch(["http://localhost:9200"])

# 방법 1: bulk helper
def generate_actions():
    for i, product in enumerate(products):
        yield {
            "_index": "products",
            "_id": product["id"],
            "_source": product
        }

success, failed = bulk(es, generate_actions(), chunk_size=5000)
print(f"성공: {success}, 실패: {len(failed)}")

# 방법 2: streaming_bulk (메모리 효율)
for ok, result in streaming_bulk(es, generate_actions(), chunk_size=5000):
    if not ok:
        print(f"실패: {result}")

Bulk 응답 처리

{
  "took": 30,
  "errors": true,
  "items": [
    {
      "index": {
        "_index": "products",
        "_id": "1",
        "status": 201,
        "result": "created"
      }
    },
    {
      "index": {
        "_index": "products",
        "_id": "2",
        "status": 429,
        "error": {
          "type": "es_rejected_execution_exception",
          "reason": "rejected execution..."
        }
      }
    }
  ]
}

errors: true면 개별 항목 확인 필요!

# 실패 항목 재시도
response = es.bulk(body=bulk_body)
if response["errors"]:
    for item in response["items"]:
        if "error" in item.get("index", item.get("update", {})):
            # 재시도 로직
            pass

Bulk API 최적화

최적 Bulk 크기

요소권장값
문서 수1,000 ~ 10,000개/요청
페이로드 크기5MB ~ 15MB/요청
동시 요청2 ~ 4개 (클러스터 크기에 따라)
# 최적 크기 찾기 테스트
import time

for chunk_size in [100, 500, 1000, 5000, 10000]:
    start = time.time()
    bulk(es, generate_actions(), chunk_size=chunk_size)
    elapsed = time.time() - start
    print(f"chunk_size={chunk_size}: {elapsed:.2f}s")

refresh 전략

기본적으로 Elasticsearch는 1초마다 refresh하여 검색 가능 상태로 만든다.

대량 색인 시 refresh 비활성화:

# 색인 전
curl -X PUT "localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
  "index": { "refresh_interval": "-1" }
}'

# 대량 색인 수행...

# 색인 후 복원 및 수동 refresh
curl -X PUT "localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
  "index": { "refresh_interval": "1s" }
}'
curl -X POST "localhost:9200/products/_refresh"

Bulk 요청의 refresh 옵션:

# 즉시 refresh (느림, 개별 요청용)
curl -X POST "localhost:9200/_bulk?refresh=true" ...

# wait_for: refresh 될 때까지 대기
curl -X POST "localhost:9200/_bulk?refresh=wait_for" ...

# false (기본): refresh 안 기다림
curl -X POST "localhost:9200/_bulk?refresh=false" ...

replica 일시 비활성화

대량 색인 시 replica 동기화 비용을 줄일 수 있다.

# replica 0으로 변경
curl -X PUT "localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
  "index": { "number_of_replicas": 0 }
}'

# 대량 색인 수행...

# replica 복원
curl -X PUT "localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
  "index": { "number_of_replicas": 1 }
}'

스레드풀 모니터링

curl -X GET "localhost:9200/_cat/thread_pool/write?v"
node_name name  active queue rejected
node-1    write 4      0     0
node-2    write 3      50    10  # 큐 적체, 거부 발생!

rejected가 발생하면:

  • Bulk 크기 줄이기
  • 동시 요청 수 줄이기
  • 클러스터 확장 고려

색인 성능 최적화 설정

curl -X PUT "localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
  "index": {
    "refresh_interval": "30s",
    "number_of_replicas": 0,
    "translog": {
      "durability": "async",
      "sync_interval": "30s"
    }
  }
}'
설정효과트레이드오프
refresh_interval: 30srefresh 빈도 감소검색 지연
number_of_replicas: 0복제 비용 제거가용성 감소
translog.durability: asyncfsync 비용 감소장애 시 데이터 유실 가능

색인 완료 후 반드시 복원!

버전 관리와 동시성

낙관적 동시성 제어

# 현재 문서 조회
curl -X GET "localhost:9200/products/_doc/prod-001"
# _seq_no: 5, _primary_term: 1

# 버전 기반 업데이트
curl -X PUT "localhost:9200/products/_doc/prod-001?if_seq_no=5&if_primary_term=1" \
  -H "Content-Type: application/json" -d'
{
  "name": "블루투스 이어폰",
  "price": 59900
}'

다른 프로세스가 먼저 수정했다면 409 Conflict 발생.

재시도 패턴

from elasticsearch import ConflictError

def update_with_retry(es, index, doc_id, update_fn, max_retries=3):
    for attempt in range(max_retries):
        try:
            doc = es.get(index=index, id=doc_id)
            updated = update_fn(doc["_source"])

            es.index(
                index=index,
                id=doc_id,
                document=updated,
                if_seq_no=doc["_seq_no"],
                if_primary_term=doc["_primary_term"]
            )
            return True
        except ConflictError:
            if attempt == max_retries - 1:
                raise
            continue
    return False

# 사용
def increase_price(doc):
    doc["price"] = int(doc["price"] * 1.1)
    return doc

update_with_retry(es, "products", "prod-001", increase_price)

라우팅(Routing)

기본 라우팅

문서 ID의 해시값으로 샤드 결정:

shard = hash(_routing) % number_of_shards

기본값: _routing = _id

커스텀 라우팅

# 같은 사용자의 주문을 같은 샤드에
curl -X PUT "localhost:9200/orders/_doc/order-001?routing=user-123" \
  -H "Content-Type: application/json" -d'
{
  "user_id": "user-123",
  "product": "블루투스 이어폰",
  "amount": 49900
}'

# 조회 시에도 routing 필요
curl -X GET "localhost:9200/orders/_doc/order-001?routing=user-123"

# 검색 시 특정 샤드만 조회 (성능 향상)
curl -X GET "localhost:9200/orders/_search?routing=user-123" \
  -H "Content-Type: application/json" -d'
{
  "query": { "term": { "user_id": "user-123" } }
}'

라우팅 필수 설정

curl -X PUT "localhost:9200/orders" -H "Content-Type: application/json" -d'
{
  "mappings": {
    "_routing": {
      "required": true
    }
  }
}'

routing 없이 요청하면 오류 발생.

실전: 대량 마이그레이션 스크립트

import time
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
import psycopg2

# 설정
ES_HOST = "http://localhost:9200"
PG_CONN = "postgresql://user:pass@localhost/mydb"
INDEX_NAME = "products"
BATCH_SIZE = 5000

def create_index(es):
    """인덱스 생성 (최적화 설정)"""
    es.indices.create(
        index=INDEX_NAME,
        body={
            "settings": {
                "number_of_shards": 3,
                "number_of_replicas": 0,  # 색인 중 비활성화
                "refresh_interval": "-1"   # 색인 중 비활성화
            },
            "mappings": {
                "properties": {
                    "name": {"type": "text", "analyzer": "korean"},
                    "price": {"type": "integer"},
                    "category": {"type": "keyword"},
                    "created_at": {"type": "date"}
                }
            }
        },
        ignore=400  # 이미 존재해도 무시
    )

def generate_docs():
    """PostgreSQL에서 데이터 스트리밍"""
    conn = psycopg2.connect(PG_CONN)
    cursor = conn.cursor(name="products_cursor")  # 서버 사이드 커서

    cursor.execute("SELECT id, name, price, category, created_at FROM products")

    for row in cursor:
        yield {
            "_index": INDEX_NAME,
            "_id": str(row[0]),
            "_source": {
                "name": row[1],
                "price": row[2],
                "category": row[3],
                "created_at": row[4].isoformat() if row[4] else None
            }
        }

    cursor.close()
    conn.close()

def optimize_after_indexing(es):
    """색인 후 최적화"""
    # refresh 활성화
    es.indices.put_settings(
        index=INDEX_NAME,
        body={"index": {"refresh_interval": "1s"}}
    )

    # 수동 refresh
    es.indices.refresh(index=INDEX_NAME)

    # replica 활성화
    es.indices.put_settings(
        index=INDEX_NAME,
        body={"index": {"number_of_replicas": 1}}
    )

    # force merge (세그먼트 병합)
    es.indices.forcemerge(index=INDEX_NAME, max_num_segments=1)

def main():
    es = Elasticsearch([ES_HOST])

    # 1. 인덱스 생성
    create_index(es)
    print("인덱스 생성 완료")

    # 2. 대량 색인
    start = time.time()
    success = 0
    failed = 0

    for ok, result in streaming_bulk(
        es,
        generate_docs(),
        chunk_size=BATCH_SIZE,
        raise_on_error=False
    ):
        if ok:
            success += 1
        else:
            failed += 1
            print(f"실패: {result}")

        if (success + failed) % 10000 == 0:
            print(f"진행: {success + failed}건 처리됨")

    elapsed = time.time() - start
    print(f"색인 완료: 성공 {success}, 실패 {failed}")
    print(f"소요 시간: {elapsed:.2f}초 ({success/elapsed:.0f}건/초)")

    # 3. 최적화
    optimize_after_indexing(es)
    print("최적화 완료")

if __name__ == "__main__":
    main()

OpenSearch 차이점

CRUD와 Bulk API는 Elasticsearch와 OpenSearch에서 동일하게 동작한다.

주요 동일점

  • 동일한 API 엔드포인트
  • 동일한 Bulk 형식 (NDJSON)
  • 동일한 버전 관리 방식

차이점

기능ElasticsearchOpenSearch
기본 인증8.x부터 필수기본 활성화
Bulk 스레드풀writebulk (이름만 다름)

체크리스트

단건 처리

  • 적절한 ID 전략 선택 (자동 vs 지정)
  • 중복 방지 필요 시 op_type=create
  • 부분 업데이트는 _update 사용

대량 처리

  • Bulk API 사용 (단건 API 금지)
  • 최적 chunk size 테스트 (1,000 ~ 10,000)
  • refresh_interval 비활성화
  • replica 일시 비활성화
  • 색인 후 최적화 (refresh, replica, force merge)

동시성

  • 버전 충돌 처리 (if_seq_no, if_primary_term)
  • 재시도 로직 구현

마무리

이 글에서 다룬 핵심 내용:

  1. CRUD 기본: index, get, update, delete API
  2. Bulk API: NDJSON 형식, 액션 타입
  3. 최적화: refresh, replica, translog 설정
  4. 동시성: 낙관적 잠금, 버전 관리
  5. 라우팅: 커스텀 라우팅으로 성능 향상

Part 1 “기초 및 아키텍처”가 완료되었다. 다음 Part 2에서는 분석기와 텍스트 처리를 다룬다. 한국어 검색을 위한 Nori 분석기, 자동완성을 위한 edge_ngram 등 검색 품질을 높이는 핵심 기술을 알아본다.

참고 자료

My avatar

글을 마치며

이 글이 도움이 되었기를 바랍니다. 궁금한 점이나 의견이 있다면 댓글로 남겨주세요.

더 많은 기술 인사이트와 개발 경험을 공유하고 있으니, 다른 포스트도 확인해보세요.

유럽살며 여행하며 코딩하는 노마드의 여정을 함께 나누며, 함께 성장하는 개발자 커뮤니티를 만들어가요! 🚀


Elasticsearch 검색 엔진 마스터 시리즈
Elasticsearch 검색 엔진 입문 가이드

Elasticsearch 입문: 검색 엔진이 필요한 이유

RDBMS의 LIKE 검색이 왜 프로덕션에서 문제가 되는지, 역인덱스가 무엇인지, 그리고 Elasticsearch가 어떻게 이 문제를 해결하는지 실제 장애 사례와 함께 알아봅니다.

백엔드 데이터베이스 프로덕션 +3
Elasticsearch 매핑과 필드 타입 가이드

Elasticsearch 매핑: 필드 타입과 스키마 설계

Elasticsearch의 매핑을 이해하고 올바른 필드 타입을 선택하는 방법을 알아봅니다. text vs keyword, 동적 매핑의 함정, 매핑 폭발 방지, 중첩 객체 처리까지 실무 설계 패턴을 다룹니다.

Elasticsearch OpenSearch 검색엔진 +3