Я начал отправлять журналы Palo Alto в Graylog, и правило потока выбирает их, сопоставляя "
И поток показывает, что он получает события (обратите внимание на «22 сообщения в секунду»):
Тем не менее, когда я нажимаю на поток (или выполняю поиск -> теги: «Пало-Альто»), событий нет
Единственная распространенная проблема, которую я видел в Интернете, - это настройки часового пояса, которые переносят эти события в будущее, но время в нашем отправителе Palo Alto Panorama правильное (PST) и пытается выполнить поиск по абсолютному времени в день в будущее ничего не обнаруживает.
Информация о версии:
Graylog 2.2.2 + 691b4b7, кодовое имя Stiegl
Elasticsearch 2.4.4
Lucene 5.5.2
У меня также есть это вопрос без ответа о том, что функция поиска не работает должным образом, чтобы найти события, которые действительно прибывают нормально. Я сомневаюсь, что это имеет какое-то отношение, но для полноты я включу это сюда.
В лог-файле узлов Graylog-сервера /var/log/graylog-server/server.log я заметил много таких ошибок:
[54]: индекс [graylog_2], тип [message], id [edb8ec50-1320-11e7-92de-005056b541f6], message [MapperParsingException[не удалось разобрать [ReceiveTime]]; вложенный: IllegalArgumentException[Недействительный формат: "2017/03/27 12:09:40" - это malformed at "/03/27 12:09:40"];]
Так что проблема в том, что эти сообщения поступали в Graylog прекрасно, но не могли быть проиндексированы Elasticsearch. В итоге я уронил и мутировал проблемные поля, пока они не понравились Graylog.
if "Palo Alto" in [tags] {
grok {
match => ["message", "<\d*>(?<patimestamp>\w* \d* \d*:\d*:\d*) (?<PanoramaHost>[^ ]*) (?<FutureUse0>[^,]*),(?<ReceiveTime>[^,]*),(?<SerialNumber>[^,]*),(?<PAType>[^,]*),%{GREEDYDATA:pamessage}"]
}
if [PAType] == "SYSTEM" {
csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","vsys","paEventID","Object","FutureUse2","FutureUse3","Module","Severity","Description","SeqNum","ActionFlags"]}
mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
} else if [PAType] == "TRAFFIC" {
csv {source => "[pamessage]" columns => ["Threat-ContentType","ConfigVersion","GenerateTime","SrcAddress","DstAddress","NATSrcIP","NATDstIP","Rule","SrcUser","DstUser","App","VSys","SrcZone","DstZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SrcPort","DstPort","NATSrcPort","NATDstPort","Flags","Protocol","Action","Bytes","BytesSent","BytesReceived","Packets","StartTime","ElapsedTimeInSec","Category","Padding","SeqNum","ActionFlags","SrcCountry","DstCountry","cpadding","pkts_sent","pkts_received"]}
mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
} else if [PAType] == "THREAT" {
csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","SrcIP","DstIP","NATSrcIP","NATDstIP","Rule","SrcUser","DstUser","App","vsys","SrcZone","DstZone","IngressInterface","EgressInterface","LogFwdProfile","FutureUse2","SessionID","RepeatCount","SrcPort","DstPort","NATSrcPort","NATDstPort","Flags","Protocol","Action","Misc","ThreatID","Category","Severity","Direction","SeqNum","ActionFlags","SrcLocation","DstLocation","FutureUse3","ContentType","pcapID","Filedigest","Cloud","FutureUse4","UserAgent","FileType","XForwardedFor","Referer","Sender","Subject","Recipient","ReportID"]}
mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
} else if [PAType] == "CONFIG" {
csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","Host","vsys","Command","Admin","Client","Result","ConfigPath","SeqNum","ActionFlags","BeforeChangeDetail","AfterChangeDetail"]}
mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
} else if [PAType] == "HIP-MATCH" {
csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","SrcUser","vsys","MachineName","OS","SrcAddress","HIPType","FutureUse2","FutureUse3","SeqNum","ActionFlags"]}
mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
} else {
mutate {add_tag => "Uncategorized"}
}
}