[Elasticsearch] Collecting RSS information with Logstash

RSS feed는 웹사이트의 최신 콘텐츠를 효율적으로 수집하고 분석할 수 있는 강력한 도구입니다. Logstash를 사용하면 RSS 데이터를 쉽게 수집하고 처리할 수 있습니다. 이 글에서는 Google News RSS를 Logstash로 수집하는 방법을 예제로 설명하겠습니다.

Logstash config

Logstash의 설정 파일은 input, filter, output 세 가지 섹션으로 구성됩니다. 아래는 Google News의 RSS 피드를 수집하기 위한 예제 설정입니다.

Input

http_poller 플러그인을 사용해 RSS 피드를 주기적으로 요청합니다.

input {
  http_poller {
    urls => {
      google_news => {
        url => "https://news.google.com/rss"
        method => get
      }
    }
    request_timeout => 60
    schedule => { every => "10m" }
    codec => "plain"
    metadata_target => "http_metadata"
  }
}

urls: 수집할 RSS 피드 URL을 지정합니다.
schedule: 데이터를 가져올 주기를 설정합니다. every => “5m”은 5분마다 데이터를 요청합니다. cron 표현식을 사용하여 보다 세부적인 설정이 가능합니다
metadata_target: 요청 메타데이터를 저장할 필드를 지정합니다.

Fiter

수집된 RSS 데이터를 구조화된 형태로 변환합니다.

filter {
  xml {
    source => "message"
    target => "rss"
    store_xml => true
    force_array => false
  }
  split {
    field => "[rss][channel][item]"
  }
  mutate {
    add_field => {
      "title" => "%{[rss][channel][item][title]}"
      "link" => "%{[rss][channel][item][link]}"
      "source" => "%{[rss][channel][item][source][content]}"
      "pubDate" => "%{[rss][channel][item][pubDate]}"
      "type" => "%{[http_metadata][input][http_poller][request][name]}"
    }
    remove_field => ["message", "rss", "event", "http_metadata"]
  }
}

xml: RSS 데이터를 XML 형태에서 파싱하여 rss 필드에 저장합니다.
split: 각 기사(item)를 개별 이벤트로 분리합니다.
mutate: 원하는 필드(title, link, source, pubDate)를 추가하고 불필요한 데이터를 제거합니다.

Output

결과 데이터를 Elasticsearch에 저장하거 콘솔에 출력합니다.

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "rss-data-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }  
}

이 예제를 사용하면 Logstash를 활용하면 RSS 데이터를 효율적으로 수집하고 처리할 수 있습니다.

[Logstash on K8S] Azure SQL to ElasticSearch

Azure SQL(이하 sql)의 data를 ElasticSearch(이하 es)로 검색을 하기 위해서 sql 데이터를 es의 index로 옮겨야 한다. 데이터를 옮기는 방법은 여러가지 방법이 있지만 일반적으로 많이 쓰이는 데이터 처리 오픈소스인 Logstash를 사용하여 옮기는 방법을 알아보자.

 Logstash를 운영하는 방식도 다양하지만 간단하게 사용하려면 역시 K8S만 한 것이 없기 때문에 K8S에 구성해보자.

ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap-start
  namespace: search
  labels:
    task: sql-search-init    
data:
  start.sh: |
    #!/bin/bash
    curl -L -O https://go.microsoft.com/fwlink/?linkid=2203102
    tar -xvf ./sqljdbc_11.2.0.0_kor.tar.gz
    mv ./sqljdbc_11.2/enu/mssql-jdbc-11.2.0.jre11.jar ./lib/mssql-jdbc-11.2.0.jre11.jar
  • start.sh 에서는 jdbc 드라이버를 셋팅하는 작업을 수행한다. 이 링크에서 항상 최신 버전의 jdbc 드라이버를 확인하는 것을 권장한다.
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: search
  labels:
    task: sql-search    
data:
  logstash.yml: |
    http.host: "127.0.0.0"
    path.config: /usr/share/logstash/pipeline
  logstash.conf: 
    input {
      jdbc {
        jdbc_driver_library => "/usr/share/logstash/lib/mssql-jdbc-11.2.0.jre11.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://{database server}:1433;database={Database name};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
        jdbc_user => "{user}"
        jdbc_password => "{password}"
        statement => "{SQL Query statement};"        
        schedule => "{CRON}"
        tracking_column => "{tracking column}"
        
      }
    }    
    output {
      elasticsearch {
        hosts => "{elastic host}"
        index => "{index name}"
        document_id => "%{index id name}"
        user => elastic
        password => "{elastic password}"
      }
      stdout {
        codec => rubydebug
      }
    }
  • input
    • jdbc_connection_string 에서는 접속할 Azure SQL의 connection string을 넣어 주면된다.
    • jdbc_user SQL 접속 계정 정보 값을 넣어준다. logstash 전용 계정을 만들어 사용하는 것을 권장한다.
    • jdbc_password 계정 비밀번호 값을 넣어준다.
    • statement index에 담을 table 혹은 작성한 query를 넣어 준다.
    • schedule 얼마 만큼 반복 작업을 할 것인지 스케줄을 정한다. 표기는 CRON을 사용한다.
    • tracking_column 데이터 변경에 대한 기준이 되는 column을 넣어 준다 (ex. timestemp)
  • output
    • hosts elastic host 정보를 넣어준다.
    • index SQL정보를 저장할 index 이름
    • document_id 인덱스에 저장할 문서를 식별할 키 값
    • password elstic에 접속할 비밀번호

Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sql-logstash
  namespace: search
spec:
  replicas: 1
  selector:
    matchLabels:
      task: sql-search      
  template:
    metadata:
      labels:
        task: sql-search        
    spec:
      containers:
      - name: logstash
        image: docker.elastic.co/logstash/logstash:8.4.2
        ports:
          - containerPort: 5044
        imagePullPolicy: Always
        volumeMounts:
        - mountPath: /usr/share/logstash/config
          name: config-volume
        - mountPath: /usr/share/logstash/pipeline
          name: logstash-pipeline-volume
        - mountPath: /start
          name: start
        lifecycle:
          postStart:
            exec:
              command:
               - /start/start.sh
      volumes:
      - name: config-volume
        configMap:
          name: logstash-configmap
          items:
          - key: logstash.yml
            path: logstash.yml
      - name: logstash-pipeline-volume
        configMap:
          name: logstash-configmap
          items:
          - key: logstash.conf
            path: logstash.conf
      - name: start
        configMap:
          name: logstash-configmap-start
          defaultMode: 0777
      securityContext:
        fsGroup: 101

Service

apiVersion: v1
kind: Service
metadata:
  labels:
    task: sql-search
    kubernetes.io/name: logstash
  name: sql-logstash
  namespace: search
spec:
  ports:
  - port: 5000
    targetPort: 5000
  selector:
    k8s-app: logstash

Configmap 부터 Service 까지 순서대로 모두 K8S에 적용해 주면 된다.
(여러 개의 파일로 보는 것이 귀찮다면 하나의 yaml에 모두 넣어서 한번에 실행해도 된다.)