Safely Cancel Your Celery Tasks


I've been working with Celery for a long time and these days it's a pretty stable part of my stack.  There's a lot of good articles out there on how and why you'd want to use this framework, this is not one of them.  If you're reading this I'm assuming you're already familiar with Celery.

One of the issues I've had over the years is finding a clean a way to cancel tasks that are running.  Celery has a few recommended ways to kill your tasks which I've found don't work well for me or my use-case.  The first and the approach you'll find most recommended is revoking tasks. 

Revoking Tasks

Revoking tasks, according to the Celery documentation, will skip the executing task but it will not (by default) terminate an already running task.  If you want the revoke command to kill an already running task, you'll need to add the terminate keyword to the revoke command.  The Celery documentation doesn't recommend this approach, they consider it a last resort, as it's for terminating the PROCESS not the task.  This means it can terminate processes that have already started running another task.  You should read and fully understand how this works before using it.

Using the revoke command has never been something I've been comfortable with, the thought that I can terminate a process running another task before it's complete is scary.  The documentation also mentions that this should be used for skipping the execution of a task and not terminating an already existing task.  The fact that the terminate command will terminate the entire process is something I don't think should be done when you just want to cancel a single running task.  So for me, this is not the right solution and should only be used during emergencies.

Aborting Your Tasks

Since revoke is a last resort and should not be used for canceling already running tasks, it's time to find another solution.  The solution I've come up with and have been using is similar to Celery's abortable feature.  This in my opinion is much more stable and probably more what people are looking for when they want to stop a running task.  It's a bit harder to find in the documentation and there are a few instability warnings about it (like they don't guarantee the task it will actually be aborted).  This is the warning from the Celery documentation:

After the result.abort() call, the task execution isn’t aborted immediately. In fact, it’s not guaranteed to abort at all. Keep checking result.state status, or call result.get(timeout=) to have it block until the task is finished.

 This warning is enough to scare me off as I want my task to be guaranteed to stop.  There are times when a task could be causing some kind of serious issue and it needs to be stopped immediately.  Therefore this approach does not make me comfortable.

The other requirement for using the Celery abortable feature is that you have to be using a database backend.  Celery uses the database back-end to communicate between the producer and consumer, so if you are not using a database backend this will not work for you.

If, however, you don't feel these constraints are a problem I think this is a good solution.   For one it's built into the framework and it's pretty straightforward to implement.  I'm not going to walk through how to use it but you can read the full documentation here.

Celery Task Signal

The approach I've taken is similar to what the Celery abortable solution does.  I've used this solution in multiple projects that process 100s of millions of Celery tasks over the years.  My solution also relies on communication through the database.  There has to be a way for you to communicate with the task and I've found that setting it up via the database works best.  It's fairly easy to setup and maintain plus it's very fast for what I need.  I haven't run into a situation where this infrastructure has caused performance issues and it's been extremely stable.

The core of the solution relies on a database table I call CeleryTaskSignal.  This table does the communication between the producer and the consumer (which is the task).  The table is very straight forward, I have a signal column (which is a list of supported signals.  This can be things like cancel or pause, etc), and a completed column.  These columns are always on the table.  There are times where I may have other columns associated to the table, maybe there is a ForeignKey object that is tied directly to the task or maybe you want to send a messages (using a TextField) to the task as well.  These all fit in nicely, but the main part of the table is the signal and completed fields.

class CeleryTaskSignal(TimeStampedModel):   
    CANCEL_TASK = 'cancel_task'
    PAUSE_TASK = 'pause_task'
    SIGNAL_CHOICES = (
        (CANCEL_TASK, 'Cancel Task'),
        (PAUSE_TASK, 'Pause Task'),
    )

    signal = models.CharField(max_length=25, choices=SIGNAL_CHOICES, null=True)
    completed = models.BooleanField(default=False)
    objects = CeleryTaskSignalManager()

    def mark_completed(self, save=True):
        self.completed = True

        if save:
            self.save()

    class Meta:
        ordering = ('modified',)

This is similar to Celery's abortable approach, but it gives you control over how you cancel your tasks.  It'll allow you to guarantee the task will be canceled and you don't have to rely on a database backend to ensure the communication happens.  The approach is also very straight forward and you can add a Django admin page for the table which will allow you to send the signals from the admin if you need it.

This also allows us to build infrastructure for things that aren't just canceling tasks.  It puts us in a good position to add additional signals as our applications become more complex in the future.  The consumer simply has to write a query that looks for a specific signal that has yet to complete.

Sending A Signal To The Task

Once you have the CeleryTaskSignal table setup "listening" to a signal is straight forward.  All you need to do is write a simple query that will look for a new unprocessed signal in the table.   For example, if we want to look for a cancel signal we'll run a Django query looking for any entries in the CeleryTaskSignal table that have a signal of cancel_task and completed is False.  The query would look something like this.

CeleryTaskSignal.objects.fiter(signal=CeleryTaskSignal.CANCEL_TASK, completed=False)

If you get an entry back you'll want to cancel your task, clean up anything you need on the task and then update the signal you just consumed so you can mark completed = True.  This gives you full control on how you want to cancel your Celery tasks.

In a nutshell that's the design I've been using to gracefully communicate with tasks in the different projects I've built using Celery.  The design is extremely straightforward and easy to extend.  You can add messages, or pretty much anything you want to communicate to your Celery tasks.  This will also ensure the process running your task will not be abruptly killed and you get the control to manage your tasks so it fits perfectly into your infrastructure.