Elasticsearch — Improving Aggregation Performance with Routing

Haydar Külekci
5 min read · Aug 12, 2023


There are different ways to improve aggregation performance in Elasticsearch. In this article, I will briefly show how routing affects aggregation performance. Before getting into how to improve it, let me introduce our dataset.

Photo by Abhinav on Unsplash (an orange plane flying in the sky, with one of the pilots standing on the wings)

As you know, when you install Elasticsearch and Kibana, Kibana provides some sample data sets for you. One of them is “Sample flight data”, and that is the dataset I used in my example. However, I reindexed the dataset multiple times to increase the number of documents.

Kibana Sample Data Set List Page Screenshot

After adding the dataset to the cluster, you can see the sample data by searching:

GET kibana_sample_data_flights/_search

Before reindexing data into our sample indices, let’s create them. First, fetch the mapping of the sample index, then paste it into the create-index request:

PUT test1
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "AvgTicketPrice": {
        "type": "float"
      },
      ....
      "timestamp": {
        "type": "date"
      }
    }
  }
}

We will create two indices, test1 and test2. I used the exact mapping of the flight data index; there is no specific mapping definition other than this.
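Creating both indices from Python instead of the Dev Tools console could look like the sketch below. The mapping here is abbreviated and only illustrative; in practice you would copy the full mapping from `GET kibana_sample_data_flights/_mapping`, and the `es.indices.create` calls are left commented out because they need a live cluster:

```python
# Build the index body once and reuse it for both test indices.
# The "properties" shown here are a small, illustrative subset of the
# real flight-data mapping.
def index_body(number_of_shards=5, number_of_replicas=1):
    return {
        "settings": {
            "number_of_shards": number_of_shards,
            "number_of_replicas": number_of_replicas,
        },
        "mappings": {
            "properties": {
                "AvgTicketPrice": {"type": "float"},
                "OriginCountry": {"type": "keyword"},
                "timestamp": {"type": "date"},
            }
        },
    }

body = index_body()
# es.indices.create(index="test1", **body)  # requires a live cluster
# es.indices.create(index="test2", **body)
```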

Then, let’s index the data with the following script:

import os
from random import randint
from dateutil.parser import parse
from elasticsearch.helpers import scan, bulk
from elasticsearch import Elasticsearch
from dotenv import load_dotenv

load_dotenv()
es = Elasticsearch(
    cloud_id=os.getenv('ELASTIC_CLOUD_ID'),
    basic_auth=(os.getenv('ELASTIC_USERNAME'), os.getenv('ELASTIC_PASSWORD'))
)


def search(index_name, with_routing=False):
    results = scan(es, {"query": {"match_all": {}}}, index="kibana_sample_data_flights")
    for item in results:
        doc = item['_source']
        doc['TimeInLong'] = parse(doc['timestamp']).timestamp() + randint(1000, 200000)
        obj = {
            "_index": index_name,
            "_source": doc
        }
        if with_routing:
            obj["routing"] = doc['OriginCountry']
        yield obj


if __name__ == '__main__':
    bulk(es, search('test1'))
    bulk(es, search('test2', True))

The script is pretty basic; it just reads documents from the kibana_sample_data_flights index and inserts them into the test1 and test2 indices.

So, as you can see, there are two bulk operations in the script: one with routing and one without. Before seeing the results, let’s try to understand what routing is here and why I used OriginCountry as the routing value.
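The shape of the bulk action the generator yields can be checked without a cluster. This small sketch mirrors the routing branch of the script with a hypothetical in-memory document:

```python
def make_action(doc, index_name, with_routing=False):
    # Mirror of the generator body above: wrap the source document in a
    # bulk action and, optionally, route it by its OriginCountry value.
    obj = {"_index": index_name, "_source": doc}
    if with_routing:
        obj["routing"] = doc["OriginCountry"]
    return obj

# Hypothetical document standing in for one flight record:
doc = {"OriginCountry": "DE", "timestamp": "2023-08-12T10:00:00Z"}
action = make_action(doc, "test2", with_routing=True)
print(action["routing"])  # DE
```

Without the `with_routing` flag, the action carries no routing key and Elasticsearch falls back to routing by the document id.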

A note from the Elasticsearch documentation about _routing:

As you know, in Elasticsearch each document is stored in a shard. An index can have multiple shards, and with routing we can keep similar documents in the same shard. Elasticsearch uses the following formula while indexing routed documents:

routing_factor = num_routing_shards / num_primary_shards
shard_num = (hash(_routing) % num_routing_shards) / routing_factor

num_routing_shards is the value of the index.number_of_routing_shards index setting. num_primary_shards is the value of the index.number_of_shards index setting.

For more, please check this page.
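To make the formula concrete, here is a toy computation. Note that Elasticsearch actually hashes the routing value with Murmur3; `zlib.crc32` below is only a stand-in to show the arithmetic, so the shard numbers it produces will not match a real cluster:

```python
import zlib

def shard_for(routing_value, num_primary_shards, num_routing_shards=None):
    # With default settings num_routing_shards equals num_primary_shards,
    # so routing_factor is 1 and the formula reduces to hash % shards.
    if num_routing_shards is None:
        num_routing_shards = num_primary_shards
    routing_factor = num_routing_shards // num_primary_shards
    h = zlib.crc32(routing_value.encode())  # stand-in for Murmur3
    return (h % num_routing_shards) // routing_factor

# Every document with the same routing value lands on the same shard:
print(shard_for("DE", 5) == shard_for("DE", 5))  # True
```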

In order to group similar documents together, we can use routing while indexing in Elasticsearch. Based on the routing value, documents that share the same value are routed to the same shard.

So, I used the OriginCountry field value as the routing value because it is the top-level aggregation of my query. I want to keep documents that have the same OriginCountry value within the same shard.

In this dataset, the OriginCountry field is not distributed homogeneously, so some shards will hold more data than others, which can itself cause performance problems on larger datasets. Please keep this in mind when using routing, and check your dataset first. There is also an index partitioning solution (the index.routing_partition_size setting) to mitigate this problem.
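Before choosing a routing field, it is worth measuring how skewed its values are. A rough sketch, using hypothetical per-country counts (in practice you would get these from a terms aggregation on the candidate field):

```python
from collections import Counter

# Hypothetical OriginCountry counts; in practice, run a terms
# aggregation on the candidate routing field to get the real ones.
counts = Counter({"IT": 18108, "US": 13924, "DE": 12478, "JP": 7710, "AE": 516})

total = sum(counts.values())
for country, n in counts.most_common():
    # A value holding a large share of documents means a large shard.
    print(f"{country}: {n} docs ({n / total:.1%} of the index)")
```

If one value dominates, that shard will dominate too, and routing may hurt more than it helps.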

The sample flight data has 13,059 records. After running the script 5 times, I had more than 60K documents, and our shards look like this:

# GET _cat/shards/test1?s=index&v&h=index,shard,prirep,docs,store
index shard prirep docs store
test1 0 p 13086 6.6mb
test1 0 r 10603 5.9mb
test1 1 r 12925 6.5mb
test1 1 p 10543 5.8mb
test1 2 p 12845 6.5mb
test1 2 r 10483 5.7mb
test1 3 r 10580 5.8mb
test1 3 p 12969 6.6mb
test1 4 p 10527 5.8mb
test1 4 r 12911 6.9mb

# GET _cat/shards/test2?s=index&v&h=index,shard,prirep,docs,store
test2 0 p 7710 3.9mb
test2 0 r 7710 3.9mb
test2 1 r 13924 7.3mb
test2 1 p 13924 7.5mb
test2 2 p 516 423.5kb
test2 2 r 516 377kb
test2 3 r 18108 9.3mb
test2 3 p 18108 9.4mb
test2 4 p 12478 6.5mb
test2 4 r 12478 6.6mb

As I mentioned above, because of routing, some shards of the test2 index are smaller and some are larger. Even in this environment, let’s do a quick test with the following query:

QUERY = {
    "origin": {
        "terms": {
            "field": "OriginCountry",
            "size": 100,
            "min_doc_count": 1,
            "shard_min_doc_count": 0,
            "show_term_doc_count_error": False,
            "order": [
                {"_count": "desc"},
                {"_key": "asc"}
            ]
        },
        "aggs": {
            "Destination": {
                "terms": {
                    "field": "DestCountry",
                    "size": 100,
                    "min_doc_count": 1,
                    "shard_min_doc_count": 0,
                    "show_term_doc_count_error": False,
                    "order": [
                        {"_count": "desc"},
                        {"_key": "asc"}
                    ]
                },
                "aggs": {
                    "max": {
                        "max": {
                            "field": "TimeInLong"
                        }
                    }
                }
            }
        }
    }
}

Note that the first aggregation in the query is on my routing field. This is exactly why routing improves our aggregation performance here, and why I chose this field as the routing value.

We have a terms sub-aggregation and a max sub-aggregation. Let's execute this query multiple times against both indices and calculate the average execution time with the following script. I used the timeit library to measure execution time:

def search1():
    result = es.search(
        index='test1',
        aggs=QUERY
    )
    return result['took']


def search2():
    result = es.search(
        index='test2',
        aggs=QUERY,
        preference='_local'
    )
    return result['took']


if __name__ == "__main__":
    import sys
    import timeit

    number = int(sys.argv[1])
    result = timeit.timeit(search1, number=number)
    print(f"Search 1 avg time : {result / number}")
    result = timeit.timeit(search2, number=number)
    print(f"Search 2 avg time : {result / number}")

I executed the script on my laptop with 60K documents. We can see an improvement below, even though the documents are not spread homogeneously across the shards.

(venv) ➜  python search_benchmarks/bench_query_simple.py 100
Search 1 avg time : 0.19298003719999998
Search 2 avg time : 0.18048028824999995

(venv) ➜ python search_benchmarks/bench_query_simple.py 500
Search 1 avg time : 0.19295567646400003
Search 2 avg time : 0.185763924162
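From the 100-run averages above, the relative improvement works out as follows:

```python
t_no_routing = 0.19298003719999998  # test1 (no routing), avg seconds per query
t_routing = 0.18048028824999995     # test2 (with routing), avg seconds per query

improvement = (t_no_routing - t_routing) / t_no_routing
print(f"{improvement:.1%}")  # 6.5%
```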

I could not complete a thorough comparison with various datasets. 😔 However, I observed a performance increase of 10–15% when testing with my production data and queries, which are more complex. 💪 In this example, a roughly 6% performance increase is visible, and greater gains are possible if you have heavier sub-aggregations. 🚀

Thanks for reading.

Please follow me here, on LinkedIn, or on Twitter for more content.


Haydar Külekci

Elastic Certified Engineer - Open to new opportunities & seeking sponsorship for UK/Netherland relocation 🇳🇱🇬🇧 https://www.linkedin.com/in/hkulekci/