Поток Graylog получает события, но он пуст

Я начал отправлять журналы Palo Alto в Graylog, и правило потока выбирает их, сопоставляя " tcpdump for events

И поток показывает, что он получает события (обратите внимание на «22 сообщения в секунду»):

Notice it is currently getting 22 messages per second

Тем не менее, когда я нажимаю на поток (или выполняю поиск -> теги: «Пало-Альто»), событий нет

Empty stream

Единственная распространенная проблема, которую я видел в Интернете, - это настройки часового пояса, которые переносят эти события в будущее, но время в нашем отправителе Palo Alto Panorama правильное (PST) и пытается выполнить поиск по абсолютному времени в день в будущее ничего не обнаруживает.

Информация о версии:

Graylog 2.2.2 + 691b4b7, кодовое имя Stiegl

Elasticsearch 2.4.4

Lucene 5.5.2

У меня также есть это вопрос без ответа о том, что функция поиска не работает должным образом, чтобы найти события, которые действительно прибывают нормально. Я сомневаюсь, что это имеет какое-то отношение, но для полноты я включу это сюда.

0
задан 13 April 2017 в 15:14
1 ответ

В лог-файле узлов Graylog-сервера /var/log/graylog-server/server.log я заметил много таких ошибок:

[54]: индекс [graylog_2], тип [message], id [edb8ec50-1320-11e7-92de-005056b541f6], message [MapperParsingException[не удалось разобрать [ReceiveTime]]; вложенный: IllegalArgumentException[Недействительный формат: "2017/03/27 12:09:40" - это malformed at "/03/27 12:09:40"];]

Так что проблема в том, что эти сообщения поступали в Graylog прекрасно, но не могли быть проиндексированы Elasticsearch. В итоге я уронил и мутировал проблемные поля, пока они не понравились Graylog.

if "Palo Alto" in [tags] {
    grok {
        match => ["message", "<\d*>(?<patimestamp>\w* \d* \d*:\d*:\d*) (?<PanoramaHost>[^ ]*) (?<FutureUse0>[^,]*),(?<ReceiveTime>[^,]*),(?<SerialNumber>[^,]*),(?<PAType>[^,]*),%{GREEDYDATA:pamessage}"]
    }
    if [PAType] == "SYSTEM" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","vsys","paEventID","Object","FutureUse2","FutureUse3","Module","Severity","Description","SeqNum","ActionFlags"]}
        mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "TRAFFIC" {
        csv {source => "[pamessage]" columns => ["Threat-ContentType","ConfigVersion","GenerateTime","SrcAddress","DstAddress","NATSrcIP","NATDstIP","Rule","SrcUser","DstUser","App","VSys","SrcZone","DstZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SrcPort","DstPort","NATSrcPort","NATDstPort","Flags","Protocol","Action","Bytes","BytesSent","BytesReceived","Packets","StartTime","ElapsedTimeInSec","Category","Padding","SeqNum","ActionFlags","SrcCountry","DstCountry","cpadding","pkts_sent","pkts_received"]}
                    mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "THREAT" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","SrcIP","DstIP","NATSrcIP","NATDstIP","Rule","SrcUser","DstUser","App","vsys","SrcZone","DstZone","IngressInterface","EgressInterface","LogFwdProfile","FutureUse2","SessionID","RepeatCount","SrcPort","DstPort","NATSrcPort","NATDstPort","Flags","Protocol","Action","Misc","ThreatID","Category","Severity","Direction","SeqNum","ActionFlags","SrcLocation","DstLocation","FutureUse3","ContentType","pcapID","Filedigest","Cloud","FutureUse4","UserAgent","FileType","XForwardedFor","Referer","Sender","Subject","Recipient","ReportID"]}
                    mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "CONFIG" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","Host","vsys","Command","Admin","Client","Result","ConfigPath","SeqNum","ActionFlags","BeforeChangeDetail","AfterChangeDetail"]}
                    mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "HIP-MATCH" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","SrcUser","vsys","MachineName","OS","SrcAddress","HIPType","FutureUse2","FutureUse3","SeqNum","ActionFlags"]}
                    mutate {remove_field => ["ReceiveTime"] remove_field => ["GeneratedTime"] gsub => ["message", "/", "_"]}
    } else {
        mutate {add_tag => "Uncategorized"}
    }
}
0
ответ дан 24 November 2019 в 04:45

Теги

Похожие вопросы