Вложенные объекты из MySQL в ElasticSearch

Я новичок в ES и пытаюсь загрузить данные из MYSQL в Elasticsearch с помощью logstash jdbc.

В моей ситуации я хочу использовать значения столбцов в качестве имен полей, пожалуйста, смотрите новые и шестнадцатеричные в выходных данных, мне нужны значения id в качестве имен полей.

Mysql data

cid    id       color      new     hex      create            modified
1      101     100 euro    abcd   #86c67c  5/5/2016 15:48   5/13/2016 14:15
1      102     100 euro    1234   #fdf8ff  5/5/2016 15:48   5/13/2016 14:15

требуется вывод

{
  "_index": "colors_hexa",
  "_type": "colors",
  "_id": "1",
  "_version": 218,
  "found": true,
  "_source": {
    "cid": 1,
    "color": "100 euro",
    "new" : {
            "101": "abcd",
            "102": "1234",
        }
    "hex" : {
            "101": "#86c67c",
            "102": "#fdf8ff",
        }
    "created": "2016-05-05T10:18:51.000Z",
    "modified": "2016-05-13T08:45:30.000Z",
    "@version": "1",
    "@timestamp": "2016-05-14T01:30:00.059Z"
  }
}

Конфигурация Logstash :

input {
 jdbc {
   jdbc_driver_library => "/etc/logstash/mysql/mysql-connector-java-5.1.39-bin.jar"
   jdbc_driver_class => "com.mysql.jdbc.Driver"
   jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
   jdbc_user => "root"
   jdbc_password => "*****"
   schedule => "* * * * *"

   statement => "select cid,id,color, new ,hexa_value ,created,modified from colors_hex_test order by cid"
   jdbc_paging_enabled => "true"
   jdbc_page_size => "50000"
}
}

   output {
    elasticsearch {
        index => "colors_hexa"
        document_type => "colors"
        document_id => "%{cid}"
        hosts => "localhost:9200"
    }
}

Кто-нибудь может помочь с тегом фильтра для этих данных, здесь проблема с полями 'new' и 'hex'. Я пытаюсь преобразовать две записи в один документ.

1
задан 12 November 2016 в 02:57
1 ответ

Вы ищете фильтр агрегатов . Один из их примеров явно относится к варианту использования JDBC, который вы ищете здесь (см. Пример 4).

Поскольку ввод JDBC является запланированным действием, вы можете настроить свой агрегатный фильтр для объединения всех событий, которые поступают в короткий промежуток времени. Скажем, 10 секунд. Все строки, введенные входом JDBC, будут доставлены очень тесно сгруппированными и в конечном итоге должны быть объединены.

Этот фильтр более сложен, чем другие, поскольку вам нужно писать rubyкод для обработки конкатенации полей, которую вы ищете. Но он должен уметь это делать.

1
ответ дан 3 December 2019 в 23:38

Теги

Похожие вопросы