Load CSV data to Elasticsearch using Filebeat

Filebeat is lightweight and has a smaller footprint compared to Logstash. Use Filebeat to load the CSV data if you are not looking for any of the advanced features provided by Logstash.

You can find many blog posts describing how to load CSV data into Elasticsearch using Logstash. In this post, I describe how to load CSV data using Filebeat.

See https://www.elastic.co/guide/en/beats/filebeat/current/diff-logstash-beats.html for a comparison of Logstash and Filebeat. Filebeat is lightweight and has a smaller footprint than Logstash. Logstash, on the other hand, is heavier but provides more features, such as aggregating data from multiple sources and advanced transformations.

So, it makes sense to use Filebeat to load the CSV data if you do not need any of the advanced features provided by Logstash.

We will look at two different ways to load CSV data into Elasticsearch.

Source Data

For this tutorial, we will use the dataset from https://www.kaggle.com/marian447/retail-store-sales-transactions.

$ head scanner-data.csv
"","Date","Customer_ID","Transaction_ID","SKU_Category","SKU","Quantity","Sales_Amount"
"1",2017-01-02,2547,1,"X52","0EM7L",1,3.13
"2",2017-01-02,822,2,"2ML","68BRQ",1,5.46
"3",2017-01-02,3686,3,"0H2","CZUZX",1,6.35
"4",2017-01-02,3719,4,"0H2","549KK",1,5.59
"5",2017-01-02,9200,5,"0H2","K8EHH",1,6.88

Create New Index

You can use the Kibana dashboard to create a new index in Elasticsearch.

  1. Open Kibana dashboard http://localhost:5601
  2. Go to Dev Tools under the Management section
  3. Use the Console window to execute the REST API calls

We do not index the first field, which contains only a serial number, so the mapping has seven fields. We first define the mapping for the sales index as follows -

PUT /sales
{
  "mappings": {
    "properties": {
      "date": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "customer_id": {
        "type": "keyword"
      },
      "transaction_id": {
        "type": "keyword"
      },
      "sku_category": {
        "type": "keyword"
      },
      "sku": {
        "type": "keyword"
      },
      "quantity": {
        "type": "integer"
      },
      "sales_amount": {
        "type": "float"
      }
    }
  }
}
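
If you want to double-check that the index was created with the expected mapping, you can run a quick query in the same Dev Tools console -

GET /sales/_mapping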

Load CSV data using Filebeat

Now let's look at the two ways we can load CSV data into this index in Elasticsearch. Use only one of the methods below.

Using Ingest Pipeline

In this method, we load the lines from the CSV file using Filebeat as-is, without any processing, and then decode the data using an ingest pipeline on Elasticsearch.

We first define the pipeline parse_sales_data using the REST API in Dev Tools. In the pipeline, we decode the fields using the csv processor and then remove the serial number field. Note that each processor must be its own object in the processors array.

PUT _ingest/pipeline/parse_sales_data
{
  "processors": [
    {
      "csv": {
        "description": "Parse sales data from scanner",
        "field": "message",
        "target_fields": ["sr", "date", "customer_id", "transaction_id", "sku_category", "sku", "quantity", "sales_amount"],
        "separator": ",",
        "ignore_missing": true,
        "trim": true
      }
    },
    {
      "remove": {
        "field": ["sr"]
      }
    }
  ]
}
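
You can optionally test the pipeline before pointing Filebeat at it, using the _simulate API. The sample document below reuses the first data row of the CSV -

POST _ingest/pipeline/parse_sales_data/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "\"1\",2017-01-02,2547,1,\"X52\",\"0EM7L\",1,3.13"
      }
    }
  ]
}

The response should show the parsed fields (date, customer_id, and so on) with the sr field removed.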

Our Filebeat configuration is simple. It defines the path, the index, and the pipeline that will process the incoming data. Add the below lines to filebeat.yml under filebeat.inputs -

- type: log
  enabled: true
  paths:
    - /data/logs/scanner-data.csv # path to your CSV file
  exclude_lines: ['^\"\"']        # header line
  index: sales
  pipeline: parse_sales_data
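
For context, here is a minimal sketch of a full filebeat.yml around this input; the Elasticsearch host is an assumption, so adjust it to your environment. Note that on Filebeat 7.x the per-input index setting is honored only if you also disable ILM and set a template name and pattern -

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/logs/scanner-data.csv
  exclude_lines: ['^\"\"']
  index: sales
  pipeline: parse_sales_data

setup.ilm.enabled: false          # needed on 7.x so the custom index name is used
setup.template.name: "sales"
setup.template.pattern: "sales*"

output.elasticsearch:
  hosts: ["localhost:9200"]       # assumption: Elasticsearch running locally

Then start Filebeat (for example, filebeat -e -c filebeat.yml) and it will ship each CSV line through the parse_sales_data pipeline into the sales index.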

Using decode_csv_fields processor in Filebeat

In this method, we decode the CSV fields during Filebeat processing and then upload the already-processed data to Elasticsearch. We use a combination of the decode_csv_fields and extract_array processors for this task. Finally, we drop the unnecessary fields using the drop_fields processor.

Add the below lines to your filebeat.yml -

- type: log
  enabled: true
  paths:
    - /data/logs/scanner-data-2.csv # path to the CSV file
  exclude_lines: ['^\"\"']          # header line to exclude
  index: sales
  processors:
    - decode_csv_fields:
        fields:
          message: decoded.csv
        separator: ","
    - extract_array:
        field: decoded.csv
        mappings:
          sr: 0
          date: 1
          customer_id: 2
          transaction_id: 3
          sku_category: 4
          sku: 5
          quantity: 6
          sales_amount: 7
    - drop_fields:
        fields: ["decoded", "sr"]

You can use either of the above two methods to load your CSV data into your Elasticsearch node.