Autoscale Celery workers on ECS Fargate based on RabbitMQ metrics.

Jose López
April 3rd, 2020 · 2 min read

Asynchronous task queues let pieces of a software program run in a separate process or machine. Celery is one such task queue: it communicates via messages, usually using a broker (e.g. RabbitMQ) to mediate between clients and workers.

If you are reading this, I’m assuming that you’re already using Celery and RabbitMQ. In this article, we are going to leverage ECS Fargate’s scaling capabilities to quickly create and destroy workers based on RabbitMQ queue depth metrics.

The Terraform code used in this article is available in OBytes’ public GitHub repos.

Lambda function to push RabbitMQ metrics to CloudWatch

The first thing we need to do is gather RabbitMQ metrics and push them to CloudWatch. For this, we are going to use a Lambda function that uses the boto3 and requests libraries to get the list of queues and their message counts from the RabbitMQ management API, and then publishes this data to CloudWatch.

#!/usr/bin/env python3
import os
import time
import urllib.parse
from base64 import b64decode

import boto3
# botocore's vendored requests is deprecated; if it is not available in your
# runtime, bundle the requests library with the function and `import requests`
from botocore.vendored import requests


def get_queue_depths_and_publish_to_cloudwatch(host, port, username, password,
                                               vhost, namespace):
    """
    Call the RabbitMQ management API to get the list of queues and publish
    their depths to CloudWatch under the given namespace.
    """
    depths = get_queue_depths(host, port, username, password, vhost)
    publish_depths_to_cloudwatch(depths, namespace)


def get_queue_depths(host, port, username, password, vhost):
    """
    Return a dict mapping queue names to message counts, plus a 'total' key.
    """
    # Get the list of queues
    try:
        r = requests.get('https://{}:{}/api/queues'.format(host, port),
                         auth=requests.auth.HTTPBasicAuth(username, password))
    except requests.exceptions.RequestException:
        log('rabbitmq_connection_failures')
        print("ERROR: Could not connect to {}:{} with user {}".format(
            host, port, username))
        return {}

    queues = r.json()
    total = 0
    depths = {}
    for q in queues:

        # Ignore Celery internal and pyrabbit queues
        if q['name'] == "aliveness-test":
            continue
        elif q['name'].endswith('.pidbox') or q['name'].startswith('celeryev.'):
            continue

        # Get the individual queue counts
        try:
            r = requests.get('https://{}:{}/api/queues/{}/{}'.format(
                host,
                port,
                urllib.parse.quote_plus(vhost),
                urllib.parse.quote_plus(q['name'])),
                auth=requests.auth.HTTPBasicAuth(username, password))
        except requests.exceptions.RequestException:
            log('queue_depth_failure', tags=['queue:{}'.format(q['name'])])
            break

        qr = r.json()
        if r.status_code == 200 and 'messages' in qr:
            queue_depth = qr['messages']
            depths[q['name']] = queue_depth
            total = total + int(queue_depth)
        else:
            log('queue_depth_failure', tags=['queue:{}'.format(q['name'])])

    depths['total'] = total
    return depths


def publish_depths_to_cloudwatch(depths, namespace):
    """
    Publish one CloudWatch metric per queue under the given namespace.
    """
    cloudwatch = boto3.client(
        'cloudwatch', region_name=os.environ.get("AWS_REGION"))
    for q in depths:
        try:
            cloudwatch.put_metric_data(
                Namespace=namespace,
                MetricData=[{
                    'MetricName': q,
                    'Timestamp': time.time(),
                    'Value': int(depths[q]),
                    'Unit': 'Count',
                }])
            log(namespace, 'gauge', depths[q], ['queue:' + q])
        except Exception as e:
            print(str(e))
            log('cloudwatch_put_metric_error')


def lambda_handler(event, context):
    # The namespace prefix is taken from the function name, e.g. a function
    # called "workers-rabbitmq-metrics" publishes to "workers.rabbitmq.depth"
    queue_group = context.function_name.split('-', 1)[0]

    host = os.environ.get("RABBITMQ_HOST")
    port = os.environ.get("RABBITMQ_PORT")
    user = os.environ.get("RABBITMQ_USER")
    pw = os.environ.get("RABBITMQ_PASS")
    get_queue_depths_and_publish_to_cloudwatch(
        host=host,
        port=port,
        username=user,
        # RABBITMQ_PASS is stored base64-encoded and KMS-encrypted
        password=boto3.client('kms').decrypt(CiphertextBlob=b64decode(pw))[
            'Plaintext'].decode('utf8').replace('\n', ''),
        vhost="/",
        namespace=queue_group + ".rabbitmq.depth")


def log(metric_name, metric_type='count', metric_value=1, tags=None):
    """
    Print a metric line in the format:
    MONITORING|unix_epoch_timestamp|metric_value|metric_type|my.metric.name|#tag1:value,tag2
    """
    print("MONITORING|{}|{}|{}|{}|#{}".format(
        int(time.time()),
        metric_value,
        metric_type,
        'rabbitmq_cloudwatch.' + metric_name, ','.join(tags or [])))


if __name__ == "__main__":
    lambda_handler(event=None, context=None)

The function needs IAM permissions to put CloudWatch metrics, write to its log group, and decrypt with the KMS key used to encrypt the RABBITMQ_PASS variable. We also need to allow the Lambda function to reach the RabbitMQ management API (port 15672 by default).
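As a rough Terraform sketch of those pieces (the role, KMS key and security group names below are placeholders; the complete version lives in the repo linked above):

# Permissions for the metrics Lambda: CloudWatch metrics, logs and KMS decrypt
resource "aws_iam_role_policy" "rabbitmq_metrics_lambda" {
  name = "rabbitmq-metrics-lambda"
  role = aws_iam_role.rabbitmq_metrics_lambda.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = ["cloudwatch:PutMetricData"]
        Resource = "*"
      },
      {
        Effect   = "Allow"
        Action   = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"]
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect   = "Allow"
        Action   = ["kms:Decrypt"]
        Resource = aws_kms_key.rabbitmq_pass.arn
      },
    ]
  })
}

# Allow the Lambda's security group to reach the RabbitMQ management API
resource "aws_security_group_rule" "lambda_to_rabbitmq_api" {
  type                     = "ingress"
  from_port                = 15672
  to_port                  = 15672
  protocol                 = "tcp"
  security_group_id        = aws_security_group.rabbitmq.id
  source_security_group_id = aws_security_group.rabbitmq_metrics_lambda.id
}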

Autoscale Celery workers based on RabbitMQ queue depth metrics

Now that we have our RabbitMQ queue depth metrics in CloudWatch, we need to configure autoscaling on ECS Fargate to scale the number of workers based on this data.

The values for autoscaling will depend on your setup, and the best way to find accurate ones is trial and error. If your workload has peaks (e.g. a Celery task that syncs products with the DB) that trigger a lot of tasks in a short span of time, you’ll want to scale to a large number of workers so they are processed quickly. If your workload is steady, you’ll want to scale more gradually.

First of all, we need to create a CloudWatch metric alarm. This alarm will trigger the autoscaling policies based on the RabbitMQ metrics. For test environments that are barely used, it might be interesting to have 0 workers by default and create them dynamically when there are messages in the queue. This adds some delay to the processing of new tasks (Fargate takes a few minutes to start new tasks), but it might be worthwhile.
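As an illustration, an alarm on the total queue-depth metric published by the Lambda could look like the following (the namespace and names are placeholders matching the example function name above; adjust them to your setup):

# Fires as soon as there is at least one message across the monitored queues
resource "aws_cloudwatch_metric_alarm" "celery_queue_depth_high" {
  alarm_name          = "celery-queue-depth-high"
  namespace           = "workers.rabbitmq.depth"
  metric_name         = "total"
  statistic           = "Maximum"
  comparison_operator = "GreaterThanThreshold"
  threshold           = 0
  period              = 60
  evaluation_periods  = 1

  # Triggers the scale-up policy shown in the next snippet; a symmetric
  # "queue empty" alarm can drive the scale-down policy.
  alarm_actions = [aws_appautoscaling_policy.celery_scale_up.arn]
}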

Then, we need to configure the autoscaling target and policies, as shown in the sketch below. As mentioned previously, these values should be adjusted to your workload, and you can configure as many scaling steps as you need.
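A minimal sketch of the scaling target and a step policy might look like this (capacities and steps are illustrative, and the cluster and service references are assumed to be defined elsewhere in your Terraform code):

resource "aws_appautoscaling_target" "celery_workers" {
  service_namespace  = "ecs"
  resource_id        = "service/${aws_ecs_cluster.main.name}/${aws_ecs_service.celery_workers.name}"
  scalable_dimension = "ecs:service:DesiredCount"
  min_capacity       = 0
  max_capacity       = 10
}

resource "aws_appautoscaling_policy" "celery_scale_up" {
  name               = "celery-workers-scale-up"
  policy_type        = "StepScaling"
  service_namespace  = aws_appautoscaling_target.celery_workers.service_namespace
  resource_id        = aws_appautoscaling_target.celery_workers.resource_id
  scalable_dimension = aws_appautoscaling_target.celery_workers.scalable_dimension

  step_scaling_policy_configuration {
    adjustment_type         = "ExactCapacity"
    metric_aggregation_type = "Maximum"
    cooldown                = 120

    # 1-100 messages -> 2 workers, more than 100 messages -> 5 workers
    step_adjustment {
      metric_interval_lower_bound = 0
      metric_interval_upper_bound = 100
      scaling_adjustment          = 2
    }
    step_adjustment {
      metric_interval_lower_bound = 100
      scaling_adjustment          = 5
    }
  }
}

A mirror scale-down policy (e.g. ExactCapacity 0, or your steady-state minimum) can be attached to the "queue empty" alarm mentioned above.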

After everything is set up, if you check the ECS service’s autoscaling tab, you should see something similar to the image shown below.

And your Celery workers will autoscale based on queue metrics!

And that’s all, folks! If you have any doubts, feel free to reach out to me in the comments or via Twitter (@kstromeiraos).
