[Logstash on K8S] Azure SQL to ElasticSearch

Azure SQL(이하 sql)의 data를 ElasticSearch(이하 es)로 검색을 하기 위해서 sql 데이터를 es의 index로 옮겨야 한다. 데이터를 옮기는 방법은 여러가지 방법이 있지만 일반적으로 많이 쓰이는 데이터 처리 오픈소스인 Logstash를 사용하여 옮기는 방법을 알아보자.

 Logstash를 운영하는 방식도 다양하지만 간단하게 사용하려면 역시 K8S만 한 것이 없기 때문에 K8S에 구성해보자.

ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap-start
  namespace: search
  labels:
    task: sql-search-init    
data:
  start.sh: |
    #!/bin/bash
    curl -L -O https://go.microsoft.com/fwlink/?linkid=2203102
    tar -xvf ./sqljdbc_11.2.0.0_kor.tar.gz
    mv ./sqljdbc_11.2/enu/mssql-jdbc-11.2.0.jre11.jar ./lib/mssql-jdbc-11.2.0.jre11.jar
  • start.sh 에서는 jdbc 드라이버를 셋팅하는 작업을 수행한다. 이 링크에서 항상 최신 버전의 jdbc 드라이버를 확인하는 것을 권장한다.
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: search
  labels:
    task: sql-search    
data:
  logstash.yml: |
    http.host: "127.0.0.0"
    path.config: /usr/share/logstash/pipeline
  logstash.conf: 
    input {
      jdbc {
        jdbc_driver_library => "/usr/share/logstash/lib/mssql-jdbc-11.2.0.jre11.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://{database server}:1433;database={Database name};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
        jdbc_user => "{user}"
        jdbc_password => "{password}"
        statement => "{SQL Query statement};"        
        schedule => "{CRON}"
        tracking_column => "{tracking column}"
        
      }
    }    
    output {
      elasticsearch {
        hosts => "{elastic host}"
        index => "{index name}"
        document_id => "%{index id name}"
        user => elastic
        password => "{elastic password}"
      }
      stdout {
        codec => rubydebug
      }
    }
  • input
    • jdbc_connection_string 에서는 접속할 Azure SQL의 connection string을 넣어 주면된다.
    • jdbc_user SQL 접속 계정 정보 값을 넣어준다. logstash 전용 계정을 만들어 사용하는 것을 권장한다.
    • jdbc_password 계정 비밀번호 값을 넣어준다.
    • statement index에 담을 table 혹은 작성한 query를 넣어 준다.
    • schedule 얼마 만큼 반복 작업을 할 것인지 스케줄을 정한다. 표기는 CRON을 사용한다.
    • tracking_column 데이터 변경에 대한 기준이 되는 column을 넣어 준다 (ex. timestemp)
  • output
    • hosts elastic host 정보를 넣어준다.
    • index SQL정보를 저장할 index 이름
    • document_id 인덱스에 저장할 문서를 식별할 키 값
    • password elstic에 접속할 비밀번호

Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sql-logstash
  namespace: search
spec:
  replicas: 1
  selector:
    matchLabels:
      task: sql-search      
  template:
    metadata:
      labels:
        task: sql-search        
    spec:
      containers:
      - name: logstash
        image: docker.elastic.co/logstash/logstash:8.4.2
        ports:
          - containerPort: 5044
        imagePullPolicy: Always
        volumeMounts:
        - mountPath: /usr/share/logstash/config
          name: config-volume
        - mountPath: /usr/share/logstash/pipeline
          name: logstash-pipeline-volume
        - mountPath: /start
          name: start
        lifecycle:
          postStart:
            exec:
              command:
               - /start/start.sh
      volumes:
      - name: config-volume
        configMap:
          name: logstash-configmap
          items:
          - key: logstash.yml
            path: logstash.yml
      - name: logstash-pipeline-volume
        configMap:
          name: logstash-configmap
          items:
          - key: logstash.conf
            path: logstash.conf
      - name: start
        configMap:
          name: logstash-configmap-start
          defaultMode: 0777
      securityContext:
        fsGroup: 101

Service

apiVersion: v1
kind: Service
metadata:
  labels:
    task: sql-search
    kubernetes.io/name: logstash
  name: sql-logstash
  namespace: search
spec:
  ports:
  - port: 5000
    targetPort: 5000
  selector:
    k8s-app: logstash

Configmap 부터 Service 까지 순서대로 모두 K8S에 적용해 주면 된다.
(여러 개의 파일로 보는 것이 귀찮다면 하나의 yaml에 모두 넣어서 한번에 실행해도 된다.)

MS-SQL Replace ASCII control code

SQL 쿼리 결과를 XML 형태로 출력 할 때 아래와 같은 오류가 발생하는 경우가 있다.

FOR XML could not serialize the data for node ‘[ Column name]’ because it contains a character (0x001F) which is not allowed in XML. To retrieve this data using FOR XML, convert it to binary, varbinary or image data type and use the BINARY BASE64 directive.

XML로 변활 할 대상 데이터에 ASCII control code가 포함되어 있어서 발생하는 경우이다. 그래서 대상 데이터 안에 포함 되어 있는 해당 아래 스크립트 같이 XML로 변환 전에 모두 치환해 주어야 한다.

SELECT 
  REPLACE([Column name], char(0), '') 
FROM 
  [Table Name]

Control code를 삭제하는 Function을 만들어 두면 필요할 때 가져다 쓰면 좋다.

CREATE FUNCTION [dbo].[FN_ReplaceControlCharacter]  
(  
    @mString NVARCHAR(MAX)  
)  
RETURNS NVARCHAR(MAX)  
AS  
BEGIN      	
	SET @mString = REPLACE(@mString, char(0), '')
	SET @mString = REPLACE(@mString, char(1), '')
	SET @mString = REPLACE(@mString, char(2), '')
	SET @mString = REPLACE(@mString, char(3), '')
	SET @mString = REPLACE(@mString, char(4), '')
	SET @mString = REPLACE(@mString, char(5), '')
	SET @mString = REPLACE(@mString, char(6), '')
	SET @mString = REPLACE(@mString, char(7), '')
	SET @mString = REPLACE(@mString, char(8), '')
	SET @mString = REPLACE(@mString, char(9), '')
	SET @mString = REPLACE(@mString, char(10), '')
	SET @mString = REPLACE(@mString, char(11), '')
	SET @mString = REPLACE(@mString, char(12), '')
	SET @mString = REPLACE(@mString, char(13), '')
	SET @mString = REPLACE(@mString, char(14), '')
	SET @mString = REPLACE(@mString, char(15), '')
	SET @mString = REPLACE(@mString, char(16), '')
	SET @mString = REPLACE(@mString, char(17), '')
	SET @mString = REPLACE(@mString, char(18), '')
	SET @mString = REPLACE(@mString, char(19), '')
	SET @mString = REPLACE(@mString, char(20), '')
	SET @mString = REPLACE(@mString, char(21), '')
	SET @mString = REPLACE(@mString, char(22), '')
	SET @mString = REPLACE(@mString, char(23), '')
	SET @mString = REPLACE(@mString, char(24), '')
	SET @mString = REPLACE(@mString, char(25), '')
	SET @mString = REPLACE(@mString, char(26), '')
	SET @mString = REPLACE(@mString, char(27), '')
	SET @mString = REPLACE(@mString, char(28), '')
	SET @mString = REPLACE(@mString, char(29), '')
	SET @mString = REPLACE(@mString, char(30), '')
	SET @mString = REPLACE(@mString, char(31), '')

    RETURN @mString    
END  

0부터 31까지 순차적으로 있어서 While Loop를 사용하면 간결하게 보일 수 있긴 하지만, 데이터가 많은 경우(ex 100만개 이상의 row) 2배 이상의 속도 차이가 날 수 있다. 성능을 고려해야 한다면, 최대한 필요한 Control code를 특정 지어서 직접 치환하는 것을 권장한다.

MS-SQL Stored procedure에 List(or Array) parameter 전달하기

Custom type을 정의하면 Stored procedure(이하 SP)에 List(or Array) 형태의 정보를 전달하여 처리 할 수 있다.

먼저, List로 전달할 정보를 담을 Custom table type을 정의한다.
객체 정보를 담는 Class를 정의한다고 생각하면 된다.

CREATE TYPE CodeList
AS TABLE
(
  Code varchar(32)  
);

SP에 변수를 type을 위에서 만든 Custom type으로 정의한다. 그러면, SP는 정의된 Table 정보를 받아 처리 할 수 있게 된다.

CREATE PROCEDURE GetInfoByCodes
	@pCodeList AS CodeList READONLY
AS 
BEGIN
	SET NOCOUNT ON;
	SELECT * FROM InfoTable
	WHERE Code in (SELECT * FROM @pCodeList)	
END
GO

임의로 Custom table type으로 만든 변수를 만들어 다음과 같이 SP를 실행해 볼 수 있다.

DECLARE @pCodes CodeList
INSERT INTO @pCodes VALUES('A40274208')
INSERT INTO @pCodes VALUES('A10028014') 
INSERT INTO @pCodes VALUES('A56087115') 

EXEC GetInfoByCodes @pCodeList = @pCodes

Calculating DTU in AWS RDS for MSSQL

On-Prem이나 VM 형태로 사용하던 MSSQL Server를 Azure SQL로 Migration을 한다고 가정해보자. 기존에 사용하던 처리 성능에 맞춰 Cloud 상에 알맞은 수준의 리소스를 준비하고 옮겨야 할 것이다.

기존에 사용하던 SQL Server는 CPU와 Memory, Disk I/O등 Hardware적인 요소로 처리 성능을 나타낸다. 하지만 Azure SQL에서는 DTU(Database Throughput Unit)라는 단위를 사용하여 처리 성능을 나타내기 때문에 성능 비교가 쉽지가 않다.

DTU는 Hardware 성능 요소인 CPU, Disk I/O와 Database Log flush 발생양을 이용하여서 계산해낸 단위이다. 때문에 MSSQL Server에서 각 Metric들을 추출해 낼 수 있다면, DTU를 계산해 낼 수 있다. DTU 계산은 이미 Azure DTU Calculator라는 계산기가 제공되고 있기 때문에 추출해낸 Metric 값들을 설명대로 잘 넣어 주기만 하면 쉽게 확인해 볼 수 있다.

Metric을 추출하는 방법은 대상이 되는 SQL Server에서 DTU Calculator에서 제공하는 PowerShell Script를 관리자 권한으로 실행시켜 주기만 하면 된다. Script는 CPU, Disk I/O, Database Log flush에 대한 Metric들을 DTU Calculator에서 요구하는 형식으로 추출해준다. 때문에 일반적으로 On-Prem이나 VM 형태로 MSSQL Server를 사용하고 있다면, 계산해 내는데 별다른 어려움이 없을 것이다.

하지만 AWS RDS for MSSQL 같은 경우 MSSQL Server를 AWS에서 Cloud Service로 제공하기 위해 Wrapping한 서비스다보니 SQL Server에 대한 관리자 권한을 얻을 수가 없다. 이러한 이유 때문에 DTU Calculator에서 제공하는 PowerShell Script를 사용할 수 가 없다.

이런 경우, DTU 계산에 필요한 Metric들을 AWS에서 별도로 추출해야 한다. AWS에는 Cloudwatch라는 서비스를 제공하고 있는데, AWS 서비스들에 대한 Performance Metric을 기록하고 제공하는 역할을 한다.

Cloudwatch에서 metric을 추출하기 위해서는 AWS CLI를 사용하는 것이 편리하다. AWS Console의 우측 상단에 있는 CLI 실행 아이콘을 눌러 AWS CLI를 실행해 보자.

AWS CLI가 실행 되었다면, 아래 조회 명령어(list-metrics)를 실행시켜 제공되는 metric들에 대한 정보를 확인해 보자.

aws cloudwatch list-metrics --namespace AWS/RDS --dimensions Name=DBInstanceIdentifier,Value=[RDS for MSSQL Name]

잘 실행되었다면, cloudwatch에서 제공하는 Metric 리스트들을 확인 할 수 있다. 리스트 중 DTU계산에 필요한 CPU Processor Time과 Disk Reads/sec, Writes/sec에 값에 해당 하는 Metric 다음과 같다. (참고로, Log Bytes Flushed/sec에 해당하는 metric은 제공되지 않는다.)

{
    "Namespace": "AWS/RDS",
    "MetricName": "ReadIOPS",
    "Dimensions": [
        {
            "Name": "DBInstanceIdentifier",
            "Value": "[RDS for MSSQL Name]"
        }
    ]
},
{
    "Namespace": "AWS/RDS",
    "MetricName": "WriteIOPS",
    "Dimensions": [
        {
            "Name": "DBInstanceIdentifier",
            "Value": "[RDS for MSSQL Name]"
        }
    ]
},
{
    "Namespace": "AWS/RDS",
    "MetricName": "CPUUtilization",
    "Dimensions": [
        {
            "Name": "DBInstanceIdentifier",
            "Value": "[RDS for MSSQL Name]"
        }
    ]
}

이제, 필요한 metric 정보에 대해서 확인 하였으니 수집해보자. 아래 metric 수집 명령어(get-metric-statistics)를 사용하면 수집된 metric정보가 csv파일로 저장된다.

aws cloudwatch get-metric-statistics --namespace AWS/RDS --dimensions Name=DBInstanceIdentifier,Value=[RDS for MSSQL Name] --metric-name=CPUUtilization --period 3600 --statistics Average --start-time 2021-01-01T00:00:00.000Z --end-time 2021-02-01T00:00:00.000Z | jq -r '.Datapoints[] | [.Timestamp, .Average, .Unit] | @csv' | cat > cpu.csv

aws cloudwatch get-metric-statistics --namespace AWS/RDS --dimensions Name=DBInstanceIdentifier,Value=[RDS for MSSQL Name] --metric-name=ReadIOPS --period 3600 --statistics Average --start-time 2021-01-01T00:00:00.000Z --end-time 2021-02-01T00:00:00.000Z | jq -r '.Datapoints[] | [.Timestamp, .Average, .Unit] | @csv' | cat > read.csv

aws cloudwatch get-metric-statistics --namespace AWS/RDS --dimensions Name=DBInstanceIdentifier,Value=[RDS for MSSQL Name] --metric-name=WriteIOPS --period 3600 --statistics Average --start-time 2021-01-01T00:00:00.000Z --end-time 2021-02-01T00:00:00.000Z | jq -r '.Datapoints[] | [.Timestamp, .Average, .Unit] | @csv' | cat > write.csv

생성한 csv 파일은 AWS CLI Storage에 저장 되어있다. 우측 상단의 Actions > Download File 메뉴를 사용하면 로컬 환경으로 다운 받을 수 있다.

3개의 파일 모두 다운 받아서 DTU 계산에 필요한 형식으로 맞춰준다.

  • % Processor Time = CPUUtilization
  • Disk Reads/sec = ReadIOPS
  • Disk Writes/sec = WriteIOPS
  • Log Bytes Flushed/sec에 해당하는 데이터가 없기때문에 0으로 넣어준다.

그림과 같은 형태로 구성될 것이다.

RDS for MSSQL metrics for DTU Calculator

이제, 한땀 한땀 준비한 데이터를 Azure DTU Calculator에 넣어 주기만 하면 되는데 한가지 주의할 점이 있다. 우리가 얻은 데이터는 RDS for MSSQL Server에 대한 정보이다. 때문에 여러 DB Instance에 대한 계산을 하는 Elastic Database 메뉴를 선택하여 계산 하도록 해야한다.

계산이 끝나면 아래와 같이 나오는데 이 결과를 통해서 AWS RDS에서 사용하던 성능 수준을 Azure SQL에서 그대로 사용하기 위해서 구성 해야하는 Service Tire/Performance Level을 확인 할 수 있다.

DTU Result

MS-SQL Cursor 사용하기

SQL 작업을 하다보면 조회된 쿼리 결과에 대해서 행 단위 작업이 필요할때가 있다. 이때, Cursor를 사용하면 효율적으로 처리 할 수 있다.

Cursor Command

  • DECLARE: Cursor에 관련된 선언을 하는 명령
  • OPEN: Cursor가 Query결과의 첫번째 Tuple을 포인트로 하도록 설정하는 명령
  • FETCH: Query 결과의 Tuple들 중 현재의 다음 Tuple로 커서를 이동시키는 명령
  • CLOSE: Query 수행을 모두 마친 후 Cursor를 닫기 위한 명령
  • DEALLOCATE: Close된 Cursor의 자원을 반환하는 명령

Cursor life cycle

Cursor Example

DECLARE @pColum_1 NVARCHAR(100), @pColum_2 INT, @pColum_3 BIT
DEClARE pCursor CURSOR
FOR
   select * from [Target Table]

OPEN pCursor

FETCH NEXT FROM pCursor INTO @pColum_1, @pColum_2, @pColum_3

WHILE(@@FETCH_STATUS <> -1)
   BEGIN
      update [Target Table] SET Colum = @Colum_1 + 1
      where Colum_2 = @pColum_2 and Colum_3 = @pColum_3

      FETCH NEXT FROM pCursor INTO @pColum_1, @pColum_2, @pColum_3
   END

CLOSE pCursor
DEALLOCATE pCursor

-유의사항-

Cursor를 사용하게 되면 내부적인 임시테이블을 사용하기 때문에 많이 사용하게 되면 DB성능에 영향을 미치게 된다. 되도록 사용량이 적은 시간때이거나 적은양의 데이터를 처리할 때 사용하는 것이 바람직 하다.