Elasticsearch — Ingest Pipeline

Haydar Külekci
Jul 30, 2022

In my previous article, we discussed what aliases are and how to use them efficiently. If you have not read it yet, please read it first.

As you may remember, the previous article ended with a problem: choosing the right index when indexing time series documents through an alias. We had a document like the one shown below and expected it to be indexed into the book_events.2022.01 index, but our configuration indexed it into book_events.2022.06 instead. In this article, we will fix that issue.

Let’s recall where we left off. We used the is_write_index flag to control which index receives writes. That flag points to our latest index, so Elasticsearch saves every document there without looking at the data inside it. There are several solutions to this problem. The first is to name the target index explicitly when indexing:

POST book_events.2022.01/_doc
{
  "@timestamp": "2022-01-03T00:01:01Z",
  "operation": "create",
  "name": "Test Book"
}

This indexes the document into exactly that index. With this solution, the responsibility moves to the ingestion application: it has to decide which index each document belongs to.
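As a rough sketch of that application-side approach, the ingestion code could derive the monthly index name from the document's own timestamp before sending it. The helper name `index_name_for` and its `prefix` parameter are assumptions made for this illustration; only the `@timestamp` field and the `book_events.yyyy.MM` naming come from the example above.

```python
from datetime import datetime


def index_name_for(doc: dict, prefix: str = "book_events") -> str:
    """Return the monthly index name for a document, e.g. book_events.2022.01.

    Hypothetical helper: it only computes the name. Sending the document
    to Elasticsearch is left to whatever client the application uses.
    """
    # Python versions before 3.11 do not accept a trailing "Z" in
    # fromisoformat, so normalise it to an explicit UTC offset first.
    raw = doc["@timestamp"].replace("Z", "+00:00")
    ts = datetime.fromisoformat(raw)
    return f"{prefix}.{ts:%Y.%m}"


event = {
    "@timestamp": "2022-01-03T00:01:01Z",
    "operation": "create",
    "name": "Test Book",
}
print(index_name_for(event))  # book_events.2022.01
```

The application would then send the document to `POST <index name>/_doc`, as in the request above.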

In every talk and meeting, I point out that most problems, unless they are truly unique, have more than one solution, and I usually try to share several options with clients. So if we don’t want to use the first solution, there is another one: let’s use an ingest pipeline to solve the same problem.

First, a little about what an ingest pipeline is. It will look familiar if you know Logstash. A Logstash pipeline has inputs, filters, and outputs, and the filters let you change the data. An ingest pipeline works in a similar way: we create a pipeline for the data, and its processors change each document before it is indexed. For more, please check the documentation. There is also a somewhat old blog post that I translated into Turkish here, and another example here where I used an ingest pipeline to search inside binary PDF files. Anyhow, let’s create a pipeline for our current problem:

PUT _ingest/pipeline/change_index_according_to_timestamp
{
  "description": "change index name according to timestamp",
  "processors": [
    {
      "date": {
        "field": "@timestamp",
        "target_field": "index_suffix",
        "formats": ["ISO8601"],
        "output_format": "yyyy.MM"
      }
    },
    {
      "set": {
        "field": "_index",
        "value": "{{_index}}.{{index_suffix}}"
      }
    },
    {
      "remove": {
        "field": "index_suffix"
      }
    }
  ]
}

As you can see, the pipeline has three processors. The first parses the @timestamp field as a date and writes it, formatted as yyyy.MM, into a field named index_suffix. The second appends index_suffix to the index name. The third removes index_suffix from the document because we no longer need it. As a result, the pipeline sets the correct index for each document according to its timestamp at indexing time. Let’s test the pipeline with the simulate endpoint before using it on real data:

POST _ingest/pipeline/change_index_according_to_timestamp/_simulate
{
  "docs": [
    {
      "_index": "book_events",
      "_source": {
        "@timestamp": "2022-01-03T00:01:01Z",
        "operation": "create",
        "name": "Test Book"
      }
    }
  ]
}

When you execute the request, you will get the following response:

{
  "docs": [
    {
      "doc": {
        "_index": "book_events.2022.01",
        "_id": "_id",
        "_source": {
          "name": "Test Book",
          "@timestamp": "2022-01-03T00:01:01Z",
          "operation": "create"
        },
        "_ingest": {
          "timestamp": "2022-07-30T11:52:08.464946433Z"
        }
      }
    }
  ]
}
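The same transformation can also be traced by hand. The Python sketch below imitates the three processors on a document shaped like the simulate input. It is only a local illustration under a few assumptions: the function name `apply_pipeline` is made up, the suffix is computed in UTC, and timestamps without an offset are treated as UTC. It is not how Elasticsearch implements its processors.

```python
from datetime import datetime, timezone


def apply_pipeline(doc: dict) -> dict:
    """Mimic the date, set and remove processors on a simulate-style doc."""
    source = dict(doc["_source"])  # work on a copy of the source

    # 1) date: parse @timestamp and write "yyyy.MM" into index_suffix
    raw = source["@timestamp"].replace("Z", "+00:00")
    ts = datetime.fromisoformat(raw)
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    source["index_suffix"] = ts.astimezone(timezone.utc).strftime("%Y.%m")

    # 2) set: _index becomes "{{_index}}.{{index_suffix}}"
    index = f"{doc['_index']}.{source['index_suffix']}"

    # 3) remove: drop the helper field again
    del source["index_suffix"]

    return {"_index": index, "_source": source}


result = apply_pipeline({
    "_index": "book_events",
    "_source": {
        "@timestamp": "2022-01-03T00:01:01Z",
        "operation": "create",
        "name": "Test Book",
    },
})
print(result["_index"])  # book_events.2022.01
```

Just as in the simulate response, the helper field never reaches the stored source; only the index name changes.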

As the result shows, the index name is book_events.2022.01 even though I sent book_events in my document. Now let’s try a real example:

POST book_events/_doc?pipeline=change_index_according_to_timestamp
{
  "@timestamp": "2022-01-03T00:01:01Z",
  "operation": "create",
  "name": "Test Book"
}

# Response
{
  "_index": "book_events.2022.01",
  "_id": "XFf0ToIBiHiDFxKD93ur",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 2,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}

So, our document was indexed correctly into book_events.2022.01. At this point, we don’t even need the is_write_index flag: we can use an ingest pipeline to redirect documents into the correct indices.
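The same idea should carry over to batches of documents, because the bulk endpoint also accepts a pipeline query parameter. The sketch below only builds the newline-delimited request body. The function name `build_bulk_body` and the second sample event are assumptions for illustration, and sending the body is left to your HTTP client of choice.

```python
import json


def build_bulk_body(docs: list, alias: str = "book_events") -> str:
    """Build an NDJSON body for the bulk endpoint, targeting the alias."""
    lines = []
    for doc in docs:
        # Action line: every document is addressed to the alias.
        lines.append(json.dumps({"index": {"_index": alias}}))
        # Source line: the document itself.
        lines.append(json.dumps(doc))
    # A bulk body must end with a newline.
    return "\n".join(lines) + "\n"


events = [
    {"@timestamp": "2022-01-03T00:01:01Z", "operation": "create", "name": "Test Book"},
    {"@timestamp": "2022-06-10T09:30:00Z", "operation": "update", "name": "Test Book"},
]
body = build_bulk_body(events)
# Send `body` with Content-Type application/x-ndjson to:
#   POST _bulk?pipeline=change_index_according_to_timestamp
print(body)
```

With the pipeline applied, each document would be routed by its own timestamp rather than by the alias it was sent to.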

Keep following me for more …

https://twitter.com/kulekci

https://github.com/hkulekci

