본문 바로가기

Backend Study/ELK Stack

[Elastic Search ] Logstash - mysql 연동하기

mysql의 데이터를 사용해서 elastic search에서 검색을 하기 위해서는 elastic search 에 우리의 데이터베이스에 있는 데이터들을 담는 작업을 따로 해줘야한다. 그리고 elastic search와 database를 일정 간격으로 동기화해주는 작업은 필수이다!

이를 위한 도구로서 logstash를 사용할 것이다. 

 

logstash가 무엇인지는 아래 링크 참고하자. 

 

 

[Elastic Search] ELK 로컬환경에 설치하기, 버전 변경하기 (for MAC)

검색 시스템을 구축하기 위해 먼저 ELK Stack 환경을 구성하려고한다. (ELK는 Elastic Search와 Logstash, Kibana를 앞글자만 딴 단어이다.) 스럽 ELK Stack 구성 Java 11 Elastic Search 7.0 Logstash 7.0 Kibana 7.0 Elasticsearch

blog-sluv.tistory.com

 

 

1.  필요한 프로그램 설치하기 

logstash의 input에 mysql의 데이터를 담기 위해서는 jdbc와 jdbc를 위한 mysql 드라이버가 필요하다. 

 

우선 jdbc를 위한 mysql 드라이브를 설치해준다. 

 

https://dev.mysql.com/downloads/connector/j/

 

MySQL :: Download Connector/J

MySQL Connector/J 8.0 is highly recommended for use with MySQL Server 8.0, 5.7 and 5.6. Please upgrade to MySQL Connector/J 8.0.

dev.mysql.com

위 링크에서 설치할 수 있다.

 

다음으로 JDBC Integration Plugin을 설치한다. 

https://www.elastic.co/guide/en/logstash/current/plugins-integrations-jdbc.html

 

JDBC Integration Plugin | Logstash Reference [8.7] | Elastic

 

www.elastic.co

 

직접 다운로드 해도 되고 아래 명령어를 통해서도 가능하다. 

 

bin/logstash-plugin install logstash-integration-jdbc

# 설치확인
bin/logstash-plugin list | grep logstash-integration

 

2. *.conf 파일 작성하기 

 

logstach에 데이터를 input (동기화) 하기 위해서는 *.conf 파일을 작성해야한다. logstash 실행 시 이 파일을 함께 작동시켜줘야한다. 

하나의 index 당 하나의 conf 파일을 작성하면 되고, 스럽에서는 아이템 검색, 질문 검색, 유저 검색 이렇게 3가지의 index를 가져서 3개의 conf 파일을 작성해줬다. 

 

/logstash-7.0.0/config 경로에 conf 파일을 넣어뒀다. 

conf 파일의 형식은 아래와 같다. 

 

input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => <my username>
    jdbc_password => <my password>
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "데이터 가져오는 쿼리문 (select 문)"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
output {
  # stdout { codec =>  "rubydebug"}
  elasticsearch {
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

 

 

tracking_column

LogStash가 MYSQL로부터 읽은 마지막 문서를 추적하는데 사용된다. 

 

unix_ts_in_secs 

SELECT 문이 생성하는 필드이고, 표준 Unix 타임스탬프로서 "modification_time" 을 포함한다. tracking_column에 의해 참조된다. 일반 타임스탬프로 사용되기보다는 진행상황을 추적할 때 사용된다.

 

sql_last_value 

LogStash의 폴링 루프의 현재 반복 계산을 위한 시작점을 포함하는 기본 제공 매개변수이며, 위의 jdbc 입력 구성의 SELECT 문에서 참조된다. 이것은 "unix_tx_in_secs"의 가장 최근 값으로 설정되며, .logstash_jdbc_last_run으로부터 읽는다. 

LogStash의 폴링 루프에서 실행되는 MYSQL 쿼리가 반환하는 문서를 위한 시작점으로 사용된다. 쿼리에서 사용된다면, 구분지어줘야한다.

 

schedule

얼마나 자주 MySQL을 폴링해야하는지 지정한다. 

 

filter 

MySQL 레코드로부터 "id"의 값을 "_id"라고 하는 메타데이터 필드로 그냥 복사한다. 그리고 이것은 문서가 정확한 "_id" 값을 가지고 ElasticSearch로 작성되도록 하기 위해서 출력에서 참조된다. 메타데이터 필드를 사용하면 이 임시값이 새 필드로 생성되지 않도록 해준다. 

 

output

각 문서가 ElasticSearch로 작성되고, 우리가 필터 섹션에서 생성한 메타데이터 필드로부터 가져오는 "_id"에 할당되도록 지정한다. 디버깅의 도움으로 활성화될 수 있는 주석처리된 rubydebug 출력도 있다. 

 

modification_time < NOW():

데이터를 정확하게 가져오기에 필요한 필드이다. 

 

 

적용하기 

 

 

input {
  jdbc {
    jdbc_driver_library => "/Users/kimkyuri/Documents/elasticsearch/logstash-7.0.0/logstash-core/lib/jars/mysql-connector-j-8.0.32.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "스럽 디비 주소" 
    jdbc_user => "스럽 디비 유저 아이디"
    jdbc_password => "스럽 디비 비밀번호"
    jdbc_paging_enabled => true
    tracking_column => "user_id"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "select user_id, nickname, nickname as nickname_ngram,
        nickname as nickname_kr_eng2kor, nickname as nickname_kr_chosung, nickname as nickname_kr_jamo,
        created_at, user_status from user"
  }
}
filter {

}
output {
  
  elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "search3"
      document_id => "%{[user_id]}"
  }
  stdout { codec =>  "rubydebug"}
}

 

스럽의 DB정보를 입력해주고, statement에서 mysql에서 가져올 데이터를 쿼리문으로 정의하고있다. schedule은 우선 5초로 지정했으며, 기획에 따라서 수정할 예정이다. 

index이름은 search3으로 지정하고 있고, document_id는 유저 테이블에서 사용하고 있는 pk값으로 설정하기 위해 따로 명시해주었다. (웬만하면 실제로 사용하고 있는 pk값으로 설정해주는게 좋다. )

 

3. Rdbms와 ElasticSearch 연동 조건 

원할한 검색 엔진을 만들기 위해서는 JDBC 입력 플로그인에 있는 Logstash를 이용하여 ElasticSearch와 MYSQL 동기화를 유지해야한다. 정확히 작동하려면 RDBMS에 몇가지 조건이 필요하다! 

 

조건 1.  MySQL 문서가 ElasticSearch로 작성될 때, ElasticSearch의 "_id" 필드는 MySql "id" 필드로 설정되어야한다. 이렇게 설정하면 MySQL 레코드와 ElasticSearch 문서간에 직접 매핑을 제공해준다.

레코드가 MySQL에서 업데이트 되는 경우, 연결된 문서 전체가 ElasticSearch에서 덮어쓰기된다. (효율성은 떨어지는 작업이다.) 

 

조건 2. MySQL에 레코드가 삽입되거나 업데이트되면, 그 레코드는 업데이트 또는 삽입시간을 포함하는 필드(칼럼)를 가져야한다. (이는 서비스의 마케팅이나 데이터적 차원에서도 긍정적인 기능을 한다.) LogStash는 이 필드(칼럼)들이 폴링 루프의 마지막 반복 계산 이래 수정되거나 삽입된 문서만 요청할 수 있도록 하는데 이용한다. LogStash 폴링 루프의 이전 반복 계산에서 받았던 마지막 기록보다 새로운 업데이트나 삽입이 있는 레코드만 요청해야한다는 것을 알고있다. 

 

 

4. Logstash 실행하기 

 

아래 명령어로 실행할 수 있다.

 

./bin/logstash -f [실행경로]/elasticsearch/logstash-7.0.0/config/[index명].conf

 

 

아래 그림과 같은 결과가 보이면 성공적으로 설치된 것이다 ! (현재 test 데이터 밖에 없어서 저런 화면이 뜬다.. 😊 )

logstasth와 kibana는 꼭 elastic search가 실행되어야 정상 작동한다. 오류가 보이면 elasticsearch가 설치되어있는지 확인해보자. 

 

 

 

 

5.  데이터 삭제하기

데이터베이스에서 데이터가 삭제되더라도 위에서 사용한 동기화 방법으로는 elastic Search에 적용되지 않는다. ( 실제 서비스에서 데이터가 삭제되어도 삭제하지 않는 경우가 많은 것도 사실이다! )

"is_deleted" 처럼 데이터의 상태를 나타내는 컬럼을 두고, LogStatsh를 통해 그 변경사항이 ElasticSearch에 적용되도록 만들어주면된다. 대신 MySQL 쿼리는 "is_deleted"가 참인 레코드/문서를 제외할 수 있도록 작성되어야한다. 

 

 

logstash와 데이터베이스를 연동하고, 데이터를 가져오는 것은 검색 엔진에서 매우 중요한 과정이다. 그러므로 꼭 사용법을 숙지하고 있자 ! 

 

 

레퍼런스) 

https://www.elastic.co/kr/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash

 

Logstash와 JDBC를 사용해 RDBMS와 Elasticsearch의 동기화를 유지하는 방법

Elasticsearch가 기존 관계형 데이터베이스(RDBMS)와 더불어 검색 솔루션으로 배포될 때, 두 데이터 저장소 간에 데이터 동기화를 유지하는 것이 중요합니다. Logstash JDBC 입력 플러그인을 이용해 손쉽

www.elastic.co