Monitoring celery – your distributed task queue

Hey. I recently noticed I needed some kind of celery monitoring. Here’s a summary of what I learned while searching for a solution.

TL;DR: Sign up at healthchecks.io, add a check and the integrations you need, then add a single celery task that pings the check every minute. Configure the Period and have it send an email or a Slack message when the ping does not arrive. Also, this post is not sponsored in any way.

Here’s the situation. I’m running a web app that offloads work to celery tasks. After receiving content via an API call, the web app uses the task engine to run code that can take a lot of time: language detection, label scanning, calculating similarity, indexing into a search engine, adding synonyms, adding emoji names, etc. Then, after some time, I noticed that the celery tasks were no longer running even though the processes were. Simple process monitoring would not have caught this: the processes were still alive, but tasks were not being picked up.

Monitoring your website is easier: just invoke an endpoint and test the results. Tools like visualping.io are really good at that. Celery tasks are different as they’re running in the background and typically not directly available via the internet.

Here’s what I do, in a simple list:

    • Sign up at healthchecks.io and add a check via Add Check. Make sure you configure the expected period and grace period and add the integrations you need.
    • Copy the newly created URL and store it somewhere in the project’s settings.py.
    • Add a task that invokes the endpoint for your service. An example implementation is below.
    • Use a celerybeat schedule that runs the task at an interval.
    • Wait.
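
The settings side of these steps could look roughly like the sketch below. It assumes a Django project where celery reads its configuration from settings.py with the `CELERY` namespace; the URL is a placeholder, and the entry name and the `expires` value are my own choices, so adjust them to your setup.

```python
from datetime import timedelta

# Placeholder: paste the ping URL that healthchecks.io shows for your check.
HEALTHCHECKS_URI = "https://hc-ping.com/your-check-uuid"

# Assumes celery is configured with
# app.config_from_object("django.conf:settings", namespace="CELERY").
CELERY_BEAT_SCHEDULE = {
    # The entry name is arbitrary (hypothetical here).
    "send-healthcheck-every-minute": {
        # Must match the name given to the task decorator.
        "task": "integration.send_healthcheck",
        "schedule": timedelta(minutes=1),
        # Let queued pings expire so a backlog is not flushed in one go
        # once a stuck worker comes back.
        "options": {"expires": 30},
    },
}
```

Set the Period of the check to the same interval (one minute here) and pick a grace period that tolerates a slow or missed ping or two.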

Now here’s some example code, which you should modify to your needs. It has been in use for some time now, but I wouldn’t say it’s battle-tested.

from celery.utils.log import get_task_logger
from django.apps import apps
from requests import get, Timeout, status_codes

from brownpapersession.celery import app as celery_app, CeleryMailingTask


log = get_task_logger(__name__)


@celery_app.task(name='integration.send_healthcheck', ignore_result=True, base=CeleryMailingTask)
def send_healthcheck(**kwargs):
  # Ping the healthchecks.io check URL; the check alerts when the pings stop.
  config = apps.get_app_config('integration')
  if not config.HEALTHCHECKS_URI:
    # No URL configured: log it and skip instead of failing the task.
    log.warning('service=healthchecks, reason=configuration')
    return
  try:
    response = get(
      config.HEALTHCHECKS_URI,
      timeout=3.3,
      headers={
        'User-Agent': 'brwnppr.com/0.6',
      },
    )
  except Timeout as e:
    # Timeout covers both connect and read timeouts.
    log.warning('service=healthchecks, uri={uri}, detail={detail}, reason=timeout'.format(
      uri=config.HEALTHCHECKS_URI,
      detail=e,
    ))
    # Re-raise so the task is marked as failed.
    raise AssertionError(e)
  if status_codes.codes.OK == response.status_code:
    log.debug('service=healthchecks, uri={uri}, http.status={status}'.format(
      uri=config.HEALTHCHECKS_URI,
      status=response.status_code,
    ))
  else:
    # Any status other than 200 is logged but does not fail the task.
    log.warning('service=healthchecks, uri={uri}, http.status={status}'.format(
      uri=config.HEALTHCHECKS_URI,
      status=response.status_code,
    ))
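
Before wiring the task into celery, it can help to try the ping URL from a plain Python shell. Here is a minimal sketch that uses only the standard library. The function name `ping`, the `opener` parameter (there only so the function can be exercised without network access) and the User-Agent string are my own inventions, not part of the project above.

```python
import urllib.request


def ping(url, timeout=3.3, opener=urllib.request.urlopen):
    """Return True if the ping URL answered with HTTP 200, False otherwise."""
    request = urllib.request.Request(
        url,
        headers={"User-Agent": "healthcheck-sketch/0.1"},  # hypothetical value
    )
    try:
        with opener(request, timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # URLError, HTTPError and socket timeouts are all OSError subclasses.
        return False
```

Call it as `ping(HEALTHCHECKS_URI)` with your own check URL; the healthchecks website should show the ping shortly afterwards.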

Now what’s the beauty in this? Well, it notifies you when something is wrong and does not bother you when everything is running properly. Integrating is lightweight and easy. On top of that, the healthchecks website shows you when the last ping was received. You can take these integrations one step further and add a webhook that resolves the issue or displays a message on your website.
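
To make the period and grace period idea concrete, here is a small sketch of the logic behind this kind of check. It only illustrates the concept; it is not how healthchecks.io is actually implemented, and the function name and status labels are made up for the example.

```python
from datetime import datetime, timedelta


def check_status(last_ping, now, period, grace):
    """Classify a check as 'up', 'late' or 'down' from its last ping time."""
    elapsed = now - last_ping
    if elapsed <= period:
        return "up"    # a ping arrived within the expected period
    if elapsed <= period + grace:
        return "late"  # overdue, but still inside the grace period
    return "down"      # this is the point where an alert would go out


now = datetime(2024, 1, 1, 12, 0, 0)
period = timedelta(minutes=1)
grace = timedelta(minutes=5)

print(check_status(now - timedelta(seconds=30), now, period, grace))  # up
print(check_status(now - timedelta(minutes=3), now, period, grace))   # late
print(check_status(now - timedelta(minutes=10), now, period, grace))  # down
```

As long as the celery task keeps pinging, nothing happens. Once the pings stop for longer than the period plus the grace period, you get the email or Slack message.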

Wondering what CeleryMailingTask is? It will be the subject of a future post.

Really happy to take suggestions or improvements via the comments.
