Publishing to Elasticsearch using Django and Postgres DictCursor


One of the projects I've been working on relies heavily on Elasticsearch, and the current dataset is a few million records. I'm currently indexing about a million records and expect to index the entire dataset in the near future. I originally took a simple approach to indexing the data, which took about an hour and a half to fully index. The data originates from an external source and I'm updating it on a weekly basis, so I wanted the indexing process to run faster. In this post I'm going to explain the original approach I took and then outline what I've done to get the indexing process to run in under 30 seconds.

Original Approach

The application is built in Django and I'm using two Python libraries, elasticsearch-py and elasticsearch-dsl, to interact with Elasticsearch. This lets me define my index mappings using elasticsearch-dsl's DocType class. It also means I can create the index from within my Python code, so I can manage the creation and deletion of the index from my publishing scripts. Below is a sample DocType that can be used to create and manage my Elasticsearch index.

import elasticsearch_dsl as es


class IssueDocument(es.DocType):
    issue_id = es.Integer()
    publisher_name = es.String()
    series_name = es.String()
    writers = es.String()
    pencils = es.String()
    image = es.String()
    full_title = es.String()

    class Meta:
        # Name of the Elasticsearch index this document lives in.
        index = 'issue'

Now that I've built my index class using the DocType parent class, I can easily grab a copy of the index instance (by the index name) and update, publish to, and delete the index. This makes interacting with the index easy, and I can use the same code to create and publish my index data in each environment (dev, stage, and prod) without having to copy it from another environment. This approach worked perfectly for a small sample set of data, but once I started using a bigger chunk of my data (around a million records that need to be published) it really slowed down. Below is a sample of the way I was publishing my dataset.

import elasticsearch
import elasticsearch.helpers


def publish_all(delete=True):
    documents_to_index = []
    es = elasticsearch.Elasticsearch()

    if delete:
        delete_index(es)

    series_id = Series.objects.select_related().filter(
        publisher__in=PUBLISHERS_TO_INDEX,
    ).values_list('id')
    logger.info("Found %s comic book series that need to be indexed..." % len(series_id))

    issues = Issue.objects.by_publisher(PUBLISHERS_TO_INDEX)
    logger.info("Converting %s issues to an Elasticsearch index..." % len(issues))

    for issue in issues:
        product = get_product(issue)

        # Map each issue found in the database to the correct Elasticsearch
        # field.
        document = mappings.IssueDocument(
            issue_id=issue.id,
            publisher_name=issue.get_publisher(),
            series_name=issue.series.name,
            writers=','.join(issue.get_writers()),
            pencils=','.join(issue.get_pencils()),
            full_title=issue.full_title(),
        )
        documents_to_index.append(document.to_dict(include_meta=True))

    elasticsearch.helpers.bulk(es, documents_to_index)
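
The delete_index helper called above (and the create_index helper used later on) isn't shown in the post. A minimal sketch of what the pair might look like with elasticsearch-dsl, assuming the IssueDocument mapping above lives in a mappings module (the module path and function bodies here are my best guess rather than the exact implementation):

import elasticsearch_dsl

from myapp.search import mappings  # hypothetical location of IssueDocument


def delete_index(es):
    # Drop the whole 'issue' index; ignore the 404 if it doesn't exist yet.
    elasticsearch_dsl.Index('issue', using=es).delete(ignore=404)


def create_index(es):
    # DocType.init() creates the index and pushes the mapping defined
    # on IssueDocument.
    mappings.IssueDocument.init(using=es)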

After doing some analysis and performance monitoring, it was clear that the bottleneck was the conversion from the database result set to the Elasticsearch mapping. I was looping over the entire result set record by record and mapping each column to the DocType model. There are a handful of aggregations and pieces of conditional logic that have to be run on each record. Doing this on a per-record basis using model methods seemed very clean and easy to manage, but due to the size of the dataset to be indexed this approach began to slow down drastically. Since performance of the re-index is so important, I decided to rely heavily on a handful of Postgres features to solve this problem.

Postgres Query

Although the query itself is not very complicated, there is a bunch of logic that has to be handled within it. In order to manage the logic part of the query I used Postgres CASE statements. For the most part this is pretty straightforward: the CASE statements simply sit in the SELECT part of the query. A good example is checking whether I have an image stored for a record in the database. The query would look something like this...

SELECT c.name,
       CASE WHEN i.medium_image IS NULL THEN '/img/no-img-available-md.jpg'
            ELSE i.medium_image
       END AS "image"
FROM comics c
INNER JOIN comics_images AS i ON i.comics_id = c.id

I also needed to build a few comma-separated lists from data in other tables. Again, this is not a big lift, but there were a handful of them that needed to be built. I'm using Postgres' string_agg function to concatenate a list of artists and writers for each issue. The query would look something like this.

SELECT name,
       (SELECT string_agg(DISTINCT pencils, ', ')
        FROM comics_story
        WHERE issue_id = i.id
          AND type_id = 19
          AND pencils != ''
          AND pencils != '?') AS "pencils"
FROM comics i

Mixing all of these CASE statements and aggregate functions makes for a fairly complicated but speedy query. In order to send the data to Elasticsearch I need it to be in JSON format. My original plan was to use the row_to_json Postgres function. This is pretty straightforward; an example query would look like this:

SELECT row_to_json(a_table)
FROM (
    SELECT * FROM comics_issue LIMIT 25
) a_table

The results from row_to_json() will look like this:

"{"id":12345,"number":"123","title":"Some Title"...}

In theory this would work perfectly: I would be able to wrap my query with the row_to_json() function and have a fully built JSON document that I could easily publish to Elasticsearch, but...

Django/Postgres Integration

My original strategy was to drop down and use Django's Manager.raw() method, but there is one small issue here: the raw method returns plain Python tuples when I'm not mapping the result set back onto a Django model. This caused a bit of an issue because I didn't want to iterate over the result set. This means I'll have to use the psycopg2 driver with a custom cursor.
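
To make that concrete, a plain database cursor (what Django gives you by default) hands rows back as bare tuples with no column names attached, which is exactly what I didn't want to convert by hand. A quick sketch, reusing the table and column names from the examples above:

from django.db import connection

with connection.cursor() as cursor:
    cursor.execute("SELECT id, title FROM comics_issue LIMIT 1")
    row = cursor.fetchone()
    # row comes back as something like (12345, 'Some Title'): a plain tuple,
    # so every record would still need to be mapped before indexing.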

Psycopg2 has a few different cursor classes that you can use. One of them is the DictCursor which, as you would expect, lets you retrieve records through an interface similar to Python's dictionaries. This means I can retrieve the result set in dictionary form and send it straight to the Elasticsearch bulk uploader; there is no need to convert any of the data.
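
Here is a quick standalone sketch of that dictionary-style access using psycopg2 directly (the connection string is made up for illustration):

import psycopg2
import psycopg2.extras

# Hypothetical connection parameters, for illustration only.
conn = psycopg2.connect("dbname=comics user=comics")

with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
    cursor.execute("SELECT id, title FROM comics_issue LIMIT 1")
    row = cursor.fetchone()
    print(row['title'])  # columns are addressable by name
    print(dict(row))     # and each row converts cleanly to a plain dict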

When using the DictCursor with Django you aren't able to grab the default cursor from the connection; instead you have to grab the default connection and then create the cursor using the DictCursor class. This is a pretty straightforward exercise, but it took some time to figure out as it was not clearly documented. Below is the function I wrote that simply queries the database and publishes the result set to Elasticsearch.

import elasticsearch
import elasticsearch.helpers
import psycopg2.extras

from django.db import connections


def publish(delete=True):
    es = elasticsearch.Elasticsearch()
    wrapped_connection = connections['default']

    if delete:
        delete_index(es)
        create_index(es)

    if wrapped_connection.connection is None:
        # Ensure the underlying database connection is opened immediately.
        wrapped_connection.cursor()

    connection = wrapped_connection.connection
    with connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
        logger.info('Retrieving issues from database............')
        cursor.execute(PUBLISH_QUERY, (PUBLISHERS_TO_INDEX,))
        rows = cursor.fetchall()

        # Publish all rows found and push them to our Elasticsearch index.
        logger.info('Bulk loading comic issues.....')
        elasticsearch.helpers.bulk(es, rows)
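
For reference, PUBLISH_QUERY is the big SELECT built up in the Postgres Query section, and it isn't reproduced in full here. A trimmed-down, hypothetical version shows the overall shape, with every column aliased to the field name the IssueDocument mapping expects (the table, column, and join names below are illustrative, not the exact schema):

PUBLISH_QUERY = """
    SELECT i.id AS issue_id,
           s.name AS series_name,
           CASE WHEN img.medium_image IS NULL THEN '/img/no-img-available-md.jpg'
                ELSE img.medium_image
           END AS image,
           (SELECT string_agg(DISTINCT pencils, ', ')
            FROM comics_story
            WHERE issue_id = i.id
              AND type_id = 19
              AND pencils != ''
              AND pencils != '?') AS pencils
    FROM comics_issue AS i
    JOIN comics_series AS s ON s.id = i.series_id
    LEFT JOIN comics_images AS img ON img.comics_id = i.id
    WHERE s.publisher_id IN %s
"""

# psycopg2 expands a Python tuple for the IN clause above; if
# PUBLISHERS_TO_INDEX were a list, "= ANY(%s)" would be the usual alternative.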

This lets us run the publish query and push the results to Elasticsearch without having to iterate over the result set. Going from my original approach to this database-driven approach has cut the publish time for the entire dataset to under 30 seconds.

Conclusion

Although this approach isn't perfect, I feel comfortable with it. When working with Django or any ORM it's important to understand the limitations and to know when it's the right time to jump in and use raw SQL. As with most things, there is a trade-off here. Since speed is so important, I'm comfortable with duplicating some of this logic and adding a little complexity to the database query. Most of the SQL CASE statements that I have written into the query are duplicated in the Django models part of my application. This opens up opportunities for bugs, since we would have to change both the query and the Django models if we wanted to change something like the default image or the way I construct the titles. Given the large speed increase I'm getting from this approach, I feel good about the trade-off.