본문 바로가기

Elastic Search

[ElasticSearch] Bulk Request에서의 Error Handling

오늘은 Elastic Search의 Bulk Request를 사용하며 살짝 헤맸던 사소한 경험을 공유하겠습니다. Elastic Search의 Bulk API는 하나의 요청으로 여러 명령(index, create, delete, update)을 수행하고 싶을 때 사용할 수 있으며, 작은 단건 Request들을 하나의 Bulk Request로 묶어 수행함으로써 인덱싱 속도를 높이고 다양한 오버헤드를 줄일 수 있습니다. "몇 개의 단건 Request들을 하나의 Bulk Request로 묶을 것인가"는 정답은 없고 다양한 시도를 통해 최적화된 사이즈를 찾아야 합니다. 이때, 수십 메가바이트를 초과하는 너무 큰 대량 요청은 클러스터의 과도한 메모리 압력을 유발할 수 있으므로 조심해야 합니다.

 

회사에서 수많은 데이터들을 ElasticSearch에 인덱싱하는 배치 작업이 필요했고 Bulk Request를 사용했습니다. 대략적인 코드를 요약하면 다음과 같습니다.

 

const bulkBody = [
  { index: { _index: "indexName", _id: 1 } },
  { date: '2021-05-04 00:00:00' },
  { index: { _index: "indexName", _id: 2 } },
  { date: '2010-10-21 00:00:00' },
  { index: { _index: "indexName", _id: 3 } },
  { date: '0000-00-00 00:00:00' },
]

try {
  bulkResponse = await client.bulk({ refresh: true, body: bulkBody });
} catch (error) {
  console.log('Error occured while bulk request');
}

 

Date 타입의 date 필드들을 가지고 있는 도큐먼트들을 인덱싱하는 복수 개의 index 명령들을 Bulk Request를 사용해 하나의 요청으로 처리합니다. 또, Bulk Request 중 예외가 발생할 경우 try...catch를 사용해 'Error occured while bulk request'를 출력해 줍니다. 실제 위 Bulk Request에서 3번 도큐먼트 관련 요청은 date인 '0000-00-00 00:00:00'의 년, 월, 일 정보가 1 보다 작기 때문에 정상적으로 처리되지 못합니다. 과연 위 코드는 Bulk Request 처리 중 예외가 발생했을 때 의도된 바와 같이 'Error occured while bulk request'를 출력할 수 있을까요?

 

정답은 NO 입니다. 'Error occured while bulk request'은 출력되지 못하지만 Count API를 사용해 정상적으로 저장된 총 도큐먼트 개수를 출력해보면 3이 아닌 2를 확인할 수 있습니다. 저는 try...catch를 믿고 모든 각 단 건의 Request에 대해 정상적으로 인덱싱이 처리됐다고 생각했지만 누락된 도큐먼트들이 존재했습니다. 여기서 도큐먼트 누락의 원인을 찾기 위해 엉뚱한 곳을 쑤시기 시작했고 무식하면 몸이 고생한다더니 1시간 정도를 디버깅했던 것 같습니다.

 

저는 ElasticSearch의 get, index, update 등 Single documents APIs과 같이 Bulk Request 또한 처리 중 "실패한 명령이 있을 경우 2xx이 아닌 HTTP Status Code를 반환"할 것이라 확인없이 믿고 있었고, 이는 잘못된 생각이었습니다. Bulk Request는 처리 중 일부 실패가 발생하더라도, 200 HTTP Status Code를 반환하며 Response Body의 errors 필드를 true로, items 필드에 실행 결과와 함께 원인 등 실패와 관련된 추가적인 정보를 제공합니다. 지금 생각해보면, 앞에서 언급한 명령들과 달리, 복수 개의 명령을 한번에 처리하는 Multi-document APIs인 Bulk Request의 특성상, 처리 중 단 한 개의 실패라도 발생했을 때 2xx가 아닌 HTTP Status Code를 반환하지 않는 것은 당연한 것 같습니다.

 

직접 Kibana Dev Tools와 ElasticSearch REST API를 사용해 확인해보겠습니다. 먼저, Date 타입의 필드를 가지는 도큐먼트를 저장할 매핑을 정의해 줍니다.

 

PUT bulk_test
{
  "mappings": {
    "properties": {
      "date_field": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      }
    }
  }
}

 

Single documents API인 Index API를 사용해 유효하지 않은 범위의 날짜('0000-00-00 00:00:00')를 date_field에 담아 인덱싱을 시도해 봅니다.

 

POST bulk_test/_doc/0
{
  "date_field": "0000-00-00 00:00:00"
}

 

 

우측 상단을 보시면 400 HTTP Status Code(Bad Request)를 Exception Type(mapper_parsing_exception)과 함께 반환하므로 일반적인 try...catch를 사용해 예외 발생 여부를 확인할 수 있습니다.

 

이번에는 Multi-documents API인 Bulk API를 사용해 유효하지 않은 범위의 날짜를 인덱싱 해보겠습니다.

 

POST _bulk
{ "index": { "_index": "bulk_test", "_id": 1 } }
{ "date_field": "0001-00-00 00:00:00" }
{ "index": { "_index": "bulk_test", "_id": 2 } }
{ "date_field": "0001-01-01 00:00:00" }

 

 

명령 중 일부 처리가 실패했지만 200 HTTP Status Code(OK)와 함께 Response Body의 errors 필드를 true로, items 필드에 실행 결과와 함께 실패 타입, 원인 관련된 추가적인 정보를 제공하는 것을 확인할 수 있습니다. 이 경우 일반적인 try...catch를 사용해 예외 발생 여부를 확인할 수 없으므로 Response Body 파싱이 필요합니다. 다음은 ElasticSearch 공식 문서의 Node.js Client 예시에 첨부된 Error Handling 코드입니다.

 

  const { body: bulkResponse } = await client.bulk({ refresh: true, body })

  if (bulkResponse.errors) {
    const erroredDocuments = []
    // The items array has the same order of the dataset we just indexed.
    // The presence of the `error` key indicates that the operation
    // that we did for the document has failed.
    bulkResponse.items.forEach((action, i) => {
      const operation = Object.keys(action)[0]
      if (action[operation].error) {
        erroredDocuments.push({
          // If the status is 429 it means that you can retry the document,
          // otherwise it's very likely a mapping error, and you should
          // fix the document before to try it again.
          status: action[operation].status,
          error: action[operation].error,
          operation: body[i * 2],
          document: body[i * 2 + 1]
        })
      }
    })
    console.log(erroredDocuments)
  }

 

Bulk Request의 일부 명령이 실패했다면, Response Body의 errors 필드가 true이고 items 리스트를 순회하며 에러의 타입과 원인 등을 얻을 수 있습니다. (이때, items 리스트는 요청한 Bulk Request의 명령 순서대로 반환됩니다.)  예시에서는 반환받은 에러 정보를 단순히 출력하지만, 이를 적절히 활용한다면 상황에 맞는 Error Handling을 할 수 있습니다.

 

처음 인덱싱에서 누락된 도큐먼트를 발견했을 때 뇌피셜로 덤비지 말고 공식 문서만 잘 확인했어도 누락의 원인을 찾기 위해 1시간 가까이 삽질을 하지 않았을 것 같습니다. 저와 비슷한 실수를 하신 분들께 조금이라도 도움이 되셨으면 좋겠습니다. 혹시 잘못된 내용이 있을 경우 언제나 댓글로 알려주시면 감사히 배우고 수정하도록 하겠습니다! 오늘도 읽어주셔서 감사합니다 😊 

 

개발 환경

- ElasticSearch 7.12.0

- Kibana 7.12.0

 

참고 문서

- ElasticSearch Bulk API