input {
kafka {
type => "network"
bootstrap_servers => ["10.7.20.17:9091,10.7.20.17:9092,10.7.20.17:9093"]
topics => "network-log"
group_id => "logstash01"
consumer_threads => 3
max_poll_records => 1000
auto_commit_interval_ms => 5000
enable_auto_commit => true
}
kafka {
type => "session"
bootstrap_servers => ["10.7.20.17:9091,10.7.20.17:9092,10.7.20.17:9093"]
topics => "session-log"
group_id => "logstash01"
consumer_threads => 3
max_poll_records => 1000
auto_commit_interval_ms => 5000
enable_auto_commit => true
}
}
filter {
if [type] == "network" {
grok {
match => {
"message" => [
"%{WORD} %{HOSTNAME:hostname} %%%{DATA:module_type}: %{DATA}=%{IPV4:DevIP}; %{DATA}=%{DATA:srcZoneName};%{DATA}=%{DATA:destZoneName};%{DATA}=%{DATA:Type};%{DATA}=%{DATA:ObjectPolicy};%{DATA}=%{INT:rule_ID};%{DATA}=%{DATA:Protocol};%{DATA}=%{IPV4:srcIPAddr};%{DATA}=%{IPV4:destIPAddr};%{DATA}=%{DATA:IcmpType};%{DATA}=%{DATA:IcmpCode};%{DATA}=%{INT:MatchCount};%{DATA}=%{DATA:Event};",
"%{WORD} %{HOSTNAME:hostname} %%%{DATA:module_type}: %{DATA}=%{IPV4:DevIP}; %{DATA}=%{DATA:srcZoneName};%{DATA}=%{DATA:destZoneName};%{DATA}=%{DATA:Type};%{DATA}=%{DATA:ObjectPolicy};%{DATA}=%{INT:rule_ID};%{DATA}=%{DATA:Protocol};%{DATA}=%{DATA:Application};%{DATA}=%{IPV4:srcIPAddr};%{DATA}=%{INT:srcPortNum};%{DATA}=%{IPV4:destIPAddr};%{DATA}=%{INT:destPortNum};%{DATA}=%{INT:MatchCount};%{DATA}=%{DATA:Event};",
"%{WORD} %{HOSTNAME:hostname} %%%{DATA:module_type}: -%{DATA}=%{IPV4:DevIP}-%{DATA}=%{DATA}=%{DATA}=%{DATA:VirDev}; %{DATA}=%{DATA:srcZoneName};%{DATA}=%{DATA:destZoneName};%{DATA}=%{INT:rule_ID};%{DATA}=%{DATA:policyActType};%{DATA}=%{DATA:protType};%{DATA}=%{IPV4:srcIPAddr};%{DATA}=%{IPV4:destIPAddr};%{DATA}=%{INT:srcPortNum};%{DATA}=%{INT:destPortNum};%{DATA}=%{MONTHNUM2:start_month}%{MONTHDAY:start_day}%{YEAR:start_year}%{HOUR:start_hour}%{MINUTE:start_minutc}%{SECOND:start_second};%{DATA}=%{MONTHNUM2:end_month}%{MONTHDAY:end_day}%{YEAR:end_year}%{HOUR:end_hour}%{MINUTE:end_minutc}%{SECOND:end_second};",
"%{WORD} %{HOSTNAME:hostname} %%%{DATA:module_type}: -%{DATA}=%{IPV4:DevIP}-%{DATA}=%{DATA}=%{DATA}=%{DATA:VirDev}; %{DATA}=%{DATA:srcZoneName};%{DATA}=%{DATA:destZoneName};%{DATA}=%{INT:rule_ID};%{DATA}=%{DATA:policyActType};%{DATA}=%{DATA:protType};%{DATA}=%{IPV4:srcIPAddr};%{DATA}=%{IPV4:destIPAddr};%{DATA}=%{DATA:icmpType};%{DATA}=%{DATA:icmpCode};%{DATA}=%{MONTHNUM2:start_month}%{MONTHDAY:start_day}%{YEAR:start_year}%{HOUR:start_hour}%{MINUTE:start_minutc}%{SECOND:start_second};%{DATA}=%{MONTHNUM2:end_month}%{MONTHDAY:end_day}%{YEAR:end_year}%{HOUR:end_hour}%{MINUTE:end_minutc}%{SECOND:end_second};",
"%{WORD} %{HOSTNAME:hostname} %%%{DATA:module_type}: -%{DATA}=%{IPV4:DevIP}-%{DATA}=%{DATA}=%{DATA}=%{DATA:VirDev}; %{DATA}=%{DATA:atckType};%{DATA}=%{DATA:rcvIfName};%{DATA}=%{IPV4:srcIPAddr};%{DATA}=%{DATA};%{DATA}=%{IPV4:destIPAddr};%{DATA}=%{DATA};%{DATA}=%{INT:atckSpeed};%{DATA}=%{YEAR:atck_year}%{MONTHNUM2:atck_month}%{MONTHDAY:atck_day}%{HOUR:atck_hour}%{MINUTE:atck_minutc}%{SECOND:atck_second}",
"%{WORD} %{HOSTNAME:hostname} %%%{DATA:module_type}: -%{DATA}=%{IPV4:DevIP}; %{DATA}=%{DATA:srcZoneName}; %{DATA}=%{DATA:Protocol}; %{DATA}=%{DATA:TcpFlag}; %{DATA}=%{IPV4:srcIPAddr}; %{DATA}=%{DATA:SndDSLiteTunnelPeer}; %{DATA}=%{DATA:RcvVPNInstance}; %{DATA}=%{IPV4:destIPAddr}; %{DATA}=%{DATA:Action}; %{DATA}=%{YEAR:atck_year}%{MONTHNUM2:atck_month}%{MONTHDAY:atck_day}%{HOUR:atck_hour}%{MINUTE:atck_minutc}%{SECOND:atck_second}",
"%{WORD} %{HOSTNAME:hostname} %%%{DATA:module_type}: -%{DATA}=%{IPV4:DevIP}; %{DATA}=%{DATA:srcZoneName}; %{DATA}=%{DATA:Protocol}; ; %{DATA}=%{IPV4:srcIPAddr}; %{DATA}=%{DATA:SndDSLiteTunnelPeer}; %{DATA}=%{DATA:RcvVPNInstance}; %{DATA}=%{DATA:Action}; %{DATA}=%{YEAR:atck_year}%{MONTHNUM2:atck_month}%{MONTHDAY:atck_day}%{HOUR:atck_hour}%{MINUTE:atck_minutc}%{SECOND:atck_second}"
]
}
}
if "_grokparsefailure" not in [tags] {
if [start_year]{
mutate {
add_field => { "start_time" => "%{start_year}-%{start_month}-%{start_day} %{start_hour}:%{start_minutc}:%{start_second}" }
}
}
if [end_year]{
mutate {
add_field => { "end_time" => "%{end_year}-%{end_month}-%{end_day} %{end_hour}:%{end_minutc}:%{end_second}" }
}
}
if [atck_year]{
mutate {
add_field => { "atck_time" => "%{atck_year}-%{atck_month}-%{atck_day} %{atck_hour}:%{atck_minutc}:%{atck_second}" }
}
}
if [srcZoneName] in ["Untrust","untrust"] or [rcvIfName] == "Vlan-interface501"{
geoip {
database => "/usr/share/logstash/pipeline/GeoLite2-City_20230811/GeoLite2-City.mmdb"
source => srcIPAddr
target => "geoip"
fields => ["country_name","region_name","city_name","location"]
}
} else if [destZoneName] in ["Untrust","untrust"] {
geoip {
database => "/usr/share/logstash/pipeline/GeoLite2-City_20230811/GeoLite2-City.mmdb"
source => destIPAddr
target => "geoip"
fields => ["country_name","region_name","city_name","location"]
}
}
mutate {
remove_field => ["atck_year","atck_month","atck_day","atck_hour","atck_minutc","atck_second","end_year","end_month","end_day","end_hour","end_minutc","end_second","_id","_index","_score","start_year","start_month","start_day","start_hour","start_minutc","start_second"]
}
}
}else if [type] == "session" {
grok {
match => {
message => ["%{NUMBER}> %{NUMBER} %{IP:dev_ip} %{YEAR} %{MONTH} %{MONTHDAY} %{TIME} %{WORD:hostname} - %{DATA}:%{DATA:session_type} %{NUMBER:start_time}\|%{IP:src_ip}\|%{NUMBER:src_port}\|%{IP:nat_ip}\|%{NUMBER:nat_port}\|%{IP:dst_ip}\|%{NUMBER:dst_port}\|%{NUMBER:pro_num}",
"%{NUMBER}> %{NUMBER} %{IP:dev_ip} %{YEAR} %{MONTH} %{MONTHDAY} %{TIME} %{WORD:hostname} - %{DATA}:%{DATA:session_type} %{NUMBER:start_time}\|%{NUMBER:end_time}\|%{IP:src_ip}\|%{NUMBER:src_port}\|%{IP:nat_ip}\|%{NUMBER:nat_port}\|%{IP:dst_ip}\|%{NUMBER:dst_port}\|%{NUMBER:pro_num}"
]
}
}
date {
match => ["start_time", "UNIX"]
target => "start_time"
}
date {
match => ["end_time", "UNIX"]
target => "end_time"
}
mutate {
remove_field => ["_id","_index","_score"]
}
}
}
output {
if "_grokparsefailure" in [tags] {
elasticsearch {
hosts => ["10.7.20.16:9201", "10.7.20.16:9202", "10.7.20.16:9203"]
index => "failed-log-%{+YYYY.MM.dd}"
}
}else if [type] == "network" {
elasticsearch {
hosts => ["10.7.20.16:9201", "10.7.20.16:9202", "10.7.20.16:9203"]
index => "network-log-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["10.7.20.16:9201", "10.7.20.16:9202", "10.7.20.16:9203"]
index => "session-log-%{+YYYY.MM.dd}"
}
}
}