You can find multiple blogs describing how to load CSV data to ElasticSearch using Logstash. In this blog, I describe how to load CSV data using Filebeat.
Check https://www.elastic.co/guide/en/beats/filebeat/current/diff-logstash-beats.html to understand the difference between Logstash and Filebeat. Filebeat is lightweight and has a smaller footprint than Logstash. Logstash, on the other hand, is heavier and slower, but provides more features such as aggregating data from multiple sources, advanced transformations, etc.
So it makes sense to use Filebeat to load CSV data if you do not need any of the advanced features provided by Logstash.
We will look at two different ways we can load CSV data to ElasticSearch.
Source Data
For this tutorial, we will use the dataset from https://www.kaggle.com/marian447/retail-store-sales-transactions .
$ head scanner-data.csv
"","Date","Customer_ID","Transaction_ID","SKU_Category","SKU","Quantity","Sales_Amount"
"1",2017-01-02,2547,1,"X52","0EM7L",1,3.13
"2",2017-01-02,822,2,"2ML","68BRQ",1,5.46
"3",2017-01-02,3686,3,"0H2","CZUZX",1,6.35
"4",2017-01-02,3719,4,"0H2","549KK",1,5.59
"5",2017-01-02,9200,5,"0H2","K8EHH",1,6.88
Create New Index
You can use the Kibana dashboard to create a new index in ElasticSearch.
- Open Kibana dashboard http://localhost:5601
- Go to DevTools under the Management category
- Use the Console window to execute the REST API calls
We do not include the first field, which contains the serial number, so there are 7 columns. We first define the mapping for the sales index as follows -
PUT /sales
{
  "mappings": {
    "properties": {
      "date": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "customer_id": {
        "type": "keyword"
      },
      "transaction_id": {
        "type": "keyword"
      },
      "sku_category": {
        "type": "keyword"
      },
      "sku": {
        "type": "keyword"
      },
      "quantity": {
        "type": "integer"
      },
      "sales_amount": {
        "type": "float"
      }
    }
  }
}
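To confirm the mapping accepts the data, you can index a throwaway document from the Console before loading the real file (the document ID and values below are just illustrative, taken from the first data row):

```
POST /sales/_doc/test-1
{
  "date": "2017-01-02",
  "customer_id": "2547",
  "transaction_id": "1",
  "sku_category": "X52",
  "sku": "0EM7L",
  "quantity": 1,
  "sales_amount": 3.13
}

GET /sales/_doc/test-1
```

If the date format or a field type were wrong, the POST would fail with a mapping error. Remove the test document with DELETE /sales/_doc/test-1 before loading the real data.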
Load CSV data using Filebeat
Now let's look at the two ways we can load CSV data into this index in ElasticSearch. Use only one of the two methods described below.
Using Ingest Pipeline
In this method, we ship the lines from the CSV file using Filebeat as-is, without any processing, and then decode the data using an ingest pipeline on ElasticSearch.
We first define the pipeline parse_sales_data using the REST API in DevTools. In the pipeline, we decode the fields using the csv processor and then remove the serial number field.
PUT _ingest/pipeline/parse_sales_data
{
  "processors": [
    {
      "csv": {
        "description": "Parse sales data from scanner",
        "field": "message",
        "target_fields": ["sr", "date", "customer_id", "transaction_id", "sku_category", "sku", "quantity", "sales_amount"],
        "separator": ",",
        "ignore_missing": true,
        "trim": true
      }
    },
    {
      "remove": {
        "field": "sr"
      }
    }
  ]
}
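Before wiring up Filebeat, you can dry-run the pipeline with the simulate API. The message value below is the first data row of the file:

```
POST _ingest/pipeline/parse_sales_data/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "\"1\",2017-01-02,2547,1,\"X52\",\"0EM7L\",1,3.13"
      }
    }
  ]
}
```

The response should show the parsed fields (date, customer_id, and so on) with the sr field removed.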
Our Filebeat configuration is simple. It defines the path to the file, the target index, and the pipeline that will process the incoming data. Add the following lines to filebeat.yml -
- type: log
  enabled: true
  paths:
    - /data/logs/scanner-data.csv # path to your CSV file
  exclude_lines: ['^\"\"'] # header line
  index: sales
  pipeline: parse_sales_data
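With the configuration in place, start Filebeat and watch its log output for errors. Note that, depending on your output configuration, Filebeat may also require setup.template.name and setup.template.pattern to be set in filebeat.yml when you override the index name, so check the startup log if the index is not being written to.

```shell
# Run Filebeat in the foreground; -e sends its logs to stderr
# so you can watch the harvester pick up the CSV file
filebeat -e -c filebeat.yml
```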
Using decode_csv_fields processor in filebeat
In this method, we decode the CSV fields during Filebeat processing and then upload the processed data to ElasticSearch. We use a combination of the decode_csv_fields and extract_array processors for this task. Finally, we drop the unnecessary fields using the drop_fields processor.
Add the below lines to your filebeat.yml -
- type: log
  enabled: true
  paths:
    - /data/logs/scanner-data-2.csv # path to the CSV file
  exclude_lines: ['^\"\"'] # header line to exclude
  index: sales
  processors:
    - decode_csv_fields:
        fields:
          message: decoded.csv
        separator: ","
    - extract_array:
        field: decoded.csv
        mappings:
          sr: 0
          date: 1
          customer_id: 2
          transaction_id: 3
          sku_category: 4
          sku: 5
          quantity: 6
          sales_amount: 7
    - drop_fields:
        fields: ["decoded", "sr"]
You can use either of the above two methods to load your CSV data into your ElasticSearch node.
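Whichever method you choose, once Filebeat has shipped the file you can verify the load from the DevTools console:

```
GET /sales/_count

GET /sales/_search
{
  "size": 1,
  "sort": [{ "date": "asc" }]
}
```

The _count call should report the number of data rows in the CSV (excluding the header), and the search should return documents with the fields named in the mapping above.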