[Logstash on K8S] Azure SQL to ElasticSearch

Azure SQL(이하 sql)의 data를 ElasticSearch(이하 es)로 검색을 하기 위해서 sql 데이터를 es의 index로 옮겨야 한다. 데이터를 옮기는 방법은 여러가지 방법이 있지만 일반적으로 많이 쓰이는 데이터 처리 오픈소스인 Logstash를 사용하여 옮기는 방법을 알아보자.

 Logstash를 운영하는 방식도 다양하지만 간단하게 사용하려면 역시 K8S만 한 것이 없기 때문에 K8S에 구성해보자.

ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap-start
  namespace: search
  labels:
    task: sql-search-init    
data:
  start.sh: |
    #!/bin/bash
    curl -L -O https://go.microsoft.com/fwlink/?linkid=2203102
    tar -xvf ./sqljdbc_11.2.0.0_kor.tar.gz
    mv ./sqljdbc_11.2/enu/mssql-jdbc-11.2.0.jre11.jar ./lib/mssql-jdbc-11.2.0.jre11.jar
  • start.sh 에서는 jdbc 드라이버를 셋팅하는 작업을 수행한다. 이 링크에서 항상 최신 버전의 jdbc 드라이버를 확인하는 것을 권장한다.
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: search
  labels:
    task: sql-search    
data:
  logstash.yml: |
    http.host: "127.0.0.0"
    path.config: /usr/share/logstash/pipeline
  logstash.conf: 
    input {
      jdbc {
        jdbc_driver_library => "/usr/share/logstash/lib/mssql-jdbc-11.2.0.jre11.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://{database server}:1433;database={Database name};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
        jdbc_user => "{user}"
        jdbc_password => "{password}"
        statement => "{SQL Query statement};"        
        schedule => "{CRON}"
        tracking_column => "{tracking column}"
        
      }
    }    
    output {
      elasticsearch {
        hosts => "{elastic host}"
        index => "{index name}"
        document_id => "%{index id name}"
        user => elastic
        password => "{elastic password}"
      }
      stdout {
        codec => rubydebug
      }
    }
  • input
    • jdbc_connection_string 에서는 접속할 Azure SQL의 connection string을 넣어 주면된다.
    • jdbc_user SQL 접속 계정 정보 값을 넣어준다. logstash 전용 계정을 만들어 사용하는 것을 권장한다.
    • jdbc_password 계정 비밀번호 값을 넣어준다.
    • statement index에 담을 table 혹은 작성한 query를 넣어 준다.
    • schedule 얼마 만큼 반복 작업을 할 것인지 스케줄을 정한다. 표기는 CRON을 사용한다.
    • tracking_column 데이터 변경에 대한 기준이 되는 column을 넣어 준다 (ex. timestemp)
  • output
    • hosts elastic host 정보를 넣어준다.
    • index SQL정보를 저장할 index 이름
    • document_id 인덱스에 저장할 문서를 식별할 키 값
    • password elstic에 접속할 비밀번호

Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sql-logstash
  namespace: search
spec:
  replicas: 1
  selector:
    matchLabels:
      task: sql-search      
  template:
    metadata:
      labels:
        task: sql-search        
    spec:
      containers:
      - name: logstash
        image: docker.elastic.co/logstash/logstash:8.4.2
        ports:
          - containerPort: 5044
        imagePullPolicy: Always
        volumeMounts:
        - mountPath: /usr/share/logstash/config
          name: config-volume
        - mountPath: /usr/share/logstash/pipeline
          name: logstash-pipeline-volume
        - mountPath: /start
          name: start
        lifecycle:
          postStart:
            exec:
              command:
               - /start/start.sh
      volumes:
      - name: config-volume
        configMap:
          name: logstash-configmap
          items:
          - key: logstash.yml
            path: logstash.yml
      - name: logstash-pipeline-volume
        configMap:
          name: logstash-configmap
          items:
          - key: logstash.conf
            path: logstash.conf
      - name: start
        configMap:
          name: logstash-configmap-start
          defaultMode: 0777
      securityContext:
        fsGroup: 101

Service

apiVersion: v1
kind: Service
metadata:
  labels:
    task: sql-search
    kubernetes.io/name: logstash
  name: sql-logstash
  namespace: search
spec:
  ports:
  - port: 5000
    targetPort: 5000
  selector:
    k8s-app: logstash

Configmap 부터 Service 까지 순서대로 모두 K8S에 적용해 주면 된다.
(여러 개의 파일로 보는 것이 귀찮다면 하나의 yaml에 모두 넣어서 한번에 실행해도 된다.)

MS-SQL Replace ASCII control code

SQL 쿼리 결과를 XML 형태로 출력 할 때 아래와 같은 오류가 발생하는 경우가 있다.

FOR XML could not serialize the data for node ‘[ Column name]’ because it contains a character (0x001F) which is not allowed in XML. To retrieve this data using FOR XML, convert it to binary, varbinary or image data type and use the BINARY BASE64 directive.

XML로 변활 할 대상 데이터에 ASCII control code가 포함되어 있어서 발생하는 경우이다. 그래서 대상 데이터 안에 포함 되어 있는 해당 아래 스크립트 같이 XML로 변환 전에 모두 치환해 주어야 한다.

SELECT 
  REPLACE([Column name], char(0), '') 
FROM 
  [Table Name]

Control code를 삭제하는 Function을 만들어 두면 필요할 때 가져다 쓰면 좋다.

CREATE FUNCTION [dbo].[FN_ReplaceControlCharacter]  
(  
    @mString NVARCHAR(MAX)  
)  
RETURNS NVARCHAR(MAX)  
AS  
BEGIN      	
	SET @mString = REPLACE(@mString, char(0), '')
	SET @mString = REPLACE(@mString, char(1), '')
	SET @mString = REPLACE(@mString, char(2), '')
	SET @mString = REPLACE(@mString, char(3), '')
	SET @mString = REPLACE(@mString, char(4), '')
	SET @mString = REPLACE(@mString, char(5), '')
	SET @mString = REPLACE(@mString, char(6), '')
	SET @mString = REPLACE(@mString, char(7), '')
	SET @mString = REPLACE(@mString, char(8), '')
	SET @mString = REPLACE(@mString, char(9), '')
	SET @mString = REPLACE(@mString, char(10), '')
	SET @mString = REPLACE(@mString, char(11), '')
	SET @mString = REPLACE(@mString, char(12), '')
	SET @mString = REPLACE(@mString, char(13), '')
	SET @mString = REPLACE(@mString, char(14), '')
	SET @mString = REPLACE(@mString, char(15), '')
	SET @mString = REPLACE(@mString, char(16), '')
	SET @mString = REPLACE(@mString, char(17), '')
	SET @mString = REPLACE(@mString, char(18), '')
	SET @mString = REPLACE(@mString, char(19), '')
	SET @mString = REPLACE(@mString, char(20), '')
	SET @mString = REPLACE(@mString, char(21), '')
	SET @mString = REPLACE(@mString, char(22), '')
	SET @mString = REPLACE(@mString, char(23), '')
	SET @mString = REPLACE(@mString, char(24), '')
	SET @mString = REPLACE(@mString, char(25), '')
	SET @mString = REPLACE(@mString, char(26), '')
	SET @mString = REPLACE(@mString, char(27), '')
	SET @mString = REPLACE(@mString, char(28), '')
	SET @mString = REPLACE(@mString, char(29), '')
	SET @mString = REPLACE(@mString, char(30), '')
	SET @mString = REPLACE(@mString, char(31), '')

    RETURN @mString    
END  

0부터 31까지 순차적으로 있어서 While Loop를 사용하면 간결하게 보일 수 있긴 하지만, 데이터가 많은 경우(ex 100만개 이상의 row) 2배 이상의 속도 차이가 날 수 있다. 성능을 고려해야 한다면, 최대한 필요한 Control code를 특정 지어서 직접 치환하는 것을 권장한다.

Power BI Compare with previous row value

시간에 따라 변화하는 등의 시계열 데이터가 있을 때, 이전 값과 얼만큼 변화 했는지 확인하기 위해서 current row value와 previous row value를 비교 해볼 수 있다.

COVID19 Sample 데이터를 Power BI에 Load 해보자

COVID19-Sample

각 row 데이터를 식별하기 위해서 Index 열을 추가한다.
(참고, 별도의 설정 없이 기본으로 추가된 인덱스는 0부터 시작한다.)

Add Index

이제, 사용자 지정 열을 추가 하여서 Current row에 Previous row value를 추가한다.

사용자 지정 열

사용자 지정 열에는 Script를 추가 할 수 있는데 아래 스크립트를 넣어주면 Previous row에 있는 “인원[명]” 값을 Current row에 추가 할 수 있다.

try 
 #"추가된 인덱스" {[인덱스] + 1} [#"인원[명]"]
otherwise 0
Current row에 Previous row value가 추가됨.

두 개의 값의 비율을 계산하여 증감률을 계산해 낼 수 있다.
다시 한번 사용자 지정 열을 추가 하여 비율 계산 식을 추가한다.

사용자 지정 열
[#"인원[명]"] / [Previous value] * 100

다음과 같이 증감율에 대한 데이터를 확인해 볼 수 있다.

Rate(증감률) 열 추가

만든 증감율 데이터를 가지고 다음과 같이 시각화 할 수 있다.

코로나 차트

선 그래프는 2020년 7월부터 2021년 2월까지 신규 확진자 수를 나타내고 있으며 막대 그래프는 전일 대비 신규 확진자 증감률을 나타내고 있다. 막대 그래프의 색은 증감률 데이터가 100 이하로 감소하면 푸른 계열로 바뀌고, 증가하면 붉은 계열로 바뀌도록 되어 있다.

MS-SQL Stored procedure에 List(or Array) parameter 전달하기

Custom type을 정의하면 Stored procedure(이하 SP)에 List(or Array) 형태의 정보를 전달하여 처리 할 수 있다.

먼저, List로 전달할 정보를 담을 Custom table type을 정의한다.
객체 정보를 담는 Class를 정의한다고 생각하면 된다.

CREATE TYPE CodeList
AS TABLE
(
  Code varchar(32)  
);

SP에 변수를 type을 위에서 만든 Custom type으로 정의한다. 그러면, SP는 정의된 Table 정보를 받아 처리 할 수 있게 된다.

CREATE PROCEDURE GetInfoByCodes
	@pCodeList AS CodeList READONLY
AS 
BEGIN
	SET NOCOUNT ON;
	SELECT * FROM InfoTable
	WHERE Code in (SELECT * FROM @pCodeList)	
END
GO

임의로 Custom table type으로 만든 변수를 만들어 다음과 같이 SP를 실행해 볼 수 있다.

DECLARE @pCodes CodeList
INSERT INTO @pCodes VALUES('A40274208')
INSERT INTO @pCodes VALUES('A10028014') 
INSERT INTO @pCodes VALUES('A56087115') 

EXEC GetInfoByCodes @pCodeList = @pCodes

Calculating DTU in AWS RDS for MSSQL

On-Prem이나 VM 형태로 사용하던 MSSQL Server를 Azure SQL로 Migration을 한다고 가정해보자. 기존에 사용하던 처리 성능에 맞춰 Cloud 상에 알맞은 수준의 리소스를 준비하고 옮겨야 할 것이다.

기존에 사용하던 SQL Server는 CPU와 Memory, Disk I/O등 Hardware적인 요소로 처리 성능을 나타낸다. 하지만 Azure SQL에서는 DTU(Database Throughput Unit)라는 단위를 사용하여 처리 성능을 나타내기 때문에 성능 비교가 쉽지가 않다.

DTU는 Hardware 성능 요소인 CPU, Disk I/O와 Database Log flush 발생양을 이용하여서 계산해낸 단위이다. 때문에 MSSQL Server에서 각 Metric들을 추출해 낼 수 있다면, DTU를 계산해 낼 수 있다. DTU 계산은 이미 Azure DTU Calculator라는 계산기가 제공되고 있기 때문에 추출해낸 Metric 값들을 설명대로 잘 넣어 주기만 하면 쉽게 확인해 볼 수 있다.

Metric을 추출하는 방법은 대상이 되는 SQL Server에서 DTU Calculator에서 제공하는 PowerShell Script를 관리자 권한으로 실행시켜 주기만 하면 된다. Script는 CPU, Disk I/O, Database Log flush에 대한 Metric들을 DTU Calculator에서 요구하는 형식으로 추출해준다. 때문에 일반적으로 On-Prem이나 VM 형태로 MSSQL Server를 사용하고 있다면, 계산해 내는데 별다른 어려움이 없을 것이다.

하지만 AWS RDS for MSSQL 같은 경우 MSSQL Server를 AWS에서 Cloud Service로 제공하기 위해 Wrapping한 서비스다보니 SQL Server에 대한 관리자 권한을 얻을 수가 없다. 이러한 이유 때문에 DTU Calculator에서 제공하는 PowerShell Script를 사용할 수 가 없다.

이런 경우, DTU 계산에 필요한 Metric들을 AWS에서 별도로 추출해야 한다. AWS에는 Cloudwatch라는 서비스를 제공하고 있는데, AWS 서비스들에 대한 Performance Metric을 기록하고 제공하는 역할을 한다.

Cloudwatch에서 metric을 추출하기 위해서는 AWS CLI를 사용하는 것이 편리하다. AWS Console의 우측 상단에 있는 CLI 실행 아이콘을 눌러 AWS CLI를 실행해 보자.

AWS CLI가 실행 되었다면, 아래 조회 명령어(list-metrics)를 실행시켜 제공되는 metric들에 대한 정보를 확인해 보자.

aws cloudwatch list-metrics --namespace AWS/RDS --dimensions Name=DBInstanceIdentifier,Value=[RDS for MSSQL Name]

잘 실행되었다면, cloudwatch에서 제공하는 Metric 리스트들을 확인 할 수 있다. 리스트 중 DTU계산에 필요한 CPU Processor Time과 Disk Reads/sec, Writes/sec에 값에 해당 하는 Metric 다음과 같다. (참고로, Log Bytes Flushed/sec에 해당하는 metric은 제공되지 않는다.)

{
    "Namespace": "AWS/RDS",
    "MetricName": "ReadIOPS",
    "Dimensions": [
        {
            "Name": "DBInstanceIdentifier",
            "Value": "[RDS for MSSQL Name]"
        }
    ]
},
{
    "Namespace": "AWS/RDS",
    "MetricName": "WriteIOPS",
    "Dimensions": [
        {
            "Name": "DBInstanceIdentifier",
            "Value": "[RDS for MSSQL Name]"
        }
    ]
},
{
    "Namespace": "AWS/RDS",
    "MetricName": "CPUUtilization",
    "Dimensions": [
        {
            "Name": "DBInstanceIdentifier",
            "Value": "[RDS for MSSQL Name]"
        }
    ]
}

이제, 필요한 metric 정보에 대해서 확인 하였으니 수집해보자. 아래 metric 수집 명령어(get-metric-statistics)를 사용하면 수집된 metric정보가 csv파일로 저장된다.

aws cloudwatch get-metric-statistics --namespace AWS/RDS --dimensions Name=DBInstanceIdentifier,Value=[RDS for MSSQL Name] --metric-name=CPUUtilization --period 3600 --statistics Average --start-time 2021-01-01T00:00:00.000Z --end-time 2021-02-01T00:00:00.000Z | jq -r '.Datapoints[] | [.Timestamp, .Average, .Unit] | @csv' | cat > cpu.csv

aws cloudwatch get-metric-statistics --namespace AWS/RDS --dimensions Name=DBInstanceIdentifier,Value=[RDS for MSSQL Name] --metric-name=ReadIOPS --period 3600 --statistics Average --start-time 2021-01-01T00:00:00.000Z --end-time 2021-02-01T00:00:00.000Z | jq -r '.Datapoints[] | [.Timestamp, .Average, .Unit] | @csv' | cat > read.csv

aws cloudwatch get-metric-statistics --namespace AWS/RDS --dimensions Name=DBInstanceIdentifier,Value=[RDS for MSSQL Name] --metric-name=WriteIOPS --period 3600 --statistics Average --start-time 2021-01-01T00:00:00.000Z --end-time 2021-02-01T00:00:00.000Z | jq -r '.Datapoints[] | [.Timestamp, .Average, .Unit] | @csv' | cat > write.csv

생성한 csv 파일은 AWS CLI Storage에 저장 되어있다. 우측 상단의 Actions > Download File 메뉴를 사용하면 로컬 환경으로 다운 받을 수 있다.

3개의 파일 모두 다운 받아서 DTU 계산에 필요한 형식으로 맞춰준다.

  • % Processor Time = CPUUtilization
  • Disk Reads/sec = ReadIOPS
  • Disk Writes/sec = WriteIOPS
  • Log Bytes Flushed/sec에 해당하는 데이터가 없기때문에 0으로 넣어준다.

그림과 같은 형태로 구성될 것이다.

RDS for MSSQL metrics for DTU Calculator

이제, 한땀 한땀 준비한 데이터를 Azure DTU Calculator에 넣어 주기만 하면 되는데 한가지 주의할 점이 있다. 우리가 얻은 데이터는 RDS for MSSQL Server에 대한 정보이다. 때문에 여러 DB Instance에 대한 계산을 하는 Elastic Database 메뉴를 선택하여 계산 하도록 해야한다.

계산이 끝나면 아래와 같이 나오는데 이 결과를 통해서 AWS RDS에서 사용하던 성능 수준을 Azure SQL에서 그대로 사용하기 위해서 구성 해야하는 Service Tire/Performance Level을 확인 할 수 있다.

DTU Result

Web Crawling with Scrapy(2)

지난 포스트에서는 Scrapy Shell에서 간단한 명령 코드를 이용하여 간단히 Web Crawling을 수행해 보았다.

이번에는 지난 포스트에서 작성한 Shell 명령 코드를 활용하여 Spider Project를 만들어서 데이터를 주기적으로 업데이트 할 수 있는 Crawler 모듈을 만들어 보겠다.

그럼, Spider Project를 만들어 보자.

아래와 같이 Project를 구성할 위치에서 명령어를 실행해보자.

# newsCrawler라는 Spider Project를 만든다.
$ scrapy startproject newsCrawler

명령 실행이 완료 되면, 다음과 같은 메시지가 나오면서 Start Project가 만들어 진것을 확인 할 수 있다.

You can start your first spider with:
    cd newsCrawler
    scrapy genspider example example.com

이제, newsCrawler라고 만든 프로젝트를 이용하여 Crawler 모듈 개발 작업을 시작하면 되는데… 그전에 먼저, Project의 구성에 대해서 간단히 알아보도록 하자.

  • newsCrawler/ # Root Directory
    • scrapy.cfg # 프로젝트 설정 파일
    • newsCrawler/ # 프로젝트 공간
      • __init__.py
      • items.py # Spider가 작업을 완료한 후 반환하는 결과 값의 Schema를 정의하는 파일
      • middlewares.py # Spider가 요청을 보낼 때 process를 제어하는 파일
      • pipelines.py # Spider가 응답을 받았을 때 process를 제어하는 파일
      • settings.py # Spider 실행에 필요한 전반적인 옵션을 설정하는 파일
      • spiders/ # 데이터를 수집 가공하는 Spider(Crawler) 모듈을 개발하는 공간, 여러개의 Spider가 있을 수 있다.
        • __init__py

위 프로젝트 구성을 보면 “nwesCrawler/spider”라는 위치에서 Spider(Crawler)를 개발하면 되는 것을 알 수 있다.

해당 위치로 이동하여 아래와 같이 Spider(Crawler) 생성 명령을 실행해 보자.

# Project의 spiders폴더로 이동하여 newsBot이라는 Spider를 생성한다.
$ cd newsCrawler/spiders
$ scrapy genspider newsBot 'news.daum.net/ranking/popular'

명령 실행이 완료되면 다음과 같이 기본 Template이 생성되었다는 메시지를 확인 할 수 있다.

Created spider 'newsBot' using template 'basic' in module:
  {spiders_module.__name__}.{module}

이제, 만들어진 Template(newsBot.py)에 이전에 만들어둔 Shell 명령 코드를 넣어서 Spiders(Crawler) 모듈 개발 작업을 하면 된다.

import scrapy

class NewsbotSpider(scrapy.Spider):
    name = 'newsBot'
    allowed_domains = ['news.daum.net/ranking/popular']
    start_urls = ['http://news.daum.net/ranking/popular/']

    def parse(self, response):
        # Web Crawling with Scrapy(1) 참고
        titles = response.xpath('//ul[@class="list_news2"]/li/div[2]/strong/a/text()').extract()
        authors = response.xpath('//ul[@class="list_news2"]/li/div[2]/strong/span/text()').extract()
        previews_text = response.xpath('//ul[@class="list_news2"]/li/div[2]/div[1]/span/text()').extract()
        previews_image = response.xpath('//ul[@class="list_news2"]/li/a/img/@src').extract()

        for item in zip(titles, authors, previews_text, previews_image):
            scraped_info = {
                'title': item[0].strip(),
                'author': item[1].strip(),
                'preview_text': item[2].strip(),
                'preview_image': item[3].strip()
            }
            yield scraped_info

개발 작업 완료되었으면, 아래와 같이 newsBot을 실행시켜 보자.

# newsBot 실행하고 결과는 JSON 파일로 지정위치에 출력한다.
$ scrapy crawl newsBot -o ./newsCrawler/json/result.json

실행이 완료되면 결과가 지정된 위치 “newsCrawler/json”에 JSON 파일로 만들어 진 것을 확인 할 수 있다.

그런데 JSON 파일을 열어보면 다음과 같이 Unicode로 결과가 출력되어 데이터가 잘 수집된 것인지 확인 하기가 어렵다.

[
    {
        "title": "\ubb38 \ub300\ud1b5\ub839 \"3\ub2e8\uacc4 \uaca9\uc0c1\uc740 \ub9c8\uc9c0\ub9c9 \uc218\ub2e8..\ubd88\uac00\ud53c\ud558\uba74 \uacfc\uac10\ud788 \uacb0\ub2e8\"",
        "author": "\uacbd\ud5a5\uc2e0\ubb38",
        "preview_text": "[\uacbd\ud5a5\uc2e0\ubb38] \ubb38\uc7ac\uc778 \ub300\ud1b5\ub839\uc740 13\uc77c \ucf54\ub85c\ub09819 \ud655\uc9c4\uc790\uac00 \uae09\ub4f1\uc138\ub97c \ubcf4\uc774\ub294 \uac83\uacfc \uad00\ub828, \u201c\uc9c0\uae08 \ud655\uc0b0\uc138\ub97c \uaebe\uc9c0 \ubabb\ud558\uba74 \uc0ac\ud68c\uc801 \uac70\ub9ac\ub450\uae30 3\ub2e8\uacc4 \uaca9\uc0c1\ub3c4 \uac80\ud1a0\ud574\uc57c \ud558\ub294 \uc911\ub300\ud55c \uad6d...",
        "preview_image": "https://img1.daumcdn.net/thumb/S95x77ht.u/?fname=https%3A%2F%2Ft1.daumcdn.net%2Fnews%2F202012%2F13%2Fkhan%2F20201213154335225ujgt.jpg&scode=media"
    },
    ..... 생략 .....
]

출력 결과를 알아보기 쉽게 한글로 표현하기 위해서는 settings.py에 출력결과가 utf-8로 인코딩될 수 있도록 설정을 추가 해주면 된다.

FEED_EXPORT_ENCODING = 'utf-8'

설정 내용을 저장하고 다시한번 newsBot을 실행 시켜보자.

다시 작업을 실행하고 결과로 출력된 JSON 파일을 열어보면 한글로 잘 출력된 것을 확인 할 수 있다. 그리고 실제 사이트와 데이터를 비교해 보면 다음과 같이 잘 수집되었음을 확인 할 수 있을 것이다.

실제 사이트와 JSON 결과 파일 비교

데이터 수집 결과 까지 확인했으니 이제 남은 것은 주기적으로 실행되도록 하는 것인데, CRON Job을 사용하면 쉽게 Scheduling을 할 수 있다.

그런데… CRON은 무엇이 길래 Scheduling을 해준다는 것일까?

CRON이란, Unix system OS의 시간기반 Job scheduler다. 고정된 시간, 날짜, 간격에 맞춰 주기적으로 실행 할 수 있도록 scheduling을 할 수 있게 해주는데 여기에 수행할 Job(작업)을 지정해 주면 scheduling된 시점에 주기적으로 Job이 실행 되는 것이다.
(좀 더 자세한 CRON에 대한 설명을 원한다면, WiKi를 참고하길 바란다.)

CRON

CRON Job이 어떤 것인지 알아보았으니 만들어 사용해 보자.

먼저, Job으로 실행될 bash script file(crawl.sh)을 아래와 같이 만들어 준다.

#!/bin/sh
# spider project가 있는 프로젝트로 이동
cd newsCrawler/spiders

# scrapy가 설치된 python의 가상환경에서 spider 실행
pipenv run scrapy crawl newsBot -o ./newsCrawler/json/result.json

그 다음 만든 Script를 아래와 같이 CRON에 추가해 주면 Crawler에 대한 Scheduling 작업이 완료된다.

# 매일 0시 0분에 Crawling 작업을 한다.
0 0 * * * /Users/Python/newsCrawler/crawl.sh

정상적으로 CRON에 등록 되었다면 매일 0시 0분에 Cralwer가 동작 할 것이고 실행 결과는 json 폴더에 업데이트 될 것이다.

참고로, CRON Job이 정상적으로 동작했다면 아래와 같은 Syslog를 확인 할 수 있다.

Dec 14 00:00:00 DESKTOP CRON[1327]: (root) CMD (/Users/Python/newsCrawler/crawl.sh)

여기까지 Scrapy 오픈소스 라이브러리를 사용하여 Web Crawling 작업을 해 보았다.

간단히 사용법을 알아보는 정도에서 구성한 것이라 좀더 자세한 설명이나 추가적인 설명이 필요하다면, Scrapy에 대한 정보는 공식문서를 참고하길 바란다.

Web Crawling with Scrapy(1)

인터넷 상에는 무수히 많은 데이터가 있다. 그리고 우리는 그것을 수집하여 활용하고 싶지만 쉽지가 않다.

예를 들어 최근 뉴스를 수집하여 트랜드를 분석한다고 한다면, 뉴스 데이터를 어디서, 어떻게 얻을 것인가?

이미 잘 정리해서 제공하는 곳이 있다면 참 좋겠지만, 현실적으로 내가 원하는 데이터가 딱맞춰 제공되는 경우는 없다고 보는게 맞다. 그렇다면 원하는 데이터를 직접 수집을 해야한다는 말인데… 어떻게 할 수 있을까?

가장 단순하게는 인터넷 포털 사이트에서 하나하나 검색해서 수집할 수도 있지만, 수집에만 들어가는 시간과 노력이 만만치 않게 들어갈 것이다.

이런 데이터 수집에 대한 어려움을 프로그래밍으로 해결할 수 있다. 그 방법은 바로 Crawling(혹은 Scraping)이란 것이다.

Crawling을 하기위한 방법들은 여러가지가 있지만, 여기서는 Python 오픈소스 라이브러리 제공되고 쉽게 사용할 수 있는 Scrapy를 사용하여 Web Crawling을 하는 방법에 대해서 알아 보겠다.

Scrapy 라이브러리를 사용하기 위해서 다음과 같이 Scrpay 패키지를 설치해야 한다.

$ pip install scrapy

설치가 완료되면 Scrapy에서 제공하는 Shell을 이용할 수 가 있다. Shell에서는 간단히 명령문으로 만으로도 쉽게 Crawling을 실행 해 볼 수 있는데, 본격적으로 Crawling 프로그래밍을 하기 전에 수집할 데이터에 대한 이해를 하기 위해서 먼저 탐색적 접근을 해 볼 수 있다는 장점이 있다.

그럼, Shell을 이용하여 데이터를 탐색하고 수집해 보겠다.

# Scrapy Shell 실행
$ scrapy shell

Shell이 실행 되었으면, 데이터 수집을 시작 할 웹 사이트 위치를 지정해 준다.

# Daum 랭킹뉴스 - 많이 본 뉴스
$ fetch('https://news.daum.net/ranking/popular')
2020-12-08 00:10:23 [scrapy.core.engine] INFO: Spider opened
2020-12-08 00:10:23 [scrapy.core.engine] DEBUG: Crawled (200)  (referer: None)

정상적으로 실행 됬다면, Crawled라는 메시지와 함께 Response Code가 200으로 떨어지는 것을 확인 할 수 있다.

Crawler가 받은 데이터는 로컬 임시폴더(%userprofile%/AppData/Local/Temp)에 저장되어 지는데 다음과 같이 보기 명령문을 통해 데이터를 확인해 볼 수 있다.

$ view(response)

위 보기 명령문이 실행되면, HTML 페이지가 브라우저로 열릴 것이다. 그리고 해당 HTML 페이지는 실제 사이트가 아닌 로컬주소로 나타나는 것을 확인 할 수 있다. 이를 통해서 Scrapy가 웹 사이트를 Crawling하여 수집한 데이터는 HTML 페이지라는 것을 알 수 있다. 그리고 이 HTML 페이지 중에서 필요한 데이터 위치를 찾아서 추출 및 가공을 하여 유용한 데이터로 만들어 내면 되는 것이다.

그런데… HTML 페이지에서 어떻게 필요한 데이터 위치를 찾을까?

대부분의 HTML 페이지는 일종의 Document라고 할 수 있다. 그리고 Document는 DOM(Document Object Model)형태로 표현된다. 때문에 DOM을 활용하면 필요한 데이터 위치를 추적할 수 있다.

여기서 수집 하려고 하는 데이터는 다음과 같다고 정의해 보자.

  1. 제목
  2. 출처
  3. 미리보기(이미지, 글)

브라우저의 개발자 도구(F12)를 실행시켜 DOM 구조를 살펴보자.

DOM 구조와 데이터 위치

모든 뉴스는 “list_news2”라는 클래스를 속성 값으로 가진 UL Tag안에 모여 있는 것을 확인 할 수 있다. 그리고 각 수집 할 데이터 들은 그 하위 LI Tag 안에 위의 색으로 표시해둔대로 위치해 있는 것을 알 수 있다.

이제, 알아낸 데이터 위치 정보를 XPath 함수에 입력하여 필요한 데이터를 수집하면 된다.

첫 번째 수집 대상인 뉴스 기사 제목 부터 수집해 보자. XPath에 뉴스 제목의 위치 정보를 넘기기 위해서 다음과 같이 표현 할 수 있다.

'//ul[@class="list_news2"]/li/div[2]/strong/a/text()'

이 것의 의미는 “UL Tag 중에서 list_news2라는 클래스 속성 값을 갖는 객체를 Root로 한다. Root 밑에 개별 기사는 LI Tag에 담겨 있고, 그 밑에 두 번째 DIV 밑에 STRONG 밑에 A Tag Text에 기사 제목이 있다.” 라는 것이다.

위 표현 값을 XPath 함수에 넣어 실행해 보자.

# 뉴스 기사 제목
$ response.xpath('//ul[@class="list_news2"]/li/div[2]/strong/a/text()').extract()

실행 결과로 50개의 모든 뉴스 기사 제목을 리스트 형태로 반환한 것을 확인 할 수 있다.

['[날씨] 겨울 한파 찾아온다..9일 밤부터 곳곳 눈·비',
  '잘린 손가락 들고 20개 병원 전전 "코로나 아니면 치료도 못 받나요"',
  '"문 대통령 취임 초기 기대 컸지만.. 지금은 아니다"',
  "오보라던 '그들의 술자리'..총장도 검사들도 '조용'",
  '소형 오피스텔까지 싹쓸이.."막을 방법 없다"',
  .... 생략 ....
]

같은 방법으로 나머지 데이터도 수집해 보자.

# 뉴스 출처(언론사)
$ response.xpath('//ul[@class="list_news2"]/li/div[2]/strong/span/text()').extract()

# 미리 보기 글
$ response.xpath('//ul[@class="list_news2"]/li/div[2]/div[1]/span/text()').extract()

# 미리 보기 이미지 주소
$ response.xpath('//ul[@class="list_news2"]/li/a/img/@src').extract()

여기까지 잘 실행이 되었다면, Shell를 통해서 수집하려고한 대상 데이터에 대해서 Crawling이 가능하다는 것을 확인한 샘이다.

Shell 명령 코드로 수행해본 Crawling 작업이 1회성으로 데이터를 수집하고 끝나는 것이 라면 이대로 끝내면 되겠지만, 데이터를 주기적으로 추가 수집하거나 업데이트가 필요하다면, 매번 Shell 명령코드로 수행하기 어려울 것이다.

이런 부분은 Shell 명령 코드를 Crawler 모듈로 만들어서 주기적으로 실행 할 수 있으면 좋을 것이다.

다음 포스트에서는 이번 포스트에서 사용한 Shell 명령 코드를 그대로 활용하여 Spider Project를 만들고 주기적으로 데이터를 업데이트 할 수 있는 Crawler를 만들어 보겠다.

[공유]정보 시각화의 아름다움

오늘날 우리 주변에 대부분의 것들이 데이터로 표현되고 있다. 이러한 정보는 계속 누적되고 정보 과잉 상태로 이어져서 혼란스러워 보일 수 있다. 이런 문제점들은 정보의 시각화를 통해서 해소 될 수 있다. 다양하게 시각화된 그래프를 보고 고유한 패턴을 찾고 새로운 통찰력을 얻는 것이 중요한 것 같다.

The Beauty of data visualization

출처:TED

연관 분석(feat. Python)

지난 포스팅에서 연관분석(Association Analysis)이 무엇인지, 어떻게 결과를 도출하는지 에 대한 이론적인 방법을 살펴보았다.

이제 실제로 연관분석을 Python으로 하나씩 구현해 보자.

분석을 하려면 데이터가 필요하다. 이번 예제에서는 Instacart라는 온라인 기반 농작물 배송 서비스 회사에서 공개한 2017년 9월에 발생한 주문 및 제품 정보에 대한 데이터셋을 다운받아 사용 할 것이다.

데이터는 “The Instacart Online Grocery Shopping Dataset 2017” 링크를 통하면 다운 받을 수 있다. 다운받은 데이터는 압축을 푼 후에 작업 폴더에 옮겨둔다.
(예제에서는 작업할 Project 폴더에 “./Dataset/Instacart”라는 경로에 데이터를 넣어 두었다.)

먼저, 분석할 데이터를 확인해 보자.

필요한 모듈과 함수, 고정 변수 등을 미리 정의해 둔다.

import pandas as pd
import numpy as np
import sys
from itertools import combinations, groupby
from collections import Counter
from IPython.display import display

# 데이터 파일(객체)이 어느정도 사이즈(MB) 인지 확인 하는 함수.
def size(obj):
    return "{0:.2f} MB".format(sys.getsizeof(obj) / (1000 * 1000))

# 파일 저장 경로
path = "./Dataset/Instacart"

Pandas를 이용하여 주문 데이터를 읽어서 데이터의 사이즈와 차원을 확인하고 실제 데이터를 살펴 보기 위해 상위 5개 데이터를 확인해 본다.

orders = pd.read_csv(path + '/order_products__prior.csv')
print('orders -- dimensions: {0};   size: {1}'.format(orders.shape, size(orders)))
display(orders.head())
orders -- dimensions: (32434489, 4);   size: 1037.90 MB
order_idproduct_idadd_to_cart_orderreordered
023312011
122898521
22932730
324591841
423003550
주문 데이터 상위 5개

주문 데이터는 4개의 차원으로 약 3천만 건의 주문 정보를 담고 있으며, 총 데이터의 크기는 1GB라는 것을 알 수 있다. 4개의 차원은 주문번호, 상품번호, 카트에 담긴 순서, 재(추가)주문 상태를 나타내고 있다. 여기서 연관규칙을 찾을 때 필요한 데이터는 주문번호와 상품번호만 있으면 되기 때문에 주문번호를 인덱스로 하고 상품번호를 Value로하는 Series로 변환한다.

orders = orders.set_index('order_id')['product_id'].rename('item_id')
print('dimensions: {0};   size: {1};   unique_orders: {2};   unique_items: {3}'
      .format(orders.shape, size(orders), len(orders.index.unique()), len(orders.value_counts())))
dimensions: (32434489,);   size: 518.95 MB;   unique_orders: 3214874;   unique_items: 49677

차원이 4개에서 1개로 줄었고, 사이즈도 1GB에서 절반정도 줄어든 518MB가 되었다.

데이터가 준비 되었으니, 연관 규칙을 찾기 위한 프로그램(함수)들을 정의해보자.

연관규칙을 찾기 위해서는 지지도, 신뢰도, 향상도 지표를 확인하여 규칙의 효용성을 확인 해야 한다. 이 3개 지표를 계산해 내기 위한 함수를 먼저 정의한다.

# 단일 품목이나 품목 집합에 대한 빈도수를 반환한다.
def freq(iterable):
    if type(iterable) == pd.core.series.Series:
        return iterable.value_counts().rename("freq")
    else: 
        return pd.Series(Counter(iterable)).rename("freq")
    
# 고유한 주문번호 갯수를 반환한다.
def order_count(order_item):
    return len(set(order_item.index))

# 한번에 한 품목 집합을 생성하는 generator를 반환한다.
def get_item_pairs(order_item):
    order_item = order_item.reset_index().values
    for order_id, order_object in groupby(order_item, lambda x: x[0]):
        item_list = [item[1] for item in order_object]
              
        for item_pair in combinations(item_list, 2):
            yield item_pair            

# 품목에 대한 빈도수와 지지도를 반환한다.
def merge_item_stats(item_pairs, item_stats):
    return (item_pairs
                .merge(item_stats.rename(columns={'freq': 'freqA', 'support': 'supportA'}), left_on='item_A', right_index=True)
                .merge(item_stats.rename(columns={'freq': 'freqB', 'support': 'supportB'}), left_on='item_B', right_index=True))

# 품목 이름을 반환한다.
def merge_item_name(rules, item_name):
    columns = ['itemA','itemB','freqAB','supportAB','freqA','supportA','freqB','supportB', 
               'confidenceAtoB','confidenceBtoA','lift']
    rules = (rules
                .merge(item_name.rename(columns={'item_name': 'itemA'}), left_on='item_A', right_on='item_id')
                .merge(item_name.rename(columns={'item_name': 'itemB'}), left_on='item_B', right_on='item_id'))
    return rules[columns]

다음으로, 실제 규칙을 찾기 위해 위 지표를 구하는 함수들을 이용하여, 연관 규칙을 찾는 함수를 정의 한다.

# 미리 준비한 주문 정보(주문번호를 인덱스로 하고 상품번호를 Value로하는 Series)와 최소 지지도를 입력받아 연관 규칙을 반환한다.
def association_rules(order_item, min_support):

    print("Starting order_item: {:22d}".format(len(order_item)))

    # 빈도수와 지지도를 계산한다.
    item_stats             = freq(order_item).to_frame("freq")
    item_stats['support']  = item_stats['freq'] / order_count(order_item) * 100

    # 최소 지지도를 만족하지 못하는 품목은 제외한다. 
    qualifying_items       = item_stats[item_stats['support'] >= min_support].index
    order_item             = order_item[order_item.isin(qualifying_items)]

    print("Items with support >= {}: {:15d}".format(min_support, len(qualifying_items)))
    print("Remaining order_item: {:21d}".format(len(order_item)))

    # 2개 미만의 주문 정보는 제외한다.
    order_size             = freq(order_item.index)
    qualifying_orders      = order_size[order_size >= 2].index
    order_item             = order_item[order_item.index.isin(qualifying_orders)]

    print("Remaining orders with 2+ items: {:11d}".format(len(qualifying_orders)))
    print("Remaining order_item: {:21d}".format(len(order_item)))

    # 빈도수와 지지도를 다시 계산한다.
    item_stats             = freq(order_item).to_frame("freq")
    item_stats['support']  = item_stats['freq'] / order_count(order_item) * 100

    # 품목 집합에 대한 generator를 생성한다.
    item_pair_gen          = get_item_pairs(order_item)

    # 품목 집합의 빈도수와 지지도를 계산한다. 
    item_pairs              = freq(item_pair_gen).to_frame("freqAB")
    item_pairs['supportAB'] = item_pairs['freqAB'] / len(qualifying_orders) * 100

    print("Item pairs: {:31d}".format(len(item_pairs)))

    # 최소 지지도를 만족하지 못하는 품목 집합을 제외한다.
    item_pairs              = item_pairs[item_pairs['supportAB'] >= min_support]

    print("Item pairs with support >= {}: {:10d}\n".format(min_support, len(item_pairs)))

    # 계산된 연관 규칙을 계산된 지표들과 함께 테이블로 만든다.
    item_pairs = item_pairs.reset_index().rename(columns={'level_0': 'item_A', 'level_1': 'item_B'})
    item_pairs = merge_item_stats(item_pairs, item_stats)
    
    item_pairs['confidenceAtoB'] = item_pairs['supportAB'] / item_pairs['supportA']
    item_pairs['confidenceBtoA'] = item_pairs['supportAB'] / item_pairs['supportB']
    item_pairs['lift']           = item_pairs['supportAB'] / (item_pairs['supportA'] * item_pairs['supportB'])
    
    # 향상도를 내림차순으로 정렬하여 연관 규칙 결과를 반환한다.
    return item_pairs.sort_values('lift', ascending=False)

연관 규칙을 찾기 위한 데이터와 프로그램(함수) 준비가 완료 되었다.

연관규칙을 찾아보자.

%%time
rules = association_rules(orders, 0.01)
Starting order_item:               32434489
Items with support >= 0.01:           10906
Remaining order_item:              29843570
Remaining orders with 2+ items:     3013325
Remaining order_item:              29662716
Item pairs:                        30622410
Item pairs with support >= 0.01:      48751

Wall time: 6min 24s

출력된 결과를 확인해보자. 약 3천만 건의 주문 정보에서 최소 지지도 0.01를 넘는 약 4만 8천건의 연관 규칙을 찾아 내었고, 연관 규칙을 만들어 내는 데 걸린 시간은 6분 24초가 걸렸다는 것을 알 수 있다.

찾은 결과를 출력해 보자.

# 품목 ID를 보기 좋게 하기 위해서 품목 이름으로 바꿔준다.
item_name   = pd.read_csv(path + '/products.csv')
item_name   = item_name.rename(columns={'product_id':'item_id', 'product_name':'item_name'})
rules_final = merge_item_name(rules, item_name).sort_values('lift', ascending=False)
display(rules_final)
연관 규칙 결과 테이블

출력된 연관 규칙 테이블을 보면 다소 복잡해 보일 수 있으나, 맨 마지막 열의 향상도(lift)를 보면 품목 간의 관계를 확인 할 수 있다.

  • lift = 1, 품목간의 관계 없다.
    *예: 우연히, 같이 사게되는 경우
  • lift > 1, 품목간의 긍정적인 관계가 있다.
    *예: 같이 사는 경우
  • lift < 1, 품목간의 부정적인 관계가 있다.
    *예: 같이 사지 않는 경우

출력된 향상도를 이용하여 결과를 분석해보자.

먼저, 품목 간의 긍정적인 관계인 향상도가 1 보다 큰 결과를 살펴 보자.

  1. 코티지 치즈는 블루베리 아사이 맛과 딸기 치아 맛을 같이 구매한다.
  2. 고양이 먹이는 치킨 맛과 칠면조 맛을 같이 구매한다.
  3. 요거트는 믹스 베리 맛과 사과 블루베리 맛을 같이 구매한다.

위 결과를 토대로 생각해 보면, 대부분 구입한 한 가지 품목에 대해서 다른 맛을 내는 같은 품목을 같이 구입한다는 것을 알 수 있다.

다음으로, 품목 간의 부정적인 관계인 향상도가 1보다 작은 결과를 보자.

  1. 유기농 바나나를 사는 경우 일반 바나나는 사지 않는다.
  2. 일반 품종의 아보카도를 구입한 경우 하스 아보카도를 구입하지 않는다.
  3. 유기농 딸기를 사는 경우 일반 딸기를 사지 않는다.

이번 결과에서는 구입한 한 가지 품목에 대해서 다른 생산과정 혹은 영양구성의 같은 품목은 같이 구입하지 않는다는 것을 알 수 있다.

이전 포스팅에서 언급했듯이, 단순히 나열된 구매 정보만으로는 확인 할 수 없었던 규칙들을 연관 분석을 통해서 알아 낼 수 있게 되었다. 이러한 정보를 바탕으로 고객들에게 제품을 추천 한다거나 상품의 배열을 바꿔 줄 수 있다면 상품 판매에 의미 있는 결과를 얻을 수 있을 거라 생각한다.

MS-SQL Cursor 사용하기

SQL 작업을 하다보면 조회된 쿼리 결과에 대해서 행 단위 작업이 필요할때가 있다. 이때, Cursor를 사용하면 효율적으로 처리 할 수 있다.

Cursor Command

  • DECLARE: Cursor에 관련된 선언을 하는 명령
  • OPEN: Cursor가 Query결과의 첫번째 Tuple을 포인트로 하도록 설정하는 명령
  • FETCH: Query 결과의 Tuple들 중 현재의 다음 Tuple로 커서를 이동시키는 명령
  • CLOSE: Query 수행을 모두 마친 후 Cursor를 닫기 위한 명령
  • DEALLOCATE: Close된 Cursor의 자원을 반환하는 명령

Cursor life cycle

Cursor Example

DECLARE @pColum_1 NVARCHAR(100), @pColum_2 INT, @pColum_3 BIT
DEClARE pCursor CURSOR
FOR
   select * from [Target Table]

OPEN pCursor

FETCH NEXT FROM pCursor INTO @pColum_1, @pColum_2, @pColum_3

WHILE(@@FETCH_STATUS <> -1)
   BEGIN
      update [Target Table] SET Colum = @Colum_1 + 1
      where Colum_2 = @pColum_2 and Colum_3 = @pColum_3

      FETCH NEXT FROM pCursor INTO @pColum_1, @pColum_2, @pColum_3
   END

CLOSE pCursor
DEALLOCATE pCursor

-유의사항-

Cursor를 사용하게 되면 내부적인 임시테이블을 사용하기 때문에 많이 사용하게 되면 DB성능에 영향을 미치게 된다. 되도록 사용량이 적은 시간때이거나 적은양의 데이터를 처리할 때 사용하는 것이 바람직 하다.