elasticsearch 성능 최적화 및 보안 위협 분석

SECURITY/보안기술(SECOPS) / /
반응형

elasticsearch 구성  

 

IDC내 모든 중요 보안 이벤트 로그 -> arcsight -> ELK Stack (Elasticsearch +Logstah + Kibana)

 

Data  

 

Index Rate : 평균 5400/s

평일 Event : 

358,177,040 hits  (GET /_cat/count/xxxx-2018.05.29?v)

일별 index 용량 : 평균 100G~120G

 

이슈1

 

arcsight 에서 log format을 CEF 형식으로 -> elastic 으로 전송시 엘라스틱 디스크 용량 감당 노답.

예) arcsight 일별 100G VS elasticsearch 300G ~ 400G 정도 

 

해결

 

arcsight 에서 엘라스틱으로 data 전송시 cef, syslog 두가지 형태로 분류하여 logstash에 전송

(예: 이벤트 용량이 많은 것은 syslog형태로 , 이벤트 용량이 적은것은 cef형태로 전송 / 선택은 자유)

 

밑에서 설명하겠지만 필자의 경우는 mapping 하기가 일일이 귀찬아서 위와 같은 방법 선택

logstash 에서 arcsight module을 install 하면 자동으로 mapping이 되기 때문에 편함

 

Logstash 환경 설정 변경

 

arcsight -> cef형식이 아닌 syslog(raw tcp) 형태로 엘라스틱으로 전송. 

 

logstash.conf 수정

syslog 형태 일때 filter 정의 (불필요 필드 삭제등)

 

filter {

 if [type] == "syslog" {

        grok {

                match => [ "message", "<(?<ruleID>.*)>(?<msg>.*)" ]

 }

 

  kv { source => "msg" }

 

  geoip {

      source => "src"

      target => "source"

    }

  geoip {

      source => "dst"

      target => "destination"

    }

  geoip {

      source => "srcip"

      target => "source"

    }

  geoip {

      source => "dstip"

      target => "destination"

    }

 

  mutate {

      rename => { "src" => "sourceAddress" }

      rename => { "dst" => "destinationAddress" }

      rename => { "srcip" => "sourceAddress" }

      rename => { "dstip" => "destinationAddress" }

      rename => { "src_port" => "sourcePort" }

      rename => { "dst_port" => "destinationPort" }

      rename => { "srcport" => "sourcePort" }

      rename => { "dstport" => "destinationPort" }

      rename => { "devname" => "deviceHostName" }

      rename => { "status" => "deviceAction" }

      rename => { "action" => "deviceAction" }

      rename => { "rcvd" => "bytesIn" }

      rename => { "rcvdbyte" => "bytesIn" }

      rename => { "sent" => "bytesOut" }

      rename => { "sentbyte" => "bytesOut" }

    }

 

  mutate {

    remove_field => [ "msg", "message", "host", "port", "proto", "device_id", "vd", "src_int", "dst_int", "dir_disp", "log_id", "duration", "dstintf", "logid", "sessionid", "srcintf", "time", "date", "devid", "crscore", "craction", "type", "outintf", "srccountry", "dstcountry", "src_country", "dst_country", "tags", "SN" ]

 }

 

}  

}

 
 
-------------------------------------------------------------------------------------------
 
 
arcsight 연동 -> cef 형식일 경우 filter 수정시에는 아래 파일을 수정
/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/x-pack-6.2.4-java/modules/arcsight/configuration/logstash/arcsight.conf.erb
 
filter {
 
  # Map the @timestamp with the event time, as recorded in deviceReceiptTime
 
  date {
    match => [ "deviceReceiptTime", "MMM dd yyyy HH:mm:ss", "MMM  d yyyy HH:mm:ss", "UNIX_MS" ]
  }
 
  # To map the attacker Geo IP if plausible
 
  geoip {
    source => "sourceAddress"
    target => "source"
  }
 
  # To map the target Geo IP if plausible
 
  geoip {
    source => "destinationAddress"
    target => "destination"
  }
 
  # To map the log producing device Geo IP if plausible
 
  geoip {
    source => "deviceAddress"
    target => "device"
  }
 
  prune {
    blacklist_names => [ "^ad.+", "^A.+", "^B.+", "^AK1+", "^deviceCustom+", "^agent+", "^deviceEvent+", "^file+", "^cfg+", "^category+", "^flex+", "^xau+", "^*ZoneURI$", "^*boundInterface$", "^*.TimeZone$" ]
  }
 
  mutate {
    remove_field => [ "msg", "message", "tags", "requestContext", "SN", "deviceReceiptTime", "deviceVersion", "deviceProcessName", "cefVersion", "type", "sourceUserName", "deviceEventCategory", "host", "port", "deviceDirection", "generatorID" ]
 }
 
}
 
* prune 필터 적용시에는 logstash-plugin install logstash-filter-prune  플러그인을 설치해야함.
위와 같이 이벤트 용량별로 cef, syslog 형태의 두가지로  input을 받고 필요없는 필드들을 삭제하여 디스크 공간을 크게 확보 할수 있었음.

 

이외 로그 유실 염려 차원에서 logstash Queuing (logstash.yml) 테스트를 해보았으나, 

오히려 엘라스틱 측면에서 indexing 성능이 떨어지는 이슈가 발생 (튜닝을 잘못 했을수도...)

가급적이면 앞단에 TPS가 우수한  kafka 같은 메세징 시스템을 구성하는것이 바람직...

 

엘라스틱 6.x 대에서는 logstash를 kibana를 통하여 간단하게 모니터링이 가능하다 

 

설정은 logstash.conf에서 

xpack.monitoring.enabled: true

xpack.monitoring.elasticsearch.url: http://10.10.10.x:9200

 

 

 

 

이슈 2

Log mesaage :  WARN GC overhead, spent [720ms] collecting in the last ......

 

kibana 에서 dashboard reload시 주로 발생

원인은 dashboard 그래프에 aggregation 및 sorting 등의 작업이 많아서 발생...

 

개선 (그래도 가끔 발생) / 메모리 이슈에 가장 최선책은 data 노드 증설

indices.fielddata.cache.size : 80%

indices.breaker.fielddata.limit: 84%

not_analyzed 필드 생성 후 정렬
coordinate node 분리?
 
* jvm heap 용량 대비 60% -> 80%로 변경
 
변경후 data node jvm 관련 그래프에서 모니터링 

이외 성능 튜닝

6.x 버전 부터는 threadpool -> thread_pool 로 변경
 
thread pool 관련 정보 설정
 
curl -XGET '10.10.10.x:9200/_nodes/thread_pool?pretty
curl -XGET '10.10.10.x:9200/_nodes/stats/thread_pool?pretty'
curl -XGET '10.10.10.x9200/_cat/thread_pool?pretty'
 
엘라스틱 인덱싱 성능과 밀접한 관련이 있으므로 충분한 검증 후 적용 해야함
(서버 리소스 cpu core, memory 와 관련)
 
thread_pool.index.queue_size: 1200  (cpu core 수와 연관)
thread_pool.get.queue_size: 20000
thread_pool.search.queue_size: 20000
thread_pool.bulk.queue_size: 20000
indices.memory.index_buffer_size: 25%
 
템플릿 적용
curl -XPUT '10.10.10.x:9200/_template/template_syslog' -H 'Content-Type: application/json' -d '  
 { 
   "template" : "syslog-*",
    "mappings": {
    "doc": {
      "_source": {
        "enabled": false
      },
      "_all": {
        "enabled": false
      }
    }
    },
   "settings" : { "number_of_replicas" : "0", "number_of_shards" : "16", "refresh_interval" : "30s" }
}'
 
_source false 시 kibana 에서는 보이지 않음
 
샤드(shard) 갯수는 가이드라인이 있으나, 사실 운영해보면서 결정하는게 최선책 일듯.....
 
dynamic mapping 예
syslog type으로 엘라스틱으로 받을때 ip 필드 같은경우 mapping을 안해주면 string type으로 저장됨.
이로 인해 subnet 대역으로 검색시 source.ip:"192.168.10.0/24" 검색 불가
 
 
{
  "template" : "syslog-*",
  "mappings": {
    "doc": {
      "properties": {
        "source.ip": {
          "type": "ip"
        },
        "destination.ip": {
          "type": "ip"
        },
        "transip": {
          "type": "ip"
        },
        "tran_sip": {
          "type": "ip"
        },
        "tranip": {
          "type": "ip"
        },
        "tunnel_ip": {
          "type": "ip"
        },
        "tunnelip": {
          "type": "ip"
        },
        "source.port": {
          "type": "integer"
        },
        "destination.port": {
          "type": "integer"
        },
       "destination.geo": { "properties": { "location": { "type": "geo_point" }}},
       "source.geo": { "properties": { "location": {"type": "geo_point" }}}
        }
      }
    }
 }
 
다양한 이슈 
 
* query 결과 10,000 row 이상을 원할때 -> scroll api 사용 -> _search?scroll=1m&pretty'
  index.max_result_window" : "10000000" 
 
* shard 갯수 정의 (매우 어려움)
 
shard 가 너무 많아도 문제 / 너무 적어도 문제 
shard 당 data size가 너무 크면 성능 저하 
 
최선책은 운영하면서 다양한 성능 모니터링으로 개선이 정답
 
참고 자료 
https://www.elastic.co/kr/blog/how-many-shards-should-i-have-in-my-elasticsearch-cluster?ultron=kr-may-newsletter=newsletter&hulk=email&mkt_tok=eyJpIjoiWXpObE1USTVZMlExTldOaiIsInQiOiJMK2FBeFRpY1Iyd0ZpTTBhaE9UZzNxQm5OTzVHU1JoNEhhcHhVNWp2RjhRTzVVVTdKMWh2cHlWRTNjNTl4WjkwZm0yV2RSNnE3K1V0OG1OeE5NMVwvbURGejBXVUYrODljaUdtWGVTUGliVnlNVEU3VEhsZTRyOGxjUFJMT2hRcmsifQ%3D%3D
 
참고 
  • Shard 한개(Lucene 인덱스 하나)는 최대 21억건 문서, 2740억건의 고유 term까지만 허용
  • 따라서 데이터 증강량을 판별하여 이에 맞게 Sharding 수준을 정해야 함
  • 샤드당 30G 이하로 설정 (<- 딱히 정답은 없으나 여러가지 가이드라인 문서들 참고)
* kibana -> reporting 에서 좀더 많은 row수를 원할때 
    무료 버전에서는 csv 형태로 파일 다운로드 가능
  기본값은 파일 사이즈가 10M (10만 row 정도 / search field 에 따라 차이남)
 
kibana.yml 수정
xpack.reporting.csv.maxSizeBytes: 31457280  <- 30M 설정시
xpack.reporting.queue.timeout: 60000
xpack.reporting.queue.pollInterval: 8000
 
위와 같이 설정 변경후 268,576 row / 파일사이즈 25M 까지 reporting 기능으로 다운가능
(더 많은 row 수를 원할땐 어차피 파일 용량 limit 문제 이므로 / deviceHostName 등 보지 않을 field를 search시 빼면됨)

 

 

공식 문서에서는 위와 같이 설정값 조정으로 가능하나, 설정값이 너무 크면 elastic 클러스터에 영향을 준다고...

 
아래는 디폴트 10M 에서 실패후 -> 위와 같이 설정 변경후 성공 예제

 

 

 

반응형
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기