티스토리 뷰
파일 입력 플러그인과 TCP, UDP 입력 플러그인과 같은 여러가지 입력 플러그인을 사용해 Logstash로 데이터를 수집해본다.
Heartbeat
Heartbeat 플러그인은 Logstash 플러그인 중 가장 간단하지만 가장 유용한 플러그인 중 하나이다. 해당 플러그인을 사용해 Logstash가 문제 없이 작동하고 실행중인지 확인할 수 있다. 작동 방식은 Elasticsearch 혹은 다른 대상에게 정기적으로 메세지를 전송하는 것이다. 사용 방법은 다음과 같이 구성해주면 된다.
input {
heartbeat {
message => "ok"
interval => 5
type => "heartbeat"
}
}
output {
if [type] == "heartbeat" {
elasticsearch {
hosts => "http://localhost:9200"
index => "heartbeat"
}
}
stdout {
codec => "rubydebug"
}
}
해당 conf 파일을 logstash로 실행시켜주면 설정한 interval 값만큼의 시간을 두고 새로 데이터가 전송되는 것을 확인할 수 있다. 이 데이터는 실제로 elasticsearch에 index화 되기 때문에 search도 가능하다.
만약 위의 conf file의 input.heartbeat.message를 "epoch"로 바꾼다면 이벤트 시간이 "clock"이라는 필드에서 "epoch 값"으로 입력된다. 이 "clock"이라는 필드를 사용하면 Logstash에서 "이벤트의 실제 생성 시간 - 수집 시간"과 같은 방식으로 수집 지연 시간을 계산하는 등 계산해서 delay 혹은 혼잡한 파이프라인을 계산할 때 매우 유용하다.
또한 message 설정을 "sequence"로 설정할 수 있다. 이는 기본적으로 "clock"필드에 차례대로 증가하는 숫자를 생성한다. 이렇게 하면 "clock"이라는 필드가 1부터 차례대로 증가하여 저장된다. 이는 순서가 있기 때문에 데이터의 누락 등을 파악하는데 유용하다.
Generator Input 및 DLQ(Dead-Letter Queue)
테스트와 같을 때 사용을 위해서 삽입할 특정 데이터를 가지고 있는 것이 좋다. 물론 스크립트나 프로그램을 사용하여 이러한 데이터를 생성하고 파일로 보낼 수 있지만 Logstash에서 더 간단하게 처리할 수 있는 방법이 있다. Generator Plugin을 사용하여 무작위 또는 사용자 지정 로그 이벤트를 생성할 수 있다.
input {
generator {
lines => [
'{"id": 1,"first_name": "Ford","last_name": "Tarn","email": "ftarn0@go.com","gender": "Male","ip_address": "112.29.200.6"}',
'{"id": 2,"first_name": "Kalila","last_name": "Whitham","email": "kwhitham1@wufoo.com","gender": "Female","ip_address": "98.98.248.37"}'
]
count => 0
codec => "json"
}
}
output {
elasticsearch {
hosts => "http://localhost:9200"
index => "generator"
}
stdout {
codec => "rubydebug"
}
}
지금까지는 Logstash를 사용하여 이벤트를 성공적으로 처리한 사례를 살펴봤다. 하지만 이벤트를 처리하는데 실패하면 어떻게 될까? 일반적으로는 이벤트를 잃는다. 하지만 잃지 않기 위해서 필요한 조치를 취하는 것이 장기적으로 도움이 될 것이다. 이때 사용할 수 있는 것이 DLQ(Dead-Letter Queue) Plugin이다. 이 플러그인을 사용하면 실패 처리된 이벤트를 한 곳에 수집할 수 있다. 수집한 이벤트를 Logstash를 통해 해당 document를 처리하고 적절한 조치 후에 다시 Elasticsearch로 Index화 시킬 수 있다. DLQ는 다음과 같은 플로우로 진행된다.
DLQ 플러그인의 사용을 위해서는 몇가지 작업이 선행되어야 한다. 첫 번째로는 DLQ를 활성화 해야하고, 두 번째는 경로를 설정하는 것이다. 마지막으로는 명령어를 실행해 DLQ를 보관할 디렉토리를 생성하는 것이다.
- dlq 디렉토리 생성
mkdir /home/student/dlq
- logstash install 시 사용한 유저로 권한을 변경
sudo chown -R logstash:logstash /home/student/dlq
- DLQ 플러그인 활성화 및 경로 설정
sudo vim /etc/logstash/logstash.yml
dead_letter_queue.enable
을 찾아서 주석을 제거하고true
로 변경path.dead_letter_queue
를 찾아서 주석을 제거하고 위에서 생성한 dlq 디렉토리 경로로 절대경로 설정
이후 dlq.conf 파일을 다음과 같이 설정한 후 실행시켜주면 된다.
input {
dead_letter_queue {
path => "/home/student/dlq"
# commit_offsets => "true"
}
}
output {
elasticsearch {
hosts => "http://localhost:9200"
index => "dlq-01"
}
stdout {
codec => "rubydebug"
}
}
이렇게 하면 dlq 디렉토리에 저장된 문서가 elasticsearch로 index화되어 저장된다. 저장된 후에도 dql 디렉토리에는 기존의 문서가 지워지지 않고 남아있기 때문에 elasticsearch로 index화 된 document가 더이상 read되는것을 원치 않는다면 input.dead_letter_queue.commit_offsets 값을 true
로 설정해주면 된다.
HTTP Poller
HTTP Endpoint를 주기적으로 가져와 response를 Elasticsearch에 저장하는데 사용할 수 있는 유용한 플러그인이다. 구성은 다음과 같이 구성할 수 있다.
input {
http_poller {
urls => {
external_api => {
method => post
url => "https://jsonplaceholder.typicode.com/posts"
body => '{ "title": "foo", "body": "bar", "userId": "1"}'
headers => {
"content-type" => "application/json"
}
}
}
tags => "external-api"
request_timeout => 100
schedule => {"every" => "5s"}
codec => "json"
metadata_target => "http_poller_metadata"
}
http_poller {
urls => {
es_health_status => {
method => get
url => "http://localhost:9200/_cluster/health"
headers => {
Accept => "application/json"
}
}
}
tags => "es_health"
request_timeout => 60
schedule => { cron => "* * * * * UTC"}
codec => "json"
metadata_target => "http_poller_metadata"
}
}
output {
if "es_health" in [tags] {
elasticsearch{
hosts => ["localhost:9200"]
index => "http-poller-es-health"
}
}
if "external-api" in [tags] {
elasticsearch{
hosts => ["localhost:9200"]
index => "http-poller-api"
}
}
stdout {
codec => "rubydebug"
}
}
이 외에도 유용한 플러그인이 존재하겠지만 모두 비슷한 방법으로 구성이 가능하니 필요시에 적절하게 찾아보고 기존것을 참고해서 구성하면 될 것 같다.
'Data Engineering > Elasticsearch' 카테고리의 다른 글
Elasticsearch와 Apache Hadoop (0) | 2022.11.24 |
---|---|
[Elasticsearch] Logstash를 사용한 Syslog (0) | 2022.11.24 |
[Elasticsearch] Logstash Grok (0) | 2022.11.24 |
[Elasticsearch] Logstash와 MySQL (0) | 2022.11.24 |
[Elasticsearch] Logstash란 (0) | 2022.11.23 |
- Total
- Today
- Yesterday
- CSAPP
- 파이썬
- DP
- mahout
- HDFS
- Hadoop
- 프로그래머스
- GROK
- BOJ
- sqoop
- 이코테
- heapq
- Espher
- CS
- cka
- OS
- Elasticsearch
- 백준
- oozie
- elasticsaerch
- 빅데이터를지탱하는기술
- logstash
- Flutter
- 네트워크
- kubernetes
- Algorithm
- DFS
- kafka
- Python
- 빅데이터
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 |