[Elasticsearch] Efficient Data Enrichment Using Enrich Processor

Elasticsearch를 쓰다 보면 Index document에 대량으로 새로운 필드를 추가해야 하는 상황이 생긴다.
update-by-query나 bulk update를 시도하기엔 문서 수가 너무 많거나, 문서 하나하나를 업데이트해야 해서 작업 비용이 커지는 경우도 있다.

이런 문제를 해결하기 위해 선택한 기능이 바로 Enrich Processor이다.

Enrich를 사용하면 문서를 인덱싱하는 시점에 다른 인덱스의 참조 데이터를 자동으로 병합할 수 있어서, 복잡한 업데이트 작업 없이 깔끔하게 데이터를 보강할 수 있다.

Enrich란?

Enrich Processor는 인덱싱 과정에서 들어오는 문서를 대상으로, 미리 준비된 참조 데이터(reference data) 를 자동으로 붙이는 기능이다. 한마디로 “문서 저장 전에 실시간 lookup + 자동 join” 을 수행하는 구조이다.

Enrich 기능은 다음 3가지 요소로 구성된다.

Source Index참조 데이터가 들어 있는 인덱스 (예: 제품 정보, 사용자 정보)
Enrich Policy어떤 필드로 매칭하고 어떤 필드를 보강할지 정의
Enrich IndexPolicy 실행 시 생성되는 최적화된 시스템 인덱스 (.enrich-*)

Enrich Processor는 매번 검색을 수행하지 않고, 정적·최적화된 enrich index를 조회하기 때문에 성능 손실 없이 빠르게 보강 작업을 수행한다.

동작 예시

사용자의 이메일을 기반으로 해당 사용자의 기본 정보를 문서에 자동 보강하는 상황을 살펴본다.

1) Source Index 준비

PUT users/_doc/1?refresh=wait_for
{
  "email": "alice@example.com",
  "first_name": "Alice",
  "city": "Seoul"
}

2) Enrich Policy 생성

PUT /_enrich/policy/user-policy
{
  "match": {
    "indices": "users",
    "match_field": "email",
    "enrich_fields": ["first_name", "city"]
  }
}

3) Enrich Policy 실행 → enrich index 생성

POST /_enrich/policy/user-policy/_execute

여기까지 수행하면 .enrich-user-policy-* 형태의 전용 인덱스가 생성된다.

4) Ingest Pipeline 생성

PUT /_ingest/pipeline/user_lookup
{
  "processors": [
    {
      "enrich": {
        "policy_name": "user-policy",
        "field": "email",
        "target_field": "user_info",
        "max_matches": 1
      }
    }
  ]
}

5) 문서 인덱싱

PUT /logs/_doc/1?pipeline=user_lookup
{
  "email": "alice@example.com"
}

// 결과
{
  "email": "alice@example.com",
  "user_info": {
    "first_name": "Alice",
    "city": "Seoul"
  }
}

문서가 인덱싱될 때 자동으로 참조 데이터가 붙는 구조다.
update-by-query 없이 “인덱싱 순간에 데이터가 완성되는” 형태라고 보면 된다.

제한사항

❗ 자주 변경되는 참조 데이터를 Enrich로 처리하는 경우

Enrich Processor는 source index를 직접 조회하지 않고 Enrich Index를 조회하는 구조이다. 따라서 참조 데이터가 변경되면 enrich index를 재생성하는 작업을 반복하게 된다.

참조 데이터가 자주 바뀐다면 재생성 작업을 반복해야 하고, 그만큼 시스템 부하도 증가한다.
특히 대규모 인덱스에서는 재생성 비용이 커져 전체 ingest 성능에 영향을 줄 수 있다.

따라서 Enrich는 자주 변하지 않는 데이터인 경우에 사용하는 것이 적합하다.

[Elasticsearch] Understanding the Difference Between Fielddata and Keyword Fields

Elasticsearch를 사용하다 보면 집계나 정렬을 위해 텍스트 데이터를 처리해야 할 때가 많다. 이때 자주 등장하는 개념이 fielddatakeyword가 있는데 이 둘은 비슷한 목적(정렬, 집계)을 위해 사용되지만, 내부 동작 방식과 성능, 메모리 사용 면에서는 큰 차이가 있다.

fielddata (text 필드)

  • 기본적으로 text 타입은 검색에 적합하도록 분석(analyzed)된 문자열이다
  • 정렬이나 집계를 하기 위해선 fielddata를 사용하여 해당 데이터를 JVM 힙 메모리에 적재 한다.
  • 매우 메모리 집약적이며, 특히 대규모 데이터셋에서는 성능에 부담을 줄 수 있다.
  • 처음 한 번은 데이터를 읽어서 메모리에 올려야 하는데, 이때 리소스 사용량이 많다.

keyword type

    • keyword 타입은 분석되지 않은 raw 문자열로, 주로 정렬, 집계, 필터링에 사용된다.
    • doc values라는 구조로 디스크에 저장된다. 이 값은 인덱싱 시점에 미리 계산(precomputed) 되어 저장되며, 검색 시에는 디스크에서 읽어온다.
    • JVM 힙 메모리를 사용하지 않고, 일반적으로 fielddata보다 성능과 안정성 측면에서 유리하다.

    [Elasticsearch] Collecting RSS information with Logstash

    RSS feed는 웹사이트의 최신 콘텐츠를 효율적으로 수집하고 분석할 수 있는 강력한 도구입니다. Logstash를 사용하면 RSS 데이터를 쉽게 수집하고 처리할 수 있습니다. 이 글에서는 Google News RSS를 Logstash로 수집하는 방법을 예제로 설명하겠습니다.

    Logstash config

    Logstash의 설정 파일은 input, filter, output 세 가지 섹션으로 구성됩니다. 아래는 Google News의 RSS 피드를 수집하기 위한 예제 설정입니다.

    Input

    http_poller 플러그인을 사용해 RSS 피드를 주기적으로 요청합니다.

    input {
      http_poller {
        urls => {
          google_news => {
            url => "https://news.google.com/rss"
            method => get
          }
        }
        request_timeout => 60
        schedule => { every => "10m" }
        codec => "plain"
        metadata_target => "http_metadata"
      }
    }
    

    urls: 수집할 RSS 피드 URL을 지정합니다.
    schedule: 데이터를 가져올 주기를 설정합니다. every => “5m”은 5분마다 데이터를 요청합니다. cron 표현식을 사용하여 보다 세부적인 설정이 가능합니다
    metadata_target: 요청 메타데이터를 저장할 필드를 지정합니다.

    Fiter

    수집된 RSS 데이터를 구조화된 형태로 변환합니다.

    filter {
      xml {
        source => "message"
        target => "rss"
        store_xml => true
        force_array => false
      }
      split {
        field => "[rss][channel][item]"
      }
      mutate {
        add_field => {
          "title" => "%{[rss][channel][item][title]}"
          "link" => "%{[rss][channel][item][link]}"
          "source" => "%{[rss][channel][item][source][content]}"
          "pubDate" => "%{[rss][channel][item][pubDate]}"
          "type" => "%{[http_metadata][input][http_poller][request][name]}"
        }
        remove_field => ["message", "rss", "event", "http_metadata"]
      }
    }
    

    xml: RSS 데이터를 XML 형태에서 파싱하여 rss 필드에 저장합니다.
    split: 각 기사(item)를 개별 이벤트로 분리합니다.
    mutate: 원하는 필드(title, link, source, pubDate)를 추가하고 불필요한 데이터를 제거합니다.

    Output

    결과 데이터를 Elasticsearch에 저장하거 콘솔에 출력합니다.

    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "rss-data-%{+YYYY.MM.dd}"
      }
      stdout { codec => rubydebug }  
    }
    

    이 예제를 사용하면 Logstash를 활용하면 RSS 데이터를 효율적으로 수집하고 처리할 수 있습니다.

    [Elasticsearch] Similar image search by image or natural language(2)

    Similar image search by image or natural language(1)에서는 이미지 간의 유사도를 측정해서 유사한 이미지를 찾도록 하였다. 이번 포스트에서는 자연어를 이용해서 자연어에서 설명하는 이미지와 유사한 이미지를 찾는 방법에 대해서 알아보겠다.

    CLIP model은 Text로 들어오는 자연어를 Transformer 아키텍처를 사용해서 처리한다. 입력된 Text는 Tokenize되어 Embeding 된 후에 여러 개 층으로 구성된 Transformer 인코더를 통해 처리됩니다. 처리된 자연어는 Vector값으로 반환 되고, 자연어의 의미와 문맥이 어떤 이미지를 의미 하는 지를 512 차원의 Vector 값으로 나타내게 됩니다.

    CLIP model process

    이제, CLIP을 사용해서 검색에 사용할 자연어를 vector embedding해보자.
    (이전 포스트에서 이미지를 vector embedding 하던 코드를 조금 변형해서 사용하면 된다.)

    from sentence_transformers import SentenceTransformer
    from PIL import Image
     
    # 영어 모델 clip-ViT-B-32
    # 다국어 모델 clip-ViT-B-32-multilingual-v1
    img_model = SentenceTransformer('clip-ViT-B-32-multilingual-v1', device='cpu')
     
    def vectorize(text):
        embedding = img_model.encode(text) 
        return embedding.tolist()
     
    vectorize('빨간 사과')
    

    위 코드를 실행해보면 텍스트가 vector embedding되어 512 차원의 vector를 반환하는 것을 확인 할 수 있다. 반환 받은 vector 값을 이용하여서 이미지 검색을 해보자.

    GET image_vector/_search
    {
      "knn": {
        "field": "vector",
        "k": 5,
        "num_candidates": 10,
        "query_vector": [
          -0.10659506916999817,
          0.20013009011745453,
          0.15304496884346008,
          0.03850187361240387,
          -0.126994326710701,
         ... 생략 ...
          -0.04996727779507637
        ]
      },
      "fields": [
        "image_name"
      ],
      "_source": false
    }
    

    결과를 보면 빨간 사과라는 자연어 Text에서 “빨간”“사과”의 의미를 잘 파악해서 이미지를 잘 찾아 낸 것을 볼 수 있다.

    [Elasticsearch] Similar image search by image or natural language(1)

    Elasticsearch(이하 ES)에서 자연어나 이미지로 유사한 이미지를 검색하는 방법에 대해서 알아보자.

    유사한 이미지를 찾기 위해서는 먼저 이미지들 간의 유사도를 측정할 방법이 필요하고, 유사도를 측정하기 위해서 Vector를 사용할 수 있다. 이미지를 Embedding을 통해서 Vector로 변환하고, Vector 간의 유사도(Cosine similarity)를 계산해서 유사한 이미지를 찾을 수 있다.

    그러면, 이미지를 Vector embedding하는 방법은 무엇이 있을까?

    이미지를 vector embedding하는 방법과 이를 만들어주는 ML 모델들은 이미 많이 공개 되어 있다. ResNet, VGGNet, Inception 등 많은 모델들이 있다. 모델들의 성능을 잘 비교해서 알맞은 모델을 사용하면 된다.

    여기에서는 OpenAI에서 공개한 CLIP model를 사용할 것이다.

    CLIP을 사용해서 검색할 이미지들을 vector embedding해보자.
    (사용할 이미지가 없다면 아래 첨부된 이미지를 사용해보자.)

    from sentence_transformers import SentenceTransformer
    from PIL import Image
    
    img_model = SentenceTransformer('clip-ViT-B-32', device='cpu')
    
    def vectorize(file_path):
        image = Image.open(file_path)
        embedding = img_model.encode(image)
        del image
    
        return embedding.tolist()
    
    vectorize('./image/apple.jpg')
    

    위 코드를 실행해보면 이미지가 vector embedding되어 512 차원의 vector를 반환하는 것을 확인 할 수 있다. 검색 대상의 이미지들을 모두 embedding 하면 된다.

    vector embedding된 값들을 생성했다면, 이제 vector들을 ES에 올려서 유사 이미지 검색을 해보자.

    먼저, ES에서 Vector를 담을 index를 생성한다.

    PUT image_vector
    {
      "mappings": {
        "properties": {
          "image_name": {
            "type": "keyword"
          },
          "vector": {
            "type": "dense_vector",
            "dims": 512,
            "index": true,
            "similarity": "cosine"
          }
        }
      }
    }
    

    생성한 인덱스에 만든 Vector 값들을 넣어준다.

    from elasticsearch import Elasticsearch
    
    def index(image_name, vector):
        es = Elasticsearch(cloud_id='elastic_cloud_id',
                           basic_auth=('elastic', 'elastic_secret'))
        doc = {     
            'image_name': image_name,
            'vector': vector
        }
    
        es.index(index='image_vector', body=doc)
    
    index('apple.jpg', vector)
    

    여기까지 하면 유사한 이미지를 검색하기 위한 준비는 끝난다.

    원하는 이미지를 하나 골라서 검색을 해보자

    GET image_vector/_search
    {
      "knn": {
        "field": "vector",
        "k": 5,
        "num_candidates": 10,
        "query_vector": [
          0.5978590846061707,
          0.5926558375358582,
          -0.1243332028388977,
          0.11083680391311646,
          -0.6588778495788574,
         ... 생략 ...
          0.37319913506507874
        ]
      },
      "fields": [
        "image_name"
      ],
      "_source": false
    }
    

    왼쪽 끝의 사과 이미지와 위 예제 파일안에서 유사한 이미지 4개를 찾은 것을 확인 할 수 있다.

    Next: Similar image search by image or natural language(2)

    [Logstash on K8S] Azure SQL to ElasticSearch

    Azure SQL(이하 sql)의 data를 ElasticSearch(이하 es)로 검색을 하기 위해서 sql 데이터를 es의 index로 옮겨야 한다. 데이터를 옮기는 방법은 여러가지 방법이 있지만 일반적으로 많이 쓰이는 데이터 처리 오픈소스인 Logstash를 사용하여 옮기는 방법을 알아보자.

     Logstash를 운영하는 방식도 다양하지만 간단하게 사용하려면 역시 K8S만 한 것이 없기 때문에 K8S에 구성해보자.

    ConfigMap

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: logstash-configmap-start
      namespace: search
      labels:
        task: sql-search-init    
    data:
      start.sh: |
        #!/bin/bash
        curl -L -O https://go.microsoft.com/fwlink/?linkid=2203102
        tar -xvf ./sqljdbc_11.2.0.0_kor.tar.gz
        mv ./sqljdbc_11.2/enu/mssql-jdbc-11.2.0.jre11.jar ./lib/mssql-jdbc-11.2.0.jre11.jar
    
    • start.sh 에서는 jdbc 드라이버를 셋팅하는 작업을 수행한다. 이 링크에서 항상 최신 버전의 jdbc 드라이버를 확인하는 것을 권장한다.
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: logstash-configmap
      namespace: search
      labels:
        task: sql-search    
    data:
      logstash.yml: |
        http.host: "127.0.0.0"
        path.config: /usr/share/logstash/pipeline
      logstash.conf: 
        input {
          jdbc {
            jdbc_driver_library => "/usr/share/logstash/lib/mssql-jdbc-11.2.0.jre11.jar"
            jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
            jdbc_connection_string => "jdbc:sqlserver://{database server}:1433;database={Database name};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
            jdbc_user => "{user}"
            jdbc_password => "{password}"
            statement => "{SQL Query statement};"        
            schedule => "{CRON}"
            tracking_column => "{tracking column}"
            
          }
        }    
        output {
          elasticsearch {
            hosts => "{elastic host}"
            index => "{index name}"
            document_id => "%{index id name}"
            user => elastic
            password => "{elastic password}"
          }
          stdout {
            codec => rubydebug
          }
        }
    
    • input
      • jdbc_connection_string 에서는 접속할 Azure SQL의 connection string을 넣어 주면된다.
      • jdbc_user SQL 접속 계정 정보 값을 넣어준다. logstash 전용 계정을 만들어 사용하는 것을 권장한다.
      • jdbc_password 계정 비밀번호 값을 넣어준다.
      • statement index에 담을 table 혹은 작성한 query를 넣어 준다.
      • schedule 얼마 만큼 반복 작업을 할 것인지 스케줄을 정한다. 표기는 CRON을 사용한다.
      • tracking_column 데이터 변경에 대한 기준이 되는 column을 넣어 준다 (ex. timestemp)
    • output
      • hosts elastic host 정보를 넣어준다.
      • index SQL정보를 저장할 index 이름
      • document_id 인덱스에 저장할 문서를 식별할 키 값
      • password elstic에 접속할 비밀번호

    Deployment

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sql-logstash
      namespace: search
    spec:
      replicas: 1
      selector:
        matchLabels:
          task: sql-search      
      template:
        metadata:
          labels:
            task: sql-search        
        spec:
          containers:
          - name: logstash
            image: docker.elastic.co/logstash/logstash:8.4.2
            ports:
              - containerPort: 5044
            imagePullPolicy: Always
            volumeMounts:
            - mountPath: /usr/share/logstash/config
              name: config-volume
            - mountPath: /usr/share/logstash/pipeline
              name: logstash-pipeline-volume
            - mountPath: /start
              name: start
            lifecycle:
              postStart:
                exec:
                  command:
                   - /start/start.sh
          volumes:
          - name: config-volume
            configMap:
              name: logstash-configmap
              items:
              - key: logstash.yml
                path: logstash.yml
          - name: logstash-pipeline-volume
            configMap:
              name: logstash-configmap
              items:
              - key: logstash.conf
                path: logstash.conf
          - name: start
            configMap:
              name: logstash-configmap-start
              defaultMode: 0777
          securityContext:
            fsGroup: 101
    

    Service

    apiVersion: v1
    kind: Service
    metadata:
      labels:
        task: sql-search
        kubernetes.io/name: logstash
      name: sql-logstash
      namespace: search
    spec:
      ports:
      - port: 5000
        targetPort: 5000
      selector:
        k8s-app: logstash
    

    Configmap 부터 Service 까지 순서대로 모두 K8S에 적용해 주면 된다.
    (여러 개의 파일로 보는 것이 귀찮다면 하나의 yaml에 모두 넣어서 한번에 실행해도 된다.)