티스토리 뷰

파일 입력 플러그인과 TCP, UDP 입력 플러그인과 같은 여러가지 입력 플러그인을 사용해 Logstash로 데이터를 수집해본다.

Heartbeat

Heartbeat 플러그인은 Logstash 플러그인 중 가장 간단하지만 가장 유용한 플러그인 중 하나이다. 해당 플러그인을 사용해 Logstash가 문제 없이 작동하고 실행중인지 확인할 수 있다. 작동 방식은 Elasticsearch 혹은 다른 대상에게 정기적으로 메세지를 전송하는 것이다. 사용 방법은 다음과 같이 구성해주면 된다.

input {
  heartbeat {
    message => "ok"
    interval => 5
    type => "heartbeat"
  }
}

output {
  if [type] == "heartbeat" {
     elasticsearch {
     hosts => "http://localhost:9200"
     index => "heartbeat"
      }
  }
 stdout {
  codec => "rubydebug"
  }

}

해당 conf 파일을 logstash로 실행시켜주면 설정한 interval 값만큼의 시간을 두고 새로 데이터가 전송되는 것을 확인할 수 있다. 이 데이터는 실제로 elasticsearch에 index화 되기 때문에 search도 가능하다.
만약 위의 conf file의 input.heartbeat.message를 "epoch"로 바꾼다면 이벤트 시간이 "clock"이라는 필드에서 "epoch 값"으로 입력된다. 이 "clock"이라는 필드를 사용하면 Logstash에서 "이벤트의 실제 생성 시간 - 수집 시간"과 같은 방식으로 수집 지연 시간을 계산하는 등 계산해서 delay 혹은 혼잡한 파이프라인을 계산할 때 매우 유용하다.
또한 message 설정을 "sequence"로 설정할 수 있다. 이는 기본적으로 "clock"필드에 차례대로 증가하는 숫자를 생성한다. 이렇게 하면 "clock"이라는 필드가 1부터 차례대로 증가하여 저장된다. 이는 순서가 있기 때문에 데이터의 누락 등을 파악하는데 유용하다.

Generator Input 및 DLQ(Dead-Letter Queue)

테스트와 같을 때 사용을 위해서 삽입할 특정 데이터를 가지고 있는 것이 좋다. 물론 스크립트나 프로그램을 사용하여 이러한 데이터를 생성하고 파일로 보낼 수 있지만 Logstash에서 더 간단하게 처리할 수 있는 방법이 있다. Generator Plugin을 사용하여 무작위 또는 사용자 지정 로그 이벤트를 생성할 수 있다.

input {
      generator {
        lines => [
          '{"id": 1,"first_name": "Ford","last_name": "Tarn","email": "ftarn0@go.com","gender": "Male","ip_address": "112.29.200.6"}',
          '{"id": 2,"first_name": "Kalila","last_name": "Whitham","email": "kwhitham1@wufoo.com","gender": "Female","ip_address": "98.98.248.37"}'
        ]
        count => 0
        codec =>  "json"
      }
    }

output {
     elasticsearch {
     hosts => "http://localhost:9200"
     index => "generator"
  }
  stdout {
  codec => "rubydebug"
}
}

지금까지는 Logstash를 사용하여 이벤트를 성공적으로 처리한 사례를 살펴봤다. 하지만 이벤트를 처리하는데 실패하면 어떻게 될까? 일반적으로는 이벤트를 잃는다. 하지만 잃지 않기 위해서 필요한 조치를 취하는 것이 장기적으로 도움이 될 것이다. 이때 사용할 수 있는 것이 DLQ(Dead-Letter Queue) Plugin이다. 이 플러그인을 사용하면 실패 처리된 이벤트를 한 곳에 수집할 수 있다. 수집한 이벤트를 Logstash를 통해 해당 document를 처리하고 적절한 조치 후에 다시 Elasticsearch로 Index화 시킬 수 있다. DLQ는 다음과 같은 플로우로 진행된다.


DLQ 플러그인의 사용을 위해서는 몇가지 작업이 선행되어야 한다. 첫 번째로는 DLQ를 활성화 해야하고, 두 번째는 경로를 설정하는 것이다. 마지막으로는 명령어를 실행해 DLQ를 보관할 디렉토리를 생성하는 것이다.

  • dlq 디렉토리 생성
    • mkdir /home/student/dlq
  • logstash install 시 사용한 유저로 권한을 변경
    • sudo chown -R logstash:logstash /home/student/dlq
  • DLQ 플러그인 활성화 및 경로 설정
    • sudo vim /etc/logstash/logstash.yml
    • dead_letter_queue.enable 을 찾아서 주석을 제거하고 true로 변경
    • path.dead_letter_queue 를 찾아서 주석을 제거하고 위에서 생성한 dlq 디렉토리 경로로 절대경로 설정

이후 dlq.conf 파일을 다음과 같이 설정한 후 실행시켜주면 된다.

input {
  dead_letter_queue {
    path => "/home/student/dlq"
    # commit_offsets => "true"
  }
}

output {
   elasticsearch {
     hosts => "http://localhost:9200"
     index => "dlq-01"
  }
stdout {
  codec => "rubydebug"
}
}

이렇게 하면 dlq 디렉토리에 저장된 문서가 elasticsearch로 index화되어 저장된다. 저장된 후에도 dql 디렉토리에는 기존의 문서가 지워지지 않고 남아있기 때문에 elasticsearch로 index화 된 document가 더이상 read되는것을 원치 않는다면 input.dead_letter_queue.commit_offsets 값을 true로 설정해주면 된다.

HTTP Poller

HTTP Endpoint를 주기적으로 가져와 response를 Elasticsearch에 저장하는데 사용할 수 있는 유용한 플러그인이다. 구성은 다음과 같이 구성할 수 있다.

input {
 http_poller {
      urls => {
       external_api => {
          method => post
          url => "https://jsonplaceholder.typicode.com/posts"
          body => '{ "title": "foo", "body": "bar", "userId": "1"}'
          headers => {
           "content-type" => "application/json"
          }
       }
      }
      tags => "external-api"
      request_timeout => 100
      schedule => {"every" => "5s"}
      codec => "json"
      metadata_target => "http_poller_metadata"
 }
 http_poller  {
     urls => {
     es_health_status => {
        method => get
        url => "http://localhost:9200/_cluster/health"
        headers => {
          Accept => "application/json"
          }
        }
     }
     tags => "es_health"
     request_timeout => 60
     schedule => { cron => "* * * * * UTC"}
     codec => "json"
     metadata_target => "http_poller_metadata"
 }

}


output {
    if "es_health" in [tags] {
      elasticsearch{
      hosts => ["localhost:9200"]
      index => "http-poller-es-health"
    }
  }
   if "external-api" in [tags] {
      elasticsearch{
      hosts => ["localhost:9200"]
      index => "http-poller-api"
    }
  }
      stdout {
      codec => "rubydebug"
    }
}

이 외에도 유용한 플러그인이 존재하겠지만 모두 비슷한 방법으로 구성이 가능하니 필요시에 적절하게 찾아보고 기존것을 참고해서 구성하면 될 것 같다.

'Data Engineering > Elasticsearch' 카테고리의 다른 글

Elasticsearch와 Apache Hadoop  (0) 2022.11.24
[Elasticsearch] Logstash를 사용한 Syslog  (0) 2022.11.24
[Elasticsearch] Logstash Grok  (0) 2022.11.24
[Elasticsearch] Logstash와 MySQL  (0) 2022.11.24
[Elasticsearch] Logstash란  (0) 2022.11.23
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/06   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
글 보관함