[Elasticsearch] Similar image search by image or natural language(1)

Elasticsearch(이하 ES)에서 자연어나 이미지로 유사한 이미지를 검색하는 방법에 대해서 알아보자.

유사한 이미지를 찾기 위해서는 먼저 이미지들 간의 유사도를 측정할 방법이 필요하고, 유사도를 측정하기 위해서 Vector를 사용할 수 있다. 이미지를 Embedding을 통해서 Vector로 변환하고, Vector 간의 유사도(Cosine similarity)를 계산해서 유사한 이미지를 찾을 수 있다.

그러면, 이미지를 Vector embedding하는 방법은 무엇이 있을까?

이미지를 vector embedding하는 방법과 이를 만들어주는 ML 모델들은 이미 많이 공개 되어 있다. ResNet, VGGNet, Inception 등 많은 모델들이 있다. 모델들의 성능을 잘 비교해서 알맞은 모델을 사용하면 된다.

여기에서는 OpenAI에서 공개한 CLIP model를 사용할 것이다.

CLIP을 사용해서 검색할 이미지들을 vector embedding해보자.
(사용할 이미지가 없다면 아래 첨부된 이미지를 사용해보자.)

from sentence_transformers import SentenceTransformer
from PIL import Image

img_model = SentenceTransformer('clip-ViT-B-32', device='cpu')

def vectorize(file_path):
    image = Image.open(file_path)
    embedding = img_model.encode(image)
    del image

    return embedding.tolist()

vectorize('./image/apple.jpg')

위 코드를 실행해보면 이미지가 vector embedding되어 512 차원의 vector를 반환하는 것을 확인 할 수 있다. 검색 대상의 이미지들을 모두 embedding 하면 된다.

vector embedding된 값들을 생성했다면, 이제 vector들을 ES에 올려서 유사 이미지 검색을 해보자.

먼저, ES에서 Vector를 담을 index를 생성한다.

PUT image_vector
{
  "mappings": {
    "properties": {
      "image_name": {
        "type": "keyword"
      },
      "vector": {
        "type": "dense_vector",
        "dims": 512,
        "index": true,
        "similarity": "cosine"
      }
    }
  }
}

생성한 인덱스에 만든 Vector 값들을 넣어준다.

from elasticsearch import Elasticsearch

def index(image_name, vector):
    es = Elasticsearch(cloud_id='elastic_cloud_id',
                       basic_auth=('elastic', 'elastic_secret'))
    doc = {     
        'image_name': image_name,
        'vector': vector
    }

    es.index(index='image_vector', body=doc)

index('apple.jpg', vector)

여기까지 하면 유사한 이미지를 검색하기 위한 준비는 끝난다.

원하는 이미지를 하나 골라서 검색을 해보자

GET image_vector/_search
{
  "knn": {
    "field": "vector",
    "k": 5,
    "num_candidates": 10,
    "query_vector": [
      0.5978590846061707,
      0.5926558375358582,
      -0.1243332028388977,
      0.11083680391311646,
      -0.6588778495788574,
     ... 생략 ...
      0.37319913506507874
    ]
  },
  "fields": [
    "image_name"
  ],
  "_source": false
}

왼쪽 끝의 사과 이미지와 위 예제 파일안에서 유사한 이미지 4개를 찾은 것을 확인 할 수 있다.

Next: Similar image search by image or natural language(2)

[Logstash on K8S] Azure SQL to ElasticSearch

Azure SQL(이하 sql)의 data를 ElasticSearch(이하 es)로 검색을 하기 위해서 sql 데이터를 es의 index로 옮겨야 한다. 데이터를 옮기는 방법은 여러가지 방법이 있지만 일반적으로 많이 쓰이는 데이터 처리 오픈소스인 Logstash를 사용하여 옮기는 방법을 알아보자.

 Logstash를 운영하는 방식도 다양하지만 간단하게 사용하려면 역시 K8S만 한 것이 없기 때문에 K8S에 구성해보자.

ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap-start
  namespace: search
  labels:
    task: sql-search-init    
data:
  start.sh: |
    #!/bin/bash
    curl -L -O https://go.microsoft.com/fwlink/?linkid=2203102
    tar -xvf ./sqljdbc_11.2.0.0_kor.tar.gz
    mv ./sqljdbc_11.2/enu/mssql-jdbc-11.2.0.jre11.jar ./lib/mssql-jdbc-11.2.0.jre11.jar
  • start.sh 에서는 jdbc 드라이버를 셋팅하는 작업을 수행한다. 이 링크에서 항상 최신 버전의 jdbc 드라이버를 확인하는 것을 권장한다.
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: search
  labels:
    task: sql-search    
data:
  logstash.yml: |
    http.host: "127.0.0.0"
    path.config: /usr/share/logstash/pipeline
  logstash.conf: 
    input {
      jdbc {
        jdbc_driver_library => "/usr/share/logstash/lib/mssql-jdbc-11.2.0.jre11.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://{database server}:1433;database={Database name};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
        jdbc_user => "{user}"
        jdbc_password => "{password}"
        statement => "{SQL Query statement};"        
        schedule => "{CRON}"
        tracking_column => "{tracking column}"
        
      }
    }    
    output {
      elasticsearch {
        hosts => "{elastic host}"
        index => "{index name}"
        document_id => "%{index id name}"
        user => elastic
        password => "{elastic password}"
      }
      stdout {
        codec => rubydebug
      }
    }
  • input
    • jdbc_connection_string 에서는 접속할 Azure SQL의 connection string을 넣어 주면된다.
    • jdbc_user SQL 접속 계정 정보 값을 넣어준다. logstash 전용 계정을 만들어 사용하는 것을 권장한다.
    • jdbc_password 계정 비밀번호 값을 넣어준다.
    • statement index에 담을 table 혹은 작성한 query를 넣어 준다.
    • schedule 얼마 만큼 반복 작업을 할 것인지 스케줄을 정한다. 표기는 CRON을 사용한다.
    • tracking_column 데이터 변경에 대한 기준이 되는 column을 넣어 준다 (ex. timestemp)
  • output
    • hosts elastic host 정보를 넣어준다.
    • index SQL정보를 저장할 index 이름
    • document_id 인덱스에 저장할 문서를 식별할 키 값
    • password elstic에 접속할 비밀번호

Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sql-logstash
  namespace: search
spec:
  replicas: 1
  selector:
    matchLabels:
      task: sql-search      
  template:
    metadata:
      labels:
        task: sql-search        
    spec:
      containers:
      - name: logstash
        image: docker.elastic.co/logstash/logstash:8.4.2
        ports:
          - containerPort: 5044
        imagePullPolicy: Always
        volumeMounts:
        - mountPath: /usr/share/logstash/config
          name: config-volume
        - mountPath: /usr/share/logstash/pipeline
          name: logstash-pipeline-volume
        - mountPath: /start
          name: start
        lifecycle:
          postStart:
            exec:
              command:
               - /start/start.sh
      volumes:
      - name: config-volume
        configMap:
          name: logstash-configmap
          items:
          - key: logstash.yml
            path: logstash.yml
      - name: logstash-pipeline-volume
        configMap:
          name: logstash-configmap
          items:
          - key: logstash.conf
            path: logstash.conf
      - name: start
        configMap:
          name: logstash-configmap-start
          defaultMode: 0777
      securityContext:
        fsGroup: 101

Service

apiVersion: v1
kind: Service
metadata:
  labels:
    task: sql-search
    kubernetes.io/name: logstash
  name: sql-logstash
  namespace: search
spec:
  ports:
  - port: 5000
    targetPort: 5000
  selector:
    k8s-app: logstash

Configmap 부터 Service 까지 순서대로 모두 K8S에 적용해 주면 된다.
(여러 개의 파일로 보는 것이 귀찮다면 하나의 yaml에 모두 넣어서 한번에 실행해도 된다.)