GO Serverless! Part 4 - Realtime Interactive and Secure Applications with AWS Websocket API Gateway

Hamza Adami
November 17th, 2021 · 19 min read

If you didn’t read the previous part, we highly advise you to do that!


Previously, we’ve built simple Flask/FastAPI APIs, deployed them as Lambda Functions and exposed them through AWS API Gateway V2 as HTTP APIs.

Throughout this part, we will build a Generic, Reusable and Pluggable Websocket Stack (SUMU) that can be hooked into any application to make it interactive and give it realtime capability. The stack should be:

  • Interoperable - integrate with any application.
  • Secure - support WebSocket Secure (WSS) and clients’ JWT authorization during the websocket upgrade request.
  • Fast - provide fast message delivery.

Additionally, we will build a simple demo application implementing the basic features that a modern websocket-compatible application needs.

TLDR

SUMU

SUMU (“to live” in Japanese) is a Generic, Reusable and Pluggable Websocket Stack that consists of the following components:

  • Connections Store: a DynamoDB table where all users’ active connections are cached. SUMU automatically adds new connections to the table, deletes closed connections from it and prunes stale connections using the DDB TTL attribute. Additionally, it streams INSERT and DELETE events, so other apps can track users’ presence.

  • Integration Inputs/Outputs: SUMU provides an Input SNS Topic and an Input SQS Queue to receive notification requests from external applications seeking to notify the connected users. It also provides an Output SNS Topic and an Output SQS Queue for external applications to receive messages from connected users.

  • Websocket Request JWT Authorizer: a request JWT Authorizer hooked to the connection route and capable of integrating with any JWT IaaS provider (Firebase, Cognito, Auth0…) in order to verify the JWT token signature, expiration time and allowed audiences, and return authorization policies to APIGW.

  • Websockets API Gateway: SUMU provides a Websocket API Gateway with connection and disconnection routes integrated with DynamoDB for connection tracking, a Keepalive (ping/pong) route to avoid idle connection termination, and two messaging (publish/send) routes integrated with SNS/SQS to fan out users’ messages to backend applications.

  • Websockets Notifications Async Pusher: a serverless and fast AWS API Gateway websocket notifications pusher built using Python AsyncIO to support asynchronous, concurrent non-blocking IO calls to the DynamoDB connections store and the API Gateway management API, making it suitable for receiving notification requests from backend applications and broadcasting those messages to multiple users in a fast and cost-effective way.

  • Presence Watchdog: a connections tracker that tracks all users’ connections and notifies backend applications about users’ presence. It can fan out an ONLINE presence event whenever a user connects, and an OFFLINE presence event whenever a user terminates all of their connections from all devices.
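For illustration only (this is not SUMU’s actual implementation), the presence decision can be sketched as a pure function over the connections-table stream records:

```python
from typing import Optional

def presence_event(event_name: str, remaining_connections: int) -> Optional[str]:
    """Map a connections-table stream record to a presence event (sketch).

    event_name: "INSERT" or "REMOVE", as reported by the DynamoDB stream.
    remaining_connections: the user's connection count after the change.
    """
    if event_name == "INSERT" and remaining_connections == 1:
        return "ONLINE"   # first connection from any device
    if event_name == "REMOVE" and remaining_connections == 0:
        return "OFFLINE"  # last connection closed, across all devices
    return None           # intermediate connects/disconnects change nothing
```

Only the first connection and the last disconnection produce a presence event; everything in between is noise the watchdog swallows.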

Usage

SUMU is built using pure AWS serverless technologies, and it can be provisioned with just 2 Terraform modules: the actual SUMU module and a helper module to expose the Websocket API with a custom domain.

# The actual SUMU stack
module "sumu" {
  source      = "git::https://github.com/obytes/terraform-aws-sumu//modules/serverless"
  prefix      = local.prefix
  common_tags = local.common_tags

  # Authorizer
  issuer_jwks_uri         = "https://www.googleapis.com/service_accounts/v1/metadata/x509/[email protected]"
  authorized_audiences    = ["sumu-websocket", ]
  verify_token_expiration = true

  s3_artifacts = {
    arn    = aws_s3_bucket.artifacts.arn
    bucket = aws_s3_bucket.artifacts.bucket
  }
  github = {
    owner          = "obytes"
    webhook_secret = "not-secret"
    connection_arn = "arn:aws:codestar-connections:us-east-1:{ACCOUNT_ID}:connection/{CONNECTION_ID}"
  }
  github_repository = {
    authorizer = {
      name   = "apigw-jwt-authorizer"
      branch = "main"
    }
    pusher = {
      name   = "apigw-websocket-pusher"
      branch = "main"
    }
  }
  ci_notifications_slack_channels = {
    info  = "ci-info"
    alert = "ci-alert"
  }

  stage_name      = "mvp"
  apigw_endpoint  = "https://live.kodhive.com/push"
  presence_source = "topic"
}

# The Websocket API Exposer
module "gato" {
  source      = "git::https://github.com/obytes/terraform-aws-gato//modules/core-route53"
  prefix      = local.prefix
  common_tags = local.common_tags

  # DNS
  r53_zone_id = aws_route53_zone.prerequisite.zone_id
  cert_arn    = aws_acm_certificate.prerequisite.arn
  domain_name = "kodhive.com"
  sub_domains = {
    stateless = "api"
    statefull = "live"
  }

  # Rest APIS
  http_apis = []

  ws_apis = [
    {
      id    = module.sumu.ws_api_id
      key   = "live"
      stage = module.sumu.ws_api_stage_name
    }
  ]
}

Connecting users

You can connect to the SUMU Websocket API using the browser’s native WebSocket API or a helper package like Sockette. The authorization query string is required to establish the connection with the API.

import Sockette from "sockette";

function connect(accessToken: string): void {
  let endpoint = `wss://live.kodhive.com/push?authorization=${accessToken}`;
  setConnecting(true);
  ws = new Sockette(endpoint, {
    timeout: 5e3,
    maxAttempts: 5,
    onopen: e => {keepAlive();setConnected(true);setConnecting(false);},
    onmessage: e => {
      console.log(JSON.parse(e.data).message)
    },
  });
}

To keep the user’s connection active, you can send ping frames periodically; API Gateway will respond with a pong frame immediately.

let keepAliveInterval: any = null;
function ping() {
  if (ws && connected) {
    ws.json({action: 'ping'});
  } else message.error("Not yet connected!")
}
function keepAlive() {
  if (ws && connected) {
    clearInterval(keepAliveInterval)
    keepAliveInterval = setInterval(ping, 3 * 60 * 1000) // Every 3 minutes
  } else message.error("Not yet connected!")
}
keepAlive()

Sending messages from clients to backend

SUMU is integrated with SNS and SQS: clients can send messages to the SQS queue or publish them to the SNS topic. The message should be a JSON string that contains the action and the actual message:

  • Send a message to backend applications through SQS:
function send(type: string, msg: {}): void {
  if (ws && connected) {
    ws.json({
      action: "send",
      message: {type: type, message: msg}
    });
    message.success("Message sent!");
  } else message.error("Not yet connected!")
}
  • Publish a message to backend applications through SNS:
function publish(type: string, msg: {}): void {
  if (ws && connected) {
    ws.json({
      action: "publish",
      message: {type: type, message: msg}
    });
    message.success("Message published!");
  } else message.error("Not yet connected!")
}

Subscribing backends to clients’ messages

For instance, you can subscribe a Lambda Function as a backend processor of clients’ messages by creating an SNS subscription and allowing SNS to invoke the Lambda Function:

resource "aws_sns_topic_subscription" "_" {
  topic_arn = var.messages_topic_arn
  protocol  = "lambda"
  endpoint  = module.server.lambda["alias_arn"]
}

resource "aws_lambda_permission" "with_sns" {
  statement_id  = "AllowExecutionFromSNS"
  action        = "lambda:InvokeFunction"
  function_name = module.server.lambda["arn"]
  qualifier     = module.server.lambda["alias"]
  principal     = "sns.amazonaws.com"
  source_arn    = var.messages_topic_arn
}

In addition to Lambda, you can create subscriptions that deliver messages to HTTP webhook endpoints, SMS and email.
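As a sketch, an HTTPS webhook subscription can be built from the backend with boto3 (the helper function and the ARN/URL are illustrative; SNS first POSTs a subscription confirmation to the endpoint before delivering messages):

```python
def webhook_subscription(topic_arn: str, url: str) -> dict:
    """Build the kwargs for boto3's sns.subscribe() call (sketch)."""
    return {
        "TopicArn": topic_arn,
        "Protocol": "https",  # other supported protocols: "email", "sms", "sqs", "lambda"
        "Endpoint": url,
    }

# usage: boto3.client("sns").subscribe(**webhook_subscription(arn, "https://example.com/hook"))
```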

Polling clients’ messages from backends

In case a backend application wants to process clients’ messages in batches, you can create an SQS event source mapping and give the Lambda Function permission to receive messages from the queue:

resource "aws_lambda_event_source_mapping" "_" {
  enabled          = true
  batch_size       = 10
  event_source_arn = var.messages_queue_arn
  function_name    = module.server.lambda["alias_arn"]
  maximum_batching_window_in_seconds = 0 # Do not wait until batch size is fulfilled
}

data "aws_iam_policy_document" "policy" {
  statement {
    actions = [
      "sqs:ChangeMessageVisibility",
      "sqs:ChangeMessageVisibilityBatch",
      "sqs:DeleteMessage",
      "sqs:DeleteMessageBatch",
      "sqs:GetQueueAttributes",
      "sqs:ReceiveMessage"
    ]

    resources = [
      var.messages_queue_arn
    ]
  }
}

SQS is a better choice than SNS if you want to avoid hitting the Lambda Concurrency Limit, which is 1,000 by default (it can be increased to 100,000 by AWS service request).

Notifying clients from backend

Backend applications can push notifications to users connected to the AWS API Gateway Websocket by sending a notification request to the service integrated with the Pusher (SNS or SQS). Notification requests should meet the following format:

For multicast notifications, the message should be a JSON string that contains the list of users and the actual data:

import json

message = {
    "users": ["783304b1-2320-44db-8f58-09c3035a686b", "a280aa41-d99b-4e1c-b126-6f39720633cc"],
    "data": {"type": "notification", "message": "A message sent to multiple users"}
}
message_to_send = json.dumps(message)

For broadcast notifications, use the same format, but do not provide a users list or provide an empty one:

import json

message = {
    "data": {"type": "announcement", "message": "A broadcast to all users"}
}
message_to_send = json.dumps(message)

For exclusion notifications, instead of providing a users list, provide a list of excluded users:

import json

message = {
    "exclude_users": ["783304b1-2320-44db-8f58-09c3035a686b"],
    "data": {
        "type": "announcement",
        "message": {
            "user_id": "783304b1-2320-44db-8f58-09c3035a686b",
            "status": "OFFLINE"
        }
    }
}
message_to_send = json.dumps(message)

Notification requests through SNS

The SUMU Pusher subscribes to the notifications SNS Topic, and whenever backend applications publish notification requests to SNS, the latter will quickly notify the Pusher by sending the notification request to the subscribed Pusher Lambda.

This results in fast delivery, because this approach does not introduce a polling mechanism: SNS notifies the Pusher as soon as a notification request is available.

However, at scale, SNS will trigger a Pusher Lambda invocation for every notification request, and given that the Lambda Concurrency Limit is 1,000 per account (it can be increased to 100,000 by support ticket), notification requests will be throttled for very large applications that can have more than 100,000 concurrent in-flight messages.

Publish to SNS when you have a small/medium application with a moderate number of users.

import os
import json
import time
import boto3

message = {
    "users": ["783304b1-2320-44db-8f58-09c3035a686b", "a280aa41-d99b-4e1c-b126-6f39720633cc"],
    "data": {
        "type": "notification",
        "message": {
            "text": "Your order has been fulfilled!",
            "timestamp": int(time.time())
        }
    }
}
boto3.client("sns").publish(
    TargetArn=os.environ["NOTIFICATIONS_TOPIC_ARN"],
    Message=json.dumps(message),
)

Sending notification requests through SQS

Unlike SNS, when notifications are sent to the SQS queue, the Pusher Lambda Function’s event source is configured to poll notification requests from the SQS Queue, and it will do so periodically using the long polling technique.

This results in notification requests being processed in batches, which comes with many benefits:

  • Fewer Lambda Invocations - so we do not reach the Lambda Concurrency Limit.
  • Concurrent Notifications - as the pusher uses AsyncIO, it will be able to process batches of SQS Records concurrently.
  • Low cost - thanks to SQS Batches and fewer Lambda Invocations.

The Pusher can match the speed and performance of SNS because the SQS queue’s receive_wait_time_seconds is set to 20, which makes the Lambda service do long polling instead of short polling. In addition, the Lambda service runs a background worker with five instances polling every 20 seconds, which ensures that the Lambda receives notification requests as soon as they arrive in the queue.

AWS: the automatic scaling behavior of Lambda is designed to keep polling costs low when a queue is empty while simultaneously letting us scale up to high throughput when the queue is being used heavily. When an SQS event source mapping is initially created and enabled, or when messages first appear after a period with no traffic, then the Lambda service will begin polling the SQS queue using five parallel long-polling connections. The Lambda service monitors the number of inflight messages, and when it detects that this number is trending up, it will increase the polling frequency by 20 ReceiveMessage requests per minute and the function concurrency by 60 calls per minute. As long as the queue remains busy it will continue to scale until it hits the function concurrency limits.

5 parallel connections, each sending 3 ReceiveMessage requests per minute, makes 15 requests every minute. That is 900 every hour, 21,600 every day and 648,000 every month.

AWS gives you one million SQS requests for free every month. After that it’s only $0.40 per million requests, so the cost of consuming messages from SQS is very low.
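The arithmetic above can be checked in a few lines (the $0.40-per-million rate is the standard SQS price at the time of writing and may change):

```python
# 5 long-polling connections, each issuing one 20-second ReceiveMessage
# wait at a time -> 3 requests per connection per minute.
connections = 5
requests_per_minute = connections * (60 // 20)            # 15
requests_per_month = requests_per_minute * 60 * 24 * 30   # 648,000

# The first million SQS requests per month are free, then $0.40 per million.
billable = max(0, requests_per_month - 1_000_000)
monthly_cost = billable / 1_000_000 * 0.40

print(requests_per_minute, requests_per_month, monthly_cost)  # 15 648000 0.0
```

An idle queue therefore stays entirely within the free tier.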

Send to SQS when you have a large application with millions of concurrent in-flight messages.

import os
import json
import time
import boto3

message = {
    "users": ["783304b1-2320-44db-8f58-09c3035a686b", "a280aa41-d99b-4e1c-b126-6f39720633cc"],
    "data": {
        "type": "notification",
        "message": {
            "text": "Your order has been fulfilled!",
            "timestamp": int(time.time())
        }
    }
}
boto3.client("sqs").send_message(
    QueueUrl=os.environ.get("NOTIFICATIONS_QUEUE_URL"),
    MessageBody=json.dumps(message),
)

A live demo application is deployed at https://sumu.kodhive.com, so hurry up and test it! The live demo will be removed one month after the article is published.

That’s all 🎉. Enjoy your serverless Websocket API. However, if you are not in a rush, continue reading to see how we’ve built it; you will not regret it.

“You take the blue pill, the story ends, you wake up in your bed and believe whatever you want to believe. You take the red pill, you stay in wonderland, and I show you how deep the rabbit hole goes.” Morpheus

Integration Mechanism

Source: Integration

It's working

The first thing to think about when building reusable and interoperable stacks is how dependent applications will integrate with them in terms of inputs and outputs.


In order for our stack to be interoperable with external applications, we will expose:

  • Two Outputs - SNS Topic and SQS Queue for clients’ messages.
  • Two Inputs - SNS Topic and SQS Queue for backend’s notification requests.

Output

  • Messages SNS Topic: where the clients’ messages will be published; external applications’ backend processors can subscribe to this topic, and SNS will broadcast published messages to the subscribers as they arrive.
# components/integration/messages.tf
resource "aws_sns_topic" "messages" {
  name = "${local.prefix}-messages"
}
  • Messages SQS Queue: where the clients’ messages will be sent; external applications’ backend processors can be configured with the queue as an event source, long-poll messages and process them in batches.
# components/integration/messages.tf
# Receives users messages from "send" route
resource "aws_sqs_queue" "messages" {
  name                        = "${local.prefix}-messages"
  fifo_queue                  = false
  content_based_deduplication = false

  delay_seconds              = 0      # No delay
  max_message_size           = 262144 # 256 KiB
  receive_wait_time_seconds  = 20     # Long Polling not Short Polling
  message_retention_seconds  = 345600 # 4 Days
  visibility_timeout_seconds = 120*6  # 12 minutes

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.messages_dlq.arn
    maxReceiveCount     = 5 # Move failed messages to DLQ after 5 failures
  })

  tags = local.common_tags
}

resource "aws_sqs_queue" "messages_dlq" {
  name = "${local.prefix}-messages-dlq"
}

Input

  • Notifications SNS Topic: where external applications’ backend processors publish notification requests. The SUMU Pusher, which we will build later in this article, will subscribe to this topic and fulfill notification requests by posting them to target users’ connections.
# components/integration/notifications.tf
resource "aws_sns_topic" "notifications" {
  name = "${local.prefix}-notifications"
}
  • Notifications SQS Queue: where external applications can send notification requests for batch processing by the SUMU Pusher. This levels the load off the Lambda service, letting the SUMU Pusher process notification requests in batches without hitting the Lambda Concurrency Limit and without paying the IO wait time cost, as the requests are processed concurrently.
# components/integration/notifications.tf
# Receives backend notifications requests
resource "aws_sqs_queue" "notifications" {
  name                        = "${local.prefix}-notifications"
  fifo_queue                  = false
  content_based_deduplication = false

  delay_seconds              = 0      # No delay
  max_message_size           = 262144 # 256 KiB
  receive_wait_time_seconds  = 20     # Long Polling not Short Polling
  message_retention_seconds  = 345600 # 4 Days
  visibility_timeout_seconds = 120*6  # 12 minutes, 6 times the lambda timeout (AWS Recommendation)

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.notifications_dlq.arn
    maxReceiveCount     = 5 # Move failed messages to DLQ after 5 failures
  })

  tags = local.common_tags
}

resource "aws_sqs_queue" "notifications_dlq" {
  name = "${local.prefix}-notifications-dlq"
}

Load leveling

Do you remember the Load Leveling pattern from part 1? We are following the same pattern here by creating an SQS queue, allowing external applications to poll from the queue only as much as they can process, without crashing low-resource backends.

Also, this allows the SUMU Notifications Pusher Lambda to process notifications from the queue in batches instead of one by one. Eventually, we avoid the Lambda Concurrency Limit because there are fewer invocations.

We have created two SQS queues and a dead letter queue for each of them, where failed client messages and failed backend messages will be placed for further investigation; the redrive policy moves failed messages to the dead letter queues after 5 failures.

We have set delay_seconds to 0 so notifications are not delayed, and receive_wait_time_seconds to 20 seconds to use long polling instead of short polling, because it is both fast and cost-effective. The visibility_timeout_seconds is set to 6 times the timeout configured for the Pusher Lambda function, as per the AWS recommendation.
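As a quick sanity check of that multiple (the 120-second figure is the Pusher Lambda timeout assumed by this stack’s queue comments):

```python
lambda_timeout = 120                     # Pusher Lambda timeout, in seconds
visibility_timeout = 6 * lambda_timeout  # AWS recommends 6x the function timeout
print(visibility_timeout)                # 720 seconds, i.e. the 12 minutes configured above
```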

AWS: In almost all cases, Amazon SQS long polling is preferable to short polling. Long-polling requests let your queue consumers receive messages as soon as they arrive in your queue while reducing the number of empty ReceiveMessageResponse instances returned.

Users Connections Store

Source: Users Connection Store

Store

AWS API Gateway does not provide built-in functionality to store and manage users’ connections, so we have to build a websocket connections store and tracking mechanism.

The first thing we need is a connections store, and for that we will leverage DynamoDB because of its:

  • Performance and scalability - Amazon DynamoDB provides high throughput at very low latency and gives us the ability to auto-scale by tracking how close our usage is to the upper bounds.

  • Events Stream - DynamoDB Streams allows us to capture item-level changes and receive the data as it appeared before and after each change.

  • Time To Live - allows us to set timestamps for deleting expired items from our tables as soon as the timestamp expires.

  • Cost Effective - the one-year free tier allows more than 40 million database operations/month, and pricing is based on throughput (reads/writes per second) rather than storage.

# components/database/connections.tf
resource "aws_dynamodb_table" "connections" {
  name = "SumuConnections"

  ################
  # Billing
  ################
  billing_mode   = "PAY_PER_REQUEST"
  read_capacity  = 0
  write_capacity = 0

  ##################
  # Index/Sort Keys
  ##################
  hash_key  = "user_id"
  range_key = "connection_id"

  ##################
  # Attributes
  ##################
  # User (Partition Key)
  attribute {
    name = "user_id"
    type = "S"
  }

  # WS Connection ID (Sort Key)
  attribute {
    name = "connection_id"
    type = "S"
  }

  ttl {
    enabled        = true
    attribute_name = "delete_at"
  }

  stream_enabled   = true
  stream_view_type = "KEYS_ONLY"

  tags = {
    table_name = "SumuConnections"
  }
}

The configuration for the connections table is as follows:

Billing mode: pay per request for a Serverless Mode.

Partition Key: this stores the user id, which is the user’s JWT sub attribute, so the Messages Pusher can retrieve all connections of a specific user.

Sort Key: this holds the user’s websocket connection_id generated after successful authorization and connection upgrade. Every user can have multiple connections. We will not create an inverted-schema Global Secondary Index for this range key, because we will never need to get the user of a specific connection; we will only need to list the connections of a specific user.

TTL: the Time To Live for each connection. We generate it after the connection upgrade and set it to a timestamp 2 hours in the future, after which DynamoDB will consider the connection stale and automatically delete it. It’s +2h because the API Gateway Websocket connection maximum duration is 2 hours; if a connection remains in the table for more than 2 hours, that is an indicator of a stale connection that should be pruned.

Stream: we’ve enabled DynamoDB Streams to fan out DELETE and INSERT events; this will let the presence watchdog that we will build later track users’ online and offline events.
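To make the single access pattern concrete, here is a sketch of the query parameters the pusher would pass to DynamoDB (the helper function is ours, not part of SUMU):

```python
def user_connections_query(user_id: str, table_name: str = "SumuConnections") -> dict:
    """Build kwargs for dynamodb.query(): list one user's connection ids."""
    return {
        "TableName": table_name,
        "KeyConditionExpression": "user_id = :uid",            # partition key only
        "ExpressionAttributeValues": {":uid": {"S": user_id}},
        "ProjectionExpression": "connection_id",               # we never need the inverse lookup
    }

# usage: boto3.client("dynamodb").query(**user_connections_query(user_id))
```

Because every lookup is by partition key alone, no Global Secondary Index is needed.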

Websocket JWT Request Authorizer

Source: AWS API Gateway JWT Authorizer

Fingerprint

Clients will connect to the WebSocket API by initiating a WebSocket Upgrade Request. If the request succeeds, the $connect route is executed while the connection is being established.

Because the WebSocket connection is stateful, we can configure authorization on the $connect route only. AuthN/AuthZ is performed only at connection time; if the $connect request fails due to an AuthN/AuthZ failure, the connection will not be made.

AWS API Gateway supports multiple mechanisms for authentication and authorization: AWS IAM Roles/Policies, IAM Tags and custom Lambda Authorizers.

In order for our stack to support most third-party applications, we will go with a Lambda JWT Authorizer, because JWT is widely used and considered a standard for web applications. Below is the handler for authorization requests:

import os
import time

import requests
from cachecontrol import CacheControl
from cachecontrol.caches import FileCache
from jose import jwt, jws, jwk, JWTError
from jose.utils import base64url_decode

FIREBASE_JWK_URI = "https://www.googleapis.com/service_accounts/v1/metadata/x509/[email protected]"


def search_for_key(token, keys, construct=True):
    # get the kid from the headers prior to verification
    headers = jwt.get_unverified_headers(token)
    kid = headers["kid"]
    # search for the kid in the downloaded public keys
    key = list(filter(lambda k: k["kid"] == kid, keys))
    if not key:
        raise JWTError("Public key not found in jwks.json")
    else:
        key = key[0]
    if construct:
        return jwk.construct(key)
    else:
        return key


def get_public_key(token):
    """
    Because Google's public keys are only changed infrequently (on the order of once per day),
    we can take advantage of caching to reduce latency and the potential for network errors.
    """
    jwks_uri = os.environ["JWT_ISSUER_JWKS_URI"]
    sess = CacheControl(requests.Session(), cache=FileCache("/tmp/jwks-cache"))
    request = sess.get(jwks_uri)
    ks = request.json()
    keys = []
    #
    if jwks_uri == FIREBASE_JWK_URI:
        for k, v in ks.items():
            keys.append({
                "alg": "RS256",
                "kid": k,
                "pem": v
            })
        return search_for_key(token, keys, construct=False)
    else:
        keys = ks["keys"]
        return search_for_key(token, keys, construct=True)


def valid_signature(token, key):
    if isinstance(key, dict):
        # verify the signature, exception should be thrown if verification failed
        jws.verify(token, key["pem"], [key["alg"]], verify=True)
    else:
        # get the last two sections of the token,
        # message and signature (encoded in base64)
        message, encoded_signature = str(token).rsplit('.', 1)
        # decode the signature
        decoded_signature = base64url_decode(encoded_signature.encode("utf-8"))
        # verify the signature
        if not key.verify(message.encode("utf8"), decoded_signature):
            raise JWTError("Signature verification failed")
    return True


def decode(token, verify_expiration=True, authorized_audiences=None):
    # since we passed the verification, we can now safely
    # use the unverified claims
    claims = jwt.get_unverified_claims(token)
    # additionally we can verify the token expiration
    if verify_expiration:
        if time.time() > claims["exp"]:
            raise JWTError("Token is expired")
    # and the Audience
    if authorized_audiences:
        # OID TOKEN (aud), OAUTH ACCESS TOKEN (client_id)
        aud = claims.get("aud", claims.get("client_id"))
        if not aud:
            raise JWTError("Token does not have aud nor client_id attribute")
        if aud not in authorized_audiences:
            raise JWTError("Token was not issued for this audience")
    # now we can use the claims
    return claims


def verify(token):
    key = get_public_key(token)
    if valid_signature(token, key):
        # an empty/missing env var must not produce [''] as an audiences list
        authorized_audiences = [
            a for a in os.environ.get("JWT_AUTHORIZED_AUDIENCES", "").split(",") if a
        ]
        return decode(
            token,
            verify_expiration=os.environ.get("JWT_VERIFY_EXPIRATION", "true") == "true",
            authorized_audiences=authorized_audiences or None
        )


def generate_policy(principal, effect, reason=None):
    auth_response = {
        "principalId": principal,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "execute-api:Invoke",
                    "Effect": effect,
                    "Resource": os.environ.get("AUTHORIZED_APIS", "*").split(",")
                }
            ]
        }
    }
    if reason:
        auth_response.update({
            'context': {
                'error': reason
            }
        })
    return auth_response


def check_auth(token):
    if not token:
        return generate_policy("rogue", "Deny", reason="Missing Access Token")
    try:
        claims = verify(token)
        if claims:
            return generate_policy(claims["sub"], "Allow")
    except Exception as e:
        return generate_policy("rogue", "Deny", reason=str(e))


def handle(event, context):
    headers = event.get("headers") or {}
    query = event.get("queryStringParameters") or {}
    token = headers.get("Authorization", query.get("authorization"))
    return check_auth(token)

The authorizer will be able to:

  • Integrate with any JWT token provider.
  • Verify user’s JWT token integrity using provider’s public keys.
  • Verify token expiration.
  • Check that the token is issued for an audience in the allowed audiences list.
  • Bind the JWT’s sub user attribute to the API Gateway authorization policy.
  • Return a deny or allow authorization policy to API Gateway.

Deploy the Authorizer

To deploy the Authorizer as a Lambda Function, we will use the Terraform modules from Part 3:

  • Create the Lambda Function
# modules/serverless/components.tf
module "authorizer" {
  source      = "git::https://github.com/obytes/terraform-aws-codeless-lambda.git//modules/lambda"
  prefix      = "${local.prefix}-authorizer"
  common_tags = local.common_tags

  handler = "app.main.handle"
  envs = {
    AUTHORIZED_APIS          = join(",", module.gateway.authorized_apis)
    JWT_ISSUER_JWKS_URI      = var.issuer_jwks_uri
    JWT_AUTHORIZED_AUDIENCES = join(",", var.authorized_audiences)
    JWT_VERIFY_EXPIRATION    = var.verify_token_expiration
  }
}
  • Deploy the Lambda Function
# modules/serverless/ci.tf
module "authorizer_ci" {
  source      = "git::https://github.com/obytes/terraform-aws-lambda-ci.git//modules/ci"
  prefix      = "${local.prefix}-authorizer-ci"
  common_tags = var.common_tags

  # Lambda
  lambda                   = module.authorizer.lambda
  app_src_path             = "sources"
  packages_descriptor_path = "sources/requirements/lambda.txt"

  # Github
  s3_artifacts      = var.s3_artifacts
  github            = var.github
  pre_release       = var.pre_release
  github_repository = var.github_repositories.authorizer

  # Notifications
  ci_notifications_slack_channels = var.ci_notifications_slack_channels
}

Websocket AWS API Gateway

Source: AWS API Gateway Websocket API

Gateway

After preparing the integration, connections store and request authorizer components, we will now set up our API Gateway and integrate it with them. But first we have to create an IAM Role for API Gateway to assume, and attach an IAM Policy to it so it can integrate with those components:

# components/gateway/iam.tf
# Integration role
resource "aws_iam_role" "_" {
  name               = var.prefix
  assume_role_policy = data.aws_iam_policy_document.assume_policy.json
  path               = "/"
}
data "aws_iam_policy_document" "assume_policy" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["apigateway.amazonaws.com"]
    }
  }
}
data "aws_iam_policy_document" "policy" {
  statement {
    effect = "Allow"
    actions = [
      "lambda:InvokeFunction",
    ]
    resources = [
      var.request_authorizer.alias_arn
    ]
  }
  statement {
    effect = "Allow"
    actions = [
      "dynamodb:PutItem",
      "dynamodb:DeleteItem",
    ]

    resources = [
      var.connections_table.arn,
    ]
  }
  statement {
    effect = "Allow"
    actions = [
      "sns:Publish",
    ]

    resources = [
      var.messages_topic.arn,
    ]
  }
  statement {
    effect = "Allow"
    actions = [
      "sqs:SendMessage",
    ]

    resources = [
      var.messages_queue.arn,
    ]
  }
}
resource "aws_iam_policy" "_" {
  name   = local.prefix
  policy = data.aws_iam_policy_document.policy.json
  path   = "/"
}
resource "aws_iam_role_policy_attachment" "_" {
  policy_arn = aws_iam_policy._.arn
  role       = aws_iam_role._.name
}

We’re in a good place regarding permissions. Let’s create an API Gateway V2 Websocket API resource, where we specify the selection expressions for the route and API keys:

# components/gateway/api.tf
resource "aws_apigatewayv2_api" "_" {
  name          = "${local.prefix}-ws-api"
  description   = "Sumu Websockets API"
  protocol_type = "WEBSOCKET"

  route_selection_expression   = "$request.body.action"
  api_key_selection_expression = "$request.header.x-api-key"

  tags = local.common_tags
}

Next, we integrate the API Gateway Websocket API with the Lambda JWT Authorizer we created earlier. We set the authorizer type to REQUEST because we are going to use a request authorizer.

There is no method in the JavaScript WebSocket API for specifying additional headers for the client/browser to send. For that reason, we instruct API Gateway to read the authorization token from the query string: route.request.querystring.authorization.

# components/gateway/authorizers.tf
resource "aws_apigatewayv2_authorizer" "request" {
  name            = "${var.prefix}-request-authz"
  api_id          = aws_apigatewayv2_api._.id
  authorizer_type = "REQUEST"
  identity_sources = [
    "route.request.querystring.authorization",
  ]
  authorizer_uri             = var.request_authorizer.invoke_arn
  authorizer_credentials_arn = aws_iam_role._.arn
}

After integrating the Request Authorizer with API Gateway, and in addition to the IAM Role, we need an extra Lambda Permission to allow API Gateway to invoke the Authorizer Lambda Function:

# components/gateway/permission.tf
resource "aws_lambda_permission" "allow_apigw" {
  statement_id  = local.prefix
  action        = "lambda:InvokeFunction"
  function_name = var.request_authorizer.name
  qualifier     = var.request_authorizer.alias
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_apigatewayv2_api._.execution_arn}/${aws_apigatewayv2_stage._.name}/*"
}

Connection route

Setting up an integration for $connect is optional because API Gateway provides a default $connect route. However, in our case we want custom $connect and $disconnect routes for several reasons:

  • We want to be notified when clients connect and disconnect.
  • We want to throttle connections and control who connects.
  • We want our backend to publish users presence messages (online, offline) back to clients using Fanout SNS Topics.
  • We want to store each connection ID and other information into a database (Amazon DynamoDB).

Unlike other approaches that involve Lambda Functions to manage users’ connections, we will follow a faster, more cost-effective approach: instead of going through a proxy Lambda Function to persist the connection, we will call the DynamoDB PutItem action directly from API Gateway.

First, we need a request template that acts as the body of the DynamoDB PutItem action. We map the principalId from the authorizer context to the table’s Hash Key user_id, and the generated connectionId to the table’s Range Key connection_id.

# components/gateway/connect/ddb_put.json
{
  "TableName": "SumuConnections",
  "Item": {
    "user_id": {
      "S": "$context.authorizer.principalId"
    },
    "connection_id": {
      "S": "$context.connectionId"
    },
    #set($delete_connection_at = ($context.requestTimeEpoch / 1000) + 7200)
    "delete_at": {
      "N": "$delete_connection_at"
    }
  }
}

As per the AWS docs, the maximum connection duration for WebSocket APIs is 2 hours. To make sure stale connections are deleted from DynamoDB, we set the connection TTL attribute delete_at to now + 2 hours; note that DynamoDB TTL requires a Number attribute holding an epoch timestamp in seconds.
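Since $context.requestTimeEpoch is expressed in milliseconds, the template divides it by 1000 before adding the two hours. The same computation in Python, as a sanity check:

```python
def connection_ttl(request_time_epoch_ms: int, max_duration_s: int = 7200) -> int:
    # requestTimeEpoch is in milliseconds; DynamoDB TTL expects
    # epoch seconds, hence the division by 1000.
    return request_time_epoch_ms // 1000 + max_duration_s

# A connection opened at epoch 1637000000000 ms expires 2 hours later.
assert connection_ttl(1_637_000_000_000) == 1_637_007_200
```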


Next, we configure the integration with DynamoDB using the integration resource, providing:

  • The integration URI that API Gateway will call, including the action type PutItem.

  • The credentials ARN: the ARN of the IAM role we created earlier, which grants the dynamodb:PutItem permission.

  • The request template: the request body template we created earlier.

# components/gateway/connect/integration.tf
resource "aws_apigatewayv2_integration" "ack_presence" {
  api_id      = var.api_id
  description = "Acknowledge user presence"

  passthrough_behavior   = "WHEN_NO_MATCH"
  payload_format_version = "1.0"

  # Upstream
  integration_type     = "AWS"
  integration_uri      = "arn:aws:apigateway:${data.aws_region.current.name}:dynamodb:action/PutItem"
  connection_type      = "INTERNET"
  credentials_arn      = var.credentials_arn
  integration_method   = "POST"
  timeout_milliseconds = 29000

  request_templates = {
    "application/json" = file("${path.module}/ddb_put.json")
  }

  lifecycle {
    ignore_changes = [
      passthrough_behavior
    ]
  }
}

After setting up the integration, we add a new route that receives connection requests and authorizes users with our JWT Request Authorizer before establishing the connection:

# components/gateway/connect/route.tf
resource "aws_apigatewayv2_route" "connect" {
  api_id = var.api_id

  # UPSTREAM
  target         = "integrations/${aws_apigatewayv2_integration.ack_presence.id}"
  route_key      = "$connect"
  operation_name = "Acknowledge user presence"

  # AUTHORIZATION
  authorizer_id      = var.request_authorizer_id
  authorization_type = "CUSTOM"
  api_key_required   = false

  route_response_selection_expression = "$default"
}

Lastly, we add an integration response and a route response to return to the client:

# components/gateway/connect/response.tf
resource "aws_apigatewayv2_integration_response" "hello" {
  api_id                   = var.api_id
  integration_id           = aws_apigatewayv2_integration.ack_presence.id
  integration_response_key = "/200/"
}

resource "aws_apigatewayv2_route_response" "hello" {
  api_id             = var.api_id
  route_id           = aws_apigatewayv2_route.connect.id
  route_response_key = "$default"
}

Disconnection route

The $disconnect route is executed after the connection is closed. The connection can be closed by the client, or by the server after:

  • Idle connection timeout: 10 minutes, if the client did not send keepalive (ping/pong) requests.
  • Maximum connection duration: even if the client keeps sending keepalive requests, API Gateway will disconnect it after 2 hours.

Since the connection is already closed when the route is executed, $disconnect is a best-effort event: API Gateway will try its best to deliver it to our integration, but it cannot guarantee delivery. This is why we use the DynamoDB TTL attribute to delete stale connections automatically after 2 hours.


As with the connection route, we need a request template; this time it is used to delete the closed connection.

# components/gateway/disconnect/ddb_delete.json
{
  "TableName": "SumuConnections",
  "Key": {
    "user_id": {
      "S": "$context.authorizer.principalId"
    },
    "connection_id": {
      "S": "$context.connectionId"
    }
  }
}

The integration request is the same as the connection one; the only changes are the DynamoDB action, which becomes DeleteItem, and the reference to the new deletion request template:

# components/gateway/disconnect/integration.tf
resource "aws_apigatewayv2_integration" "ack_absence" {
  api_id      = var.api_id
  description = "Acknowledge user absence"

  passthrough_behavior   = "WHEN_NO_MATCH"
  payload_format_version = "1.0"

  # Upstream
  integration_type     = "AWS"
  integration_uri      = "arn:aws:apigateway:${data.aws_region.current.name}:dynamodb:action/DeleteItem"
  connection_type      = "INTERNET"
  credentials_arn      = var.credentials_arn
  integration_method   = "POST"
  timeout_milliseconds = 29000

  request_templates = {
    "application/json" = file("${path.module}/ddb_delete.json")
  }

  lifecycle {
    ignore_changes = [
      passthrough_behavior
    ]
  }
}

Whether the connection is terminated gracefully or not, API Gateway triggers the disconnection route, so let’s create it. This time we don’t need the authorizer because, as mentioned earlier, it only applies to the $connect route:

# components/gateway/disconnect/route.tf
resource "aws_apigatewayv2_route" "disconnect" {
  api_id = var.api_id

  # UPSTREAM
  target         = "integrations/${aws_apigatewayv2_integration.ack_absence.id}"
  route_key      = "$disconnect"
  operation_name = "Acknowledge user absence"

  # AUTHORIZATION
  authorization_type = "NONE"
  api_key_required   = false

  route_response_selection_expression = "$default"
}

For graceful connection termination, we need the disconnect route to return responses to clients:

# components/gateway/disconnect/response.tf
resource "aws_apigatewayv2_integration_response" "bye" {
  api_id                   = var.api_id
  integration_id           = aws_apigatewayv2_integration.ack_absence.id
  integration_response_key = "/200/"
}

resource "aws_apigatewayv2_route_response" "bye" {
  api_id             = var.api_id
  route_id           = aws_apigatewayv2_route.disconnect.id
  route_response_key = "$default"
}

Keep Alive route

To ensure clients’ connections are not considered idle by API Gateway after the 10-minute timeout, we implement a ping/pong mechanism that serves both as a keepalive and as a means to verify that the remote client is still responsive. Clients may send a ping request periodically after the connection is established and before it is closed.

To start, we need two templates: a request template for the MOCK integration that receives the ping request, and a response template for the actual pong sent back to the client:

# components/gateway/keepalive/ping.json
{
  "statusCode" : 200
}

# components/gateway/keepalive/pong.json
{
  "type": "pong",
  "statusCode" : 200,
  "connectionId" : "$context.connectionId"
}

We need an integration to a MOCK service that receives the ping request from the ping route and returns an HTTP 200 status code:

# components/gateway/keepalive/integration.tf
resource "aws_apigatewayv2_integration" "ping" {
  api_id      = var.api_id
  description = "Receive ping frame from client"

  # Upstream
  integration_type = "MOCK"

  template_selection_expression = "200"
  request_templates = {
    "200" = file("${path.module}/ping.json")
  }

  lifecycle {
    ignore_changes = [
      passthrough_behavior
    ]
  }
}

We also need a route to receive the ping request from the client and forward it to the MOCK integration:

# components/gateway/keepalive/route.tf
resource "aws_apigatewayv2_route" "ping" {
  api_id = var.api_id

  # UPSTREAM
  target         = "integrations/${aws_apigatewayv2_integration.ping.id}"
  route_key      = "ping"
  operation_name = "Ping websocket server"

  # AUTHORIZATION
  authorization_type = "NONE"
  api_key_required   = false

  route_response_selection_expression = "$default"
}

Upon receipt of a ping frame, API Gateway must reply with a pong, unless it has already received a disconnect request, and it should do so as soon as is practical:

# components/gateway/keepalive/response.tf
resource "aws_apigatewayv2_integration_response" "ping" {
  api_id                   = var.api_id
  integration_id           = aws_apigatewayv2_integration.ping.id
  integration_response_key = "/200/" # must be /XXX/ or $default

  template_selection_expression = "200"
  response_templates = {
    "200" = file("${path.module}/pong.json")
  }
}

resource "aws_apigatewayv2_route_response" "ping" {
  api_id             = var.api_id
  route_id           = aws_apigatewayv2_route.ping.id
  route_response_key = "$default" # must be $default
}

Thanks to the ping route we’ve created, clients can simply send this websocket message periodically to keep their connection active:

{"action": "ping"}

JSON messages sent to API Gateway websocket routes should contain an action that matches the target route; otherwise they are routed to the $default route.
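API Gateway’s behavior here can be modeled as a tiny dispatcher: evaluate $request.body.action against the known route keys and fall back to $default. This is only a sketch of the selection logic, not the gateway’s actual code:

```python
import json

ROUTES = {"ping", "publish", "send"}  # route keys defined in this stack

def select_route(raw_message: str) -> str:
    # Mimics the route selection expression "$request.body.action".
    action = json.loads(raw_message).get("action")
    return action if action in ROUTES else "$default"

assert select_route('{"action": "ping"}') == "ping"
assert select_route('{"action": "unknown"}') == "$default"
```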

Publish route

We have dealt with the connection lifecycle routes; now we need the actual messaging routes that receive users’ messages and publish them to backend applications. Before starting, in an event-driven architecture it’s crucial to agree on a format that messages should respect:

{
  "action": "publish",
  "message": {
    "type": "type of the message (eg: call_op, send_message ...)",
    "message": {
      "the actual": "message"
    }
  }
}
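Backend consumers can enforce this envelope before processing a message. A minimal validator sketch, with the field names taken from the format above:

```python
import json

def validate_envelope(raw: str) -> dict:
    # Reject anything that does not follow the agreed message format.
    payload = json.loads(raw)
    if payload.get("action") not in ("publish", "send"):
        raise ValueError("unsupported action")
    message = payload.get("message")
    if not isinstance(message, dict) or "type" not in message or "message" not in message:
        raise ValueError("malformed message envelope")
    return message

ok = validate_envelope('{"action": "publish", "message": {"type": "send_message", "message": {"text": "hi"}}}')
assert ok["type"] == "send_message"
```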

To publish a message, the action should always be set to publish, because that is how we will name our route key. But first, let’s create the integration, and this time we will do things differently: we will publish every received message to an SNS topic, so external applications interested in the message can subscribe to it:

# components/gateway/publish/integration.tf
resource "aws_apigatewayv2_integration" "publish" {
  api_id      = var.api_id
  description = "Publish websocket message through SNS"

  passthrough_behavior   = "WHEN_NO_MATCH"
  payload_format_version = "1.0"

  # Upstream
  integration_type     = "AWS"
  integration_uri      = "arn:aws:apigateway:${data.aws_region.current.name}:sns:action/Publish"
  connection_type      = "INTERNET"
  credentials_arn      = var.credentials_arn
  integration_method   = "POST"
  timeout_milliseconds = 5000

  request_parameters = {
    "integration.request.querystring.TopicArn" = "'${var.messages_topic_arn}'"
    "integration.request.querystring.Message"  = "route.request.body.message"
    # Sender ID Attribute
    "integration.request.querystring.MessageAttributes.entry.1.Name"              = "'user_id'",
    "integration.request.querystring.MessageAttributes.entry.1.Value.DataType"    = "'String'",
    "integration.request.querystring.MessageAttributes.entry.1.Value.StringValue" = "context.authorizer.principalId",
    # Message Timestamp Attribute
    "integration.request.querystring.MessageAttributes.entry.2.Name"              = "'timestamp'",
    "integration.request.querystring.MessageAttributes.entry.2.Value.DataType"    = "'Number'",
    "integration.request.querystring.MessageAttributes.entry.2.Value.StringValue" = "context.requestTimeEpoch",
    # Message Source Attribute
    "integration.request.querystring.MessageAttributes.entry.3.Name"              = "'source'",
    "integration.request.querystring.MessageAttributes.entry.3.Value.DataType"    = "'String'",
    "integration.request.querystring.MessageAttributes.entry.3.Value.StringValue" = "'apigw.route.publish'",
  }

  lifecycle {
    ignore_changes = [
      passthrough_behavior
    ]
  }
}

As with DynamoDB, we set the integration URI to the SNS Publish action and provided the integration with the credentials ARN of the IAM role allowed to call that action.

This time we didn’t provide a request template, because SNS does not accept the message in the request body and requires API Gateway to send it in the request query string. For that, we used the request_parameters attribute to map API Gateway request attributes to the SNS query string.

For every websocket message, the user_id attribute is added to the message attributes, so external applications can identify the message originator in a safe and secure way. We also map the message timestamp at API Gateway, because backend applications should not trust timestamps provided by clients, only those generated by API Gateway.
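On the consuming side, backend applications should therefore read the sender and timestamp from the gateway-injected message attributes, never from the message body. A sketch, using a hypothetical SNS notification payload whose attribute names match the mapping above:

```python
import json

# Hypothetical notification as delivered to a subscriber; the attribute
# names follow the request_parameters mapping (user_id, timestamp, source).
notification = {
    "Message": json.dumps({"type": "send_message", "message": {"text": "hi"}}),
    "MessageAttributes": {
        "user_id": {"Type": "String", "Value": "user-42"},
        "timestamp": {"Type": "Number", "Value": "1637000000"},
        "source": {"Type": "String", "Value": "apigw.route.publish"},
    },
}

def trusted_sender(notification: dict) -> str:
    # This attribute is set by API Gateway from the authorizer context,
    # so it cannot be spoofed by a client inside the message body.
    return notification["MessageAttributes"]["user_id"]["Value"]

assert trusted_sender(notification) == "user-42"
```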

After setting up the integration, we need a route to receive messages from users and forward them to the SNS integration:

# components/gateway/publish/route.tf
resource "aws_apigatewayv2_route" "publish" {
  api_id = var.api_id

  # UPSTREAM
  target         = "integrations/${aws_apigatewayv2_integration.publish.id}"
  route_key      = "publish"
  operation_name = "Publish websocket message through SNS"

  # AUTHORIZATION
  authorization_type = "NONE"
  api_key_required   = false

  route_response_selection_expression = "$default"
}

Finally, we will create the integration response and the route response:

# components/gateway/publish/response.tf
resource "aws_apigatewayv2_integration_response" "publish" {
  api_id                   = var.api_id
  integration_id           = aws_apigatewayv2_integration.publish.id
  integration_response_key = "/200/"
}

resource "aws_apigatewayv2_route_response" "publish" {
  api_id             = var.api_id
  route_id           = aws_apigatewayv2_route.publish.id
  route_response_key = "$default"
}

Send route

As we agreed earlier, SUMU should also support sending clients’ messages to an SQS queue for batch processing; for that, we need to integrate the API Gateway Websocket API with SQS.

We follow the same steps as for the SNS integration, but this time the SQS query parameters have different names, and the action is SendMessage instead of Publish.

# components/gateway/send/integration.tf
resource "aws_apigatewayv2_integration" "send" {
  api_id      = var.api_id
  description = "Send websocket message through SQS"

  passthrough_behavior   = "WHEN_NO_MATCH"
  payload_format_version = "1.0"

  # Upstream
  integration_type     = "AWS"
  integration_uri      = "arn:aws:apigateway:${data.aws_region.current.name}:sqs:action/SendMessage"
  connection_type      = "INTERNET"
  credentials_arn      = var.credentials_arn
  integration_method   = "POST"
  timeout_milliseconds = 5000

  request_parameters = {
    "integration.request.querystring.QueueUrl"    = "'${var.messages_queue_url}'"
    "integration.request.querystring.MessageBody" = "route.request.body.message"
    # Sender ID Attribute
    "integration.request.querystring.MessageAttributes.1.Name"              = "'user_id'"
    "integration.request.querystring.MessageAttributes.1.Value.DataType"    = "'String'"
    "integration.request.querystring.MessageAttributes.1.Value.StringValue" = "context.authorizer.principalId"
    # Message Timestamp Attribute
    "integration.request.querystring.MessageAttributes.2.Name"              = "'timestamp'"
    "integration.request.querystring.MessageAttributes.2.Value.DataType"    = "'Number'"
    "integration.request.querystring.MessageAttributes.2.Value.StringValue" = "context.requestTimeEpoch"
    # Message Source Attribute
    "integration.request.querystring.MessageAttributes.3.Name"              = "'source'",
    "integration.request.querystring.MessageAttributes.3.Value.DataType"    = "'String'",
    "integration.request.querystring.MessageAttributes.3.Value.StringValue" = "'apigw.route.send'",
  }

  lifecycle {
    ignore_changes = [
      passthrough_behavior
    ]
  }
}

Same as the publish messaging route, we need a send route:

# components/gateway/send/route.tf
resource "aws_apigatewayv2_route" "send" {
  api_id = var.api_id

  # UPSTREAM
  target         = "integrations/${aws_apigatewayv2_integration.send.id}"
  route_key      = "send"
  operation_name = "Send websocket message through SQS"

  # AUTHORIZATION
  authorization_type = "NONE"
  api_key_required   = false

  route_response_selection_expression = "$default"
}

And integration/route responses:

resource "aws_apigatewayv2_integration_response" "send" {
  api_id                   = var.api_id
  integration_id           = aws_apigatewayv2_integration.send.id
  integration_response_key = "/200/"
}

resource "aws_apigatewayv2_route_response" "send" {
  api_id             = var.api_id
  route_id           = aws_apigatewayv2_route.send.id
  route_response_key = "$default"
}

Deploying the Gateway

After preparing all routes, the only remaining step is to deploy our API.

# components/gateway/stages.tf
resource "aws_apigatewayv2_stage" "_" {
  name        = var.stage_name
  api_id      = aws_apigatewayv2_api._.id
  description = "Default Stage"
  auto_deploy = true

  access_log_settings {
    format          = jsonencode(local.access_logs_format)
    destination_arn = aws_cloudwatch_log_group.access.arn
  }

  default_route_settings {
    logging_level          = null
    throttling_burst_limit = 5000
    throttling_rate_limit  = 10000
  }

  lifecycle {
    ignore_changes = [
      deployment_id,
    ]
  }
}

The stage is set to auto-deploy, so we don’t have to create a new deployment manually on each change.

Presence Watchdog

Source: Presence Watchdog


Many applications (e.g. chat applications, games…) require user presence tracking: they want to be notified whenever a user goes offline and whenever they come back online.

These presence events can then be dispatched by backend applications to all other users or to a subset of users (e.g. friends, groups). To provide this capability we need:

  • Connections/Disconnections stream: a stream of INSERT and DELETE events from the DynamoDB connections table.
  • Presence watchdog: a Lambda Function that listens to the stream and decides whether the user is offline or still online.
  • Presence events source: where the watchdog publishes/sends presence events. We will reuse the messages topic/queue created earlier. External applications can choose the presence events’ source to be a topic or a queue, then subscribe to the messages topic or poll the queue, and broadcast the events to other parties.
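For example, an external application subscribed to the topic could decode presence events like this (a sketch following the event format produced by the watchdog below):

```python
import json

def parse_presence_event(raw: str):
    # Returns (user_id, is_online) for presence events, None otherwise.
    event = json.loads(raw)
    if event.get("type") != "presence":
        return None
    body = event["message"]
    return body["user_id"], body["status"] == "ONLINE"

assert parse_presence_event(
    '{"type": "presence", "message": {"user_id": "u1", "status": "OFFLINE", "timestamp": 0}}'
) == ("u1", False)
```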

We already have the DynamoDB stream enabled and the SNS topic in place, so let’s create the presence watchdog Lambda Function:

# components/presence/sources/main.py
from __future__ import print_function

import json
import logging
import os
import time

import boto3
from boto3.dynamodb.conditions import Key

logger = logging.getLogger()
logger.setLevel(logging.WARNING)
PRESENCE_SOURCE = os.environ.get("PRESENCE_SOURCE", "queue")


def user_still_online(user_id: str):
    """
    Check if the user still has active connections on any device
    :param user_id: user principal id
    :return: number of active connections (0, falsy, means offline)
    """
    resource = boto3.resource("dynamodb")
    connections_table = resource.Table(os.environ["CONNECTIONS_TABLE"])
    active_connections = connections_table.query(
        KeyConditionExpression=Key("user_id").eq(user_id)
    ).get("Items", [])
    return len(active_connections)


def publish_user_presence(user_id: str, present: bool = True, event_time: float = 0):
    """
    Notify online/offline events
    :param user_id: user principal id
    :param present: True if online, False if offline
    :param event_time: useful for precedence checks if the user
    connects/disconnects rapidly and events arrive unordered
    """
    event = json.dumps({
        "type": "presence",
        "message": {
            "user_id": user_id,
            "status": "ONLINE" if present else "OFFLINE",
            "timestamp": event_time
        }
    })
    attributes = dict(
        source={"DataType": "String", "StringValue": "lambda.presence.watchdog", },
        user_id={"DataType": "String", "StringValue": user_id, },
        timestamp={"DataType": "Number", "StringValue": f"{int(time.time())}", },
    )
    if PRESENCE_SOURCE == "topic":
        boto3.client("sns").publish(
            TargetArn=os.environ.get("MESSAGES_TOPIC_ARN"),
            Message=event,
            MessageAttributes=attributes
        )
    elif PRESENCE_SOURCE == "queue":
        boto3.client("sqs").send_message(
            QueueUrl=os.environ.get("MESSAGES_QUEUE_URL"),
            MessageBody=event,
            MessageAttributes=attributes
        )
    else:
        print("Subscribe to presence directly from the DynamoDB stream")


def handler(event, context):
    print(event)
    try:
        for record in event["Records"]:
            event_time = record["dynamodb"]["ApproximateCreationDateTime"]
            user_id = record["dynamodb"]["Keys"]["user_id"]["S"]
            if record["eventName"] == "INSERT":
                print(f"user {user_id} is online, notify!")
                publish_user_presence(user_id, True, event_time)
            elif record["eventName"] == "REMOVE":
                print(f"user {user_id} gone offline! check other user devices...")
                if not user_still_online(user_id):
                    print(f"user {user_id} gone offline from all devices! notify!")
                    publish_user_presence(user_id, False, event_time)
                else:
                    print(f"user {user_id} still online on other devices, skip!")
    except Exception as error:
        logger.exception(error)

When a user connects, API Gateway adds the connection to the connections table. DynamoDB produces an INSERT stream record that is caught by our watchdog, which then fans out an online event through the SNS Topic or SQS Queue, depending on which presence source the external application has chosen.

When a user disconnects, API Gateway removes the connection from the connections table, and DynamoDB produces a REMOVE stream record that is caught by the watchdog. If the user still has active connections on other devices, the watchdog simply skips; if it was the last connection, the watchdog fans out an offline event to the SNS Topic or SQS Queue.

For the Lambda function to receive connection/disconnection events, we have to subscribe it to the DynamoDB stream. To make sure the presence Lambda does not retry indefinitely on a malformed event, we set maximum_retry_attempts to 5 and configure the event source mapping to ignore old events.

# components/presence/stream.tf
resource "aws_lambda_event_source_mapping" "presence_stream" {
  enabled          = true
  event_source_arn = var.connections_table.stream_arn
  function_name    = aws_lambda_function.function.arn

  starting_position             = "LATEST"
  maximum_retry_attempts        = 5  # Retry at most five times
  maximum_record_age_in_seconds = 60 # Ignore Offline/Online events older than 1 minute
}

We also need to add the required DynamoDB/SNS/SQS permissions to the Lambda Function role, so it can Query DynamoDB, Publish to the SNS topic and SendMessage to the SQS queue:

# components/presence/iam.tf
data "aws_iam_policy_document" "custom_policy_doc" {
  statement {
    effect = "Allow"
    actions = [
      "dynamodb:DescribeStream",
      "dynamodb:GetRecords",
      "dynamodb:GetShardIterator",
      "dynamodb:ListStreams"
    ]
    resources = [
      "*",
    ]
  }

  statement {
    actions = [
      "dynamodb:Query",
    ]

    resources = [
      var.connections_table.arn,
    ]
  }

  statement {
    actions = [
      "sns:Publish",
    ]

    resources = [
      var.messages_topic.arn
    ]
  }

  statement {
    actions = [
      "sqs:SendMessage",
    ]

    resources = [
      var.messages_queue.arn
    ]
  }
}

Finally, we create our Lambda Function:

data "archive_file" "lambda_zip" {
  type        = "zip"
  output_path = "${path.module}/sources/dist.zip"
  source_dir  = "${path.module}/sources"
}

resource "aws_lambda_function" "function" {
  function_name = local.prefix
  role          = aws_iam_role.role.arn
  # runtime
  runtime = var.runtime
  handler = var.handler
  # resources
  memory_size = var.memory_size
  timeout     = var.timeout
  # package
  filename         = data.archive_file.lambda_zip.output_path
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256

  environment {
    variables = {
      CONNECTIONS_TABLE  = var.connections_table.name
      MESSAGES_TOPIC_ARN = var.messages_topic.arn
      MESSAGES_QUEUE_URL = var.messages_queue.url
      PRESENCE_SOURCE    = var.presence_source
    }
  }

  tags = merge(
    local.common_tags,
    { description = var.description }
  )
  depends_on = [data.archive_file.lambda_zip]
}

Websocket Messages Pusher

Source: AWS API Gateway Websocket Pusher


We have built the Websocket API for clients to send messages to backend applications. In addition, we need a Websocket Notifications Pusher so backend applications can push messages to connected clients. To achieve this we need:

  • Notifications SNS Topic: where external applications will publish the notification requests.
  • Notifications SQS Queue: where external applications will send notification requests.
  • Connections Table: a table with all active users’ connections.
  • Python AsyncIO Lambda Function: an async lambda function that will receive notification requests from SNS/SQS and send the notifications to users asynchronously.

We already have the SNS Topic, the SQS Queue and the Connections Table, so we only need to build the Lambda function, subscribe it to the SNS Topic and hook it to the SQS event source.

We’ve picked Python AsyncIO because we want to take advantage of the event loop, which comes with two benefits:

  • Fast Notification: we can’t instruct AWS API Gateway to send a message to multiple users at once, because it forces us to post a message to one connection at a time, and we can’t fetch the connections of a list of users from DynamoDB in one call, since we can only query by a single user_id. post_to_connection and query are I/O operations, and we don’t want to block our pusher while awaiting responses from API Gateway and DynamoDB. With the AsyncIO event loop, we can issue many requests concurrently without waiting, which makes a huge difference: notifications are delivered much faster than with a sequential processing approach.

  • Cost effective: with sequential processing, you pay AWS for the compute cost plus the time spent waiting between each postToConnection and query request. With async processing, you pay only for the compute cost of the actual Lambda runtime.
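The difference is easy to demonstrate: ten simulated 50 ms I/O calls complete in roughly 50 ms when awaited concurrently on the event loop, versus ~500 ms sequentially. Here fake_post_to_connection is a stand-in for the real API Gateway call, not actual SDK code:

```python
import asyncio
import time

async def fake_post_to_connection(delay: float = 0.05) -> str:
    # Stand-in for an I/O-bound postToConnection request.
    await asyncio.sleep(delay)
    return "ok"

async def main():
    start = time.monotonic()
    # All ten "requests" are awaited concurrently, not one by one.
    results = await asyncio.gather(*(fake_post_to_connection() for _ in range(10)))
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
assert results == ["ok"] * 10
assert elapsed < 0.5  # a sequential loop would take ~0.5s
```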

AWS: If you develop an AWS Lambda function with Node.js, you can call multiple web services without waiting for a response due to its asynchronous nature. All requests are initiated almost in parallel, so you can get results much faster than a series of sequential calls to each web service. Considering the maximum execution duration for Lambda, it is beneficial for I/O bound tasks to run in parallel.

We will follow AWS recommendation, but we will do it the Python way 😎.

import asyncio
import json
import os
import time
from datetime import timedelta
from typing import List, Set

import aioboto3
from boto3.dynamodb.conditions import Key
from botocore.errorfactory import ClientError

session = aioboto3.Session()


def handler(event, context):
    """
    :param event: SNS/SQS event
    :param context: Lambda Context
    :return: AsyncIO loop result
    """
    print(event)
    parser_func = parse_sns_record if "Sns" in event["Records"][0] else parse_sqs_record
    pusher = Pusher(
        event["Records"],
        parser_func
    )
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(pusher.notify_all_records())


class Pusher:
    """
    Asynchronous Batch Notifications Pusher
    """

    def __init__(self, records: List, parse_record_func):
        """
        :param records: SNS/SQS Records (Notifications Tasks)
        """
        self.endpoint_url = os.environ["APIGW_ENDPOINT"]
        self.connections_table_name = os.environ["CONNECTIONS_TABLE_NAME"]
        self.records = records
        self.start_time = time.time()
        self.stale_connections = []
        self.total_notified_connections = 0
        self.deleted_stale_connections = 0
        self.parse_record_func = parse_record_func

    @staticmethod
    async def retrieve_all_users_connections(table, exclude_users_ids: List[str]):
        """
        Coroutine to retrieve all users' connections
        :param table: connections table
        :param exclude_users_ids: list of users to exclude
        :return: List of connections
        """
        params = get_exclusion_filter(exclude_users_ids)
        result = await table.scan(**params)
        connections = result.get("Items")
        while result.get("LastEvaluatedKey"):
            result = await table.scan(ExclusiveStartKey=result["LastEvaluatedKey"], **params)
            connections.extend(result["Items"])
        return connections

    @staticmethod
    async def retrieve_user_connections(table, user_id: str):
        """
        Coroutine to retrieve a single user's connections
        :param table: connections table
        :param user_id: the user id (Hash Key)
        :return: List of connections
        """
        result = await table.query(
            KeyConditionExpression=Key("user_id").eq(user_id)
        )
        return result.get("Items", [])

    async def notify_connection(self, apigw, user_id: str, connection_id: str, data: str):
        """
        Coroutine to notify a single user connection
        :param apigw: APIGatewayManagementAPI client
        :param user_id: the user id (Hash Key)
        :param connection_id: API Gateway connection id
        :param data: binary data
        """
        try:
            await apigw.post_to_connection(
                Data=data,
                ConnectionId=connection_id
            )
            self.total_notified_connections += 1
        except ClientError as error:
            if error.response['Error']['Code'] == 'GoneException':
                self.stale_connections.append(
                    {'user_id': user_id, 'connection_id': connection_id}
                )
            else:
                print(error)

    async def notify_user(self, table, apigw, user_id: str, data):
        """
        Coroutine to notify all connections of a single user
        :param table: connections table
        :param apigw: APIGatewayManagementAPI client
        :param user_id: user_id
        :param data: binary data to send to a single user
        """
        connections = await self.retrieve_user_connections(table, user_id)
        # If user has active connections (Online), then notify.
        if len(connections):
            notifications = [
                self.notify_connection(
                    apigw, user_id, connection["connection_id"], data
                )
                for connection in connections
            ]

            await asyncio.wait(notifications)

    async def notify_selected_users(self, table, apigw, users_ids: Set[str], data):
        """
        Coroutine to notify all connections of selected users
        :param table: connections table
        :param apigw: APIGatewayManagementAPI client
        :param users_ids: List of users' ids
        :param data: binary data to send to all users
        """
        notifications = [
            self.notify_user(table, apigw, user_id, data) for user_id in users_ids
        ]

        await asyncio.wait(notifications)

    async def notify_all_users(self, table, apigw, exclude_users_ids: List[str], data):
        """
        Coroutine to notify all connections of all users
        :param table: connections table
        :param apigw: APIGatewayManagementAPI client
        :param exclude_users_ids: list of users to exclude
        :param data: binary data to send to all users
        """
        connections = await self.retrieve_all_users_connections(table, exclude_users_ids)
        if len(connections):
            notifications = [
                self.notify_connection(
                    apigw, connection["user_id"], connection["connection_id"], data
                )
                for connection in connections
            ]

            await asyncio.wait(notifications)

    async def delete_all_stale_connections(self, table):
        """
        Coroutine to delete all stale connections
        :param table: connections table
        """
        async with table.batch_writer() as batch:
            for stale_connection in self.stale_connections:
                await batch.delete_item(Key=stale_connection)

    async def notify_all_records(self):
        """
        Coroutine to notify all connections of all or selected users in all batch records
        """
        async with session.resource("dynamodb") as ddb:
            table = await ddb.Table(self.connections_table_name)
            async with session.client("apigatewaymanagementapi", endpoint_url=self.endpoint_url) as apigw:
                notifications = []
                for record in self.records:
                    users, exclude_users, data = self.parse_record_func(record)
                    if users:
                        notifications.append(self.notify_selected_users(table, apigw, users, data))
                    else:
                        notifications.append(self.notify_all_users(table, apigw, exclude_users, data))
                await asyncio.wait(notifications)
                await self.delete_all_stale_connections(table)
                await self.print_stats()

    async def print_stats(self):
        elapsed = (time.time() - self.start_time)
        total_elapsed_human = str(timedelta(seconds=elapsed))
        print(f"[STATS] Processed {len(self.records)} SQS records")
        print(f"[STATS] Notified {self.total_notified_connections} connections")
        print(f"[STATS] Finished in {total_elapsed_human}")
        print(f"[STATS] Deleted {len(self.stale_connections)} stale connections")


####################
# Helpers Functions
####################
def get_unique_users(users: List[str]):
    return set(users or [])


def parse_sqs_record
199 body = json.loads(record["body"])
200 users = get_unique_users(body.get("users", []))
201 exclude_users = get_unique_users(body.get("exclude_users", []))
202 data = json.dumps(body["data"])
203 return users, exclude_users, data
204
205
206def parse_sns_record(record):
207 message = json.loads(record["Sns"]["Message"])
208 users = get_unique_users(message.get("users", []))
209 exclude_users = get_unique_users(message.get("exclude_users", []))
210 data = json.dumps(message["data"])
211 return users, exclude_users, data
212
213
214def get_exclusion_filter(exclude_users_ids):
215 if exclude_users_ids:
216 excluded_users = ', '.join([f":id{idx}" for idx, _ in enumerate(exclude_users_ids)])
217 return dict(
218 ExpressionAttributeNames={
219 "#user_id": "user_id"
220 },
221 FilterExpression=f"NOT(#user_id in ({excluded_users}))",
222 ExpressionAttributeValues={f":id{idx}": user_id for idx, user_id in enumerate(exclude_users_ids)}
223 )
224 else:
225 return {}
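To see what the exclusion helper actually produces, here is a standalone sketch — `get_exclusion_filter` is reproduced verbatim from the pusher so the example runs outside the Lambda, with made-up user ids:

```python
# Reproduced from the pusher so the example is self-contained.
def get_exclusion_filter(exclude_users_ids):
    if exclude_users_ids:
        # One placeholder (:id0, :id1, ...) per excluded user id
        excluded_users = ', '.join([f":id{idx}" for idx, _ in enumerate(exclude_users_ids)])
        return dict(
            ExpressionAttributeNames={"#user_id": "user_id"},
            FilterExpression=f"NOT(#user_id in ({excluded_users}))",
            ExpressionAttributeValues={f":id{idx}": user_id for idx, user_id in enumerate(exclude_users_ids)}
        )
    return {}


kwargs = get_exclusion_filter(["user-1", "user-2"])
print(kwargs["FilterExpression"])           # NOT(#user_id in (:id0, :id1))
print(kwargs["ExpressionAttributeValues"])  # {':id0': 'user-1', ':id1': 'user-2'}
print(get_exclusion_filter([]))             # {} -> plain, unfiltered scan
```

The returned dict is spread directly into the DynamoDB `scan` call, so an empty exclusion list degrades gracefully to a full-table scan.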

Let’s dive into what we have built and what it can offer:

  • Multiple Sources: capable of receiving notification requests from both SQS and SNS, which makes it suitable for processing a single notification request from SNS, or a high number of notification requests in batches when polling from SQS.

  • Multicast Notifications: the pusher can send messages to only a subset of users (eg: users in a chat room).

  • Broadcast Notifications: sometimes you just want to send the same message to all connected users (eg: broad announcements).

  • Exclusion Notifications: the pusher can broadcast messages to all users except a list of excluded users (eg: online/offline presence events can be sent to everyone except the originator).

  • Stale Connections Pruning: capable of detecting and deleting stale connections from the DynamoDB connections store in case API Gateway missed cleaning them.

  • Asynchronous Processing: the pusher uses AsyncIO to notify multiple users/connections concurrently instead of waiting on in-flight requests to DynamoDB/API Gateway, so you don’t pay AWS for the waiting time 😎

  • Batch Processing: when using SQS as an event source, the pusher can process batches of notification requests, also concurrently.

  • Duplicate Users Detection: able to detect duplicate users in a notification request and reduce them to a unique set of users, to avoid double notifications.
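To make the request shape concrete, here is a minimal sketch of the notification request bodies an external application would send to SUMU's inputs, and how parsing collapses duplicate users into a unique set. The payloads and user ids are made up for illustration; only the field names (`users`, `exclude_users`, `data`) match the pusher's parsers:

```python
import json

# A multicast request (note u2 is accidentally listed twice) and a
# broadcast-with-exclusion request, as an external app would send them.
multicast = {"users": ["u1", "u2", "u2"], "data": {"type": "message", "message": "hi room"}}
broadcast = {"exclude_users": ["u1"], "data": {"type": "presence", "message": "u1 is ONLINE"}}


def parse_body(body):
    # Mirrors parse_sqs_record: duplicates collapse into a set, and the
    # inner data payload is re-serialized for delivery to connections.
    users = set(body.get("users", []) or [])
    exclude_users = set(body.get("exclude_users", []) or [])
    return users, exclude_users, json.dumps(body["data"])


users, _, data = parse_body(multicast)
print(users)  # u2 appears once despite being listed twice
```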

Now that we have the pusher Lambda function, we have to configure it to receive messages from SQS/SNS. For SQS we will need an event source mapping: we’ve set the batch size to 10, and we’ve set maximum_batching_window_in_seconds to 0 so the pusher starts consuming without waiting for the batch to reach exactly 10 messages:

# components/pusher/sqs_event_source.tf
resource "aws_lambda_event_source_mapping" "_" {
  enabled                            = true
  batch_size                         = 10
  event_source_arn                   = var.notifications_queue_arn
  function_name                      = module.pusher.lambda["alias_arn"]
  maximum_batching_window_in_seconds = 0 # Do not wait until batch size is fulfilled
}
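For the producer side, a sketch of how an external application could enqueue a notification request for the pusher to process. The helper name is hypothetical and the queue URL placeholder must be replaced with SUMU's real input queue; the body fields match what the pusher's SQS parser expects:

```python
import json


def build_notification_request(text, users=None, exclude_users=None):
    """Build the JSON body the pusher's parse_sqs_record expects."""
    return json.dumps({
        "users": users,
        "exclude_users": exclude_users,
        "data": {"type": "message", "message": text},
    })


body = build_notification_request("deploy finished", users=["u1", "u2"])
print(body)

# To actually enqueue it (queue URL is an assumption):
# import boto3
# boto3.client("sqs").send_message(QueueUrl="<sumu-input-queue-url>", MessageBody=body)
```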

For SNS, we will create a subscription and give SNS permission to invoke the Lambda Function:

# components/pusher/sns_subscription.tf
resource "aws_sns_topic_subscription" "_" {
  topic_arn = var.notifications_topic_arn
  protocol  = "lambda"
  endpoint  = module.pusher.lambda["alias_arn"]
}

resource "aws_lambda_permission" "_" {
  statement_id  = "AllowExecutionFromSNS"
  action        = "lambda:InvokeFunction"
  function_name = module.pusher.lambda["arn"]
  qualifier     = module.pusher.lambda["alias"]
  principal     = "sns.amazonaws.com"
  source_arn    = var.notifications_topic_arn
}

In addition to the Lambda SQS event source, we need to give the Lambda access permissions to DynamoDB, API Gateway and SQS:

# components/pusher/iam.tf
data "aws_iam_policy_document" "custom_policy_doc" {
  statement {
    actions = [
      "dynamodb:Query",
      "dynamodb:Scan",
      "dynamodb:DeleteItem",
      "dynamodb:BatchWriteItem"
    ]

    resources = [
      var.connections_table.arn,
    ]
  }

  statement {
    actions = [
      "execute-api:ManageConnections",
    ]

    resources = [
      "${var.agma_arn}/*"
    ]
  }

  statement {
    actions = [
      "sqs:ChangeMessageVisibility",
      "sqs:ChangeMessageVisibilityBatch",
      "sqs:DeleteMessage",
      "sqs:DeleteMessageBatch",
      "sqs:GetQueueAttributes",
      "sqs:ReceiveMessage"
    ]

    resources = [
      var.notifications_queue_arn
    ]
  }
}

That’s it, we have built a fast, asynchronous AWS API Gateway websocket notifications pusher.

Expose it!

Source: AWS API Gateway APIs Exposer


After deploying SUMU, we need to expose it to the outside world with a beautiful domain name instead of the ugly one generated by AWS. For that, we will use the module from Part 3.

We need these prerequisites before exposing our API:

  • AWS Route53 or Cloudflare zone.
  • AWS ACM Certificate for the subdomain that we will use with our API.
  • An A record for the APEX domain (the custom domain creation will fail otherwise).

If you already have these requirements let’s create our custom API Gateway domain which will replace the default invoke URL provided by API gateway:

module "gato" {
  source      = "git::https://github.com/obytes/terraform-aws-gato//modules/core-route53"
  prefix      = local.prefix
  common_tags = local.common_tags

  # DNS
  r53_zone_id = aws_route53_zone.prerequisite.zone_id
  cert_arn    = aws_acm_certificate.prerequisite.arn
  domain_name = "kodhive.com"
  sub_domains = {
    stateless = "api"
    statefull = "live"
  }

  # Rest APIS
  http_apis = []

  ws_apis = [
    {
      id    = module.sumu.ws_api_id
      key   = "push"
      stage = module.sumu.ws_api_stage_name
    }
  ]
}

We have created an API Gateway mapping that maps the deployed API stage mvp to the custom domain name live.kodhive.com, and we have chosen push as the mapping key for SUMU. So, the exposed URL will be: https://live.kodhive.com/push
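Assuming the wss scheme for the exposed URL, a client connects by appending its JWT as the `authorization` query parameter, which the request authorizer validates during the websocket upgrade. A minimal sketch — `build_ws_endpoint` and the placeholder token are illustrative, not part of SUMU:

```python
from urllib.parse import urlencode


def build_ws_endpoint(base="wss://live.kodhive.com/push", token="<JWT>"):
    # URL-encode the token so special characters survive the query string
    return f"{base}?{urlencode({'authorization': token})}"


print(build_ws_endpoint())  # wss://live.kodhive.com/push?authorization=%3CJWT%3E
```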

Demo Application

Source: AWS Sumu Demo


In this section, we are going to build a demo application implementing the following features:

  • Sign in using Google SSO with Firebase.
  • Connect to the SUMU websocket API Gateway.
  • Disconnect from the SUMU websocket API Gateway.
  • Periodic keep-alive (ping/pong) to keep connections active.
  • Publish messages to the messages SNS Topic through API Gateway.
  • Send messages to the messages SQS Queue through API Gateway.
  • Retrieve the list of connected users.
  • Receive users’ ONLINE/OFFLINE events and update the connected users list accordingly.
  • Support multiple push modes for publish and send:
    • UNICAST: sends an echo message that will be received by just the sender.
    • MULTICAST: sends a message to multiple selected users except the sender.
    • BROADCAST: sends a message to all connected users except the sender.
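The three push modes boil down to choosing between the `users` and `exclude_users` fields of a notification request. A minimal sketch of that decision — `resolve_recipients` is a hypothetical helper mirroring the demo server's `handle_message` logic:

```python
def resolve_recipients(push_mode, sender_id, selected=None):
    """Decide which users are targeted or excluded for each push mode."""
    if push_mode == "UNICAST":
        # Echo back to the sender only
        return {"users": [sender_id], "exclude_users": None}
    if push_mode == "MULTICAST":
        # Explicitly selected users
        return {"users": selected or [], "exclude_users": None}
    if push_mode == "BROADCAST":
        # Everyone except the sender
        return {"users": None, "exclude_users": [sender_id]}
    raise ValueError(f"unknown push mode: {push_mode}")


print(resolve_recipients("BROADCAST", "me"))  # {'users': None, 'exclude_users': ['me']}
```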

The live demo application is located at https://sumu.kodhive.com/.

Server

The server is a simple Lambda Function that processes SNS and SQS messages coming from the clients and sends notifications back to them. It can handle presence events, publish/send events, and return users’ connections to requesters.
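Before reading the code, it helps to see the two event envelopes side by side: SNS nests the payload under `Sns` and capitalizes its attribute value key (`Value`), while SQS puts the payload in `body` with `stringValue` keys. A minimal sketch with made-up records — `parse_record` condenses the `Channels.parse_message` logic shown below:

```python
import json

# Fake records in the shapes Lambda receives from each source
sns_record = {
    "Sns": {
        "Message": json.dumps({"type": "message", "message": {"text": "hi"}}),
        "MessageAttributes": {"user_id": {"Value": "u1"}},
    }
}
sqs_record = {
    "body": json.dumps({"type": "message", "message": {"text": "hi"}}),
    "messageAttributes": {"user_id": {"stringValue": "u1"}},
}


def parse_record(record):
    # The "Sns" key discriminates between the two envelopes
    if "Sns" in record:
        payload, value_key = record["Sns"], "Value"
        msg, attributes = json.loads(payload["Message"]), payload["MessageAttributes"]
    else:
        payload, value_key = record, "stringValue"
        msg, attributes = json.loads(payload["body"]), payload["messageAttributes"]
    return msg, {key: item[value_key] for key, item in attributes.items()}


# Both envelopes normalize to the same (message, attributes) pair
assert parse_record(sns_record) == parse_record(sqs_record)
```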

#
import json
import os
from typing import Callable, Any

import boto3


class Channels:

    def __init__(self, records):
        self.sns = boto3.client("sns")
        self.sqs = boto3.client("sqs")
        self.records = records

    def publish_message(self, message):
        self.sns.publish(
            TargetArn=os.environ.get("NOTIFICATIONS_TOPIC_ARN"),
            Message=json.dumps(message),
        )

    def send_message(self, message):
        self.sqs.send_message(
            QueueUrl=os.environ.get("NOTIFICATIONS_QUEUE_URL"),
            MessageBody=json.dumps(message),
        )

    def parse_message(self):
        if "Sns" in self.records[0]:
            value_key = "Value"
            payload = self.records[0]["Sns"]
            sxs_message = json.loads(payload["Message"])
            attributes = payload["MessageAttributes"]
            push = self.publish_message
        else:
            value_key = "stringValue"
            payload = self.records[0]
            sxs_message = json.loads(payload["body"])
            attributes = payload["messageAttributes"]
            push = self.send_message

        attrs = {key: item[value_key] for key, item in attributes.items()}
        return sxs_message, attrs, push


def handler(event, context):
    """
    Demo processor that does not have any real utility
    Just used to illustrate how backend applications can interact with SUMU
    :param event: events coming from SUMU integration (SNS or SQS)
    :param context: Lambda Context
    """
    print(event)
    channels = Channels(event["Records"])
    handle_event(*channels.parse_message())


def handle_presence(message: Any, originator_id):
    """
    Broadcast the presence event to all users except the originator
    :param message: message
    :param originator_id: originator user
    """
    return {
        "exclude_users": [originator_id],
        "data": {
            "type": "presence",
            "message": message,
        }
    }


def handle_message(message: Any, message_timestamp: int, sender_id: str):
    """
    Send a chat message from the originator client to the receiving clients
    When push mode is:
    1 - UNICAST: echo the message back to its sender.
    2 - MULTICAST: send the message to multiple clients except the originator.
    3 - BROADCAST: send the message to all clients except the originator.
    :param message: message to push
    :param message_timestamp: message timestamp
    :param sender_id: sender id
    """
    # Decide push mode
    push_mode = message["push_mode"]
    users = None
    exclude_users = None
    if push_mode == "UNICAST":
        users = [sender_id]
    elif push_mode == "MULTICAST":
        try:
            users = message["users"]
        except KeyError:
            print("[WARNING] Push mode is multicast. however, no users provided. Skip!")
            return
    elif push_mode == "BROADCAST":
        exclude_users = [sender_id, ]
    return {
        "users": users,
        "exclude_users": exclude_users,
        "data": {
            "type": "message",
            "message": {
                "text": message["text"],
                "user_id": sender_id,
                "timestamp": message_timestamp
            }
        }
    }


def handle_connections_request(requester_id):
    """
    Get all distinct connected users except the requester user
    :param requester_id: requester
    :return: message
    """
    connections = get_connections([requester_id])
    connected_users = list(set([connection["user_id"] for connection in connections]))
    return {
        "users": [requester_id],
        "data": {
            "type": "connected_users",
            "message": {
                "users": connected_users,
            }
        }
    }


def handle_event(sxs_message: Any, attributes: dict, push: Callable):
    """
    Presence/Routes events, could be:
    1 - coming from APIGW->DDB->Watchdog->SNS (Subscription)
    2 - coming from APIGW->DDB->Watchdog->SQS (Polling)
    3 - directly from APIGW->DDB (Stream)
    Broadcast the event to all users except the originator
    1 - SNS->PUSHER->APIGW
    2 - SQS->PUSHER->APIGW
    :param sxs_message: SNS/SQS message
    :param attributes: message attributes
    :param push: channel push method
    """
    caller_id = attributes["user_id"]
    event_source = attributes["source"]
    message = sxs_message["message"]
    m_type = sxs_message["type"]
    m = None

    if event_source == "lambda.presence.watchdog" and m_type == "presence":
        # Handle events coming from presence watchdog
        m = handle_presence(message, caller_id)
    elif event_source.startswith("apigw.route."):
        # Handle events coming from "apigw.route.publish" and "apigw.route.send"
        if m_type == "message":
            m = handle_message(message, int(attributes["timestamp"]), caller_id)
        elif m_type == "get_connected_users":
            m = handle_connections_request(caller_id)
    if m:
        push(m)


def get_connections(exclude_users_ids):
    excluded_users = ', '.join([f":id{idx}" for idx, _ in enumerate(exclude_users_ids)])
    params = dict(
        ExpressionAttributeNames={
            "#user_id": "user_id"
        },
        FilterExpression=f"NOT(#user_id in ({excluded_users}))",
        ExpressionAttributeValues={f":id{idx}": user_id for idx, user_id in enumerate(exclude_users_ids)}
    )
    resource = boto3.resource("dynamodb")
    connections_table = resource.Table(os.environ["CONNECTIONS_TABLE"])
    result = connections_table.scan(**params)
    connections = result.get("Items")
    while result.get("LastEvaluatedKey"):
        result = connections_table.scan(ExclusiveStartKey=result["LastEvaluatedKey"], **params)
        connections.extend(result["Items"])
    return connections

Client

The client is a React Application that integrates with Firebase for authentication and with SUMU Websocket API:

// app/client/src/index.tsx
import React from 'react';
import ReactDOM from 'react-dom';
import App from './App';
import {AuthProvider} from "./context/AuthProvider";
import {SumuProvider} from "./context/SumuProvider";

ReactDOM.render(
  <React.StrictMode>
    <AuthProvider>
      <SumuProvider>
        <App />
      </SumuProvider>
    </AuthProvider>
  </React.StrictMode>,
  document.getElementById('root')
);

The auth provider is responsible for authenticating users with Firebase. It renders the auth component if the user is not yet signed in; otherwise, it renders the child component, which is SumuProvider:

// app/client/src/context/AuthProvider.tsx
import React, {useState, useEffect} from 'react';
import {initializeApp} from 'firebase/app';
import {getAuth, signInWithPopup, GoogleAuthProvider, signOut, onAuthStateChanged} from "firebase/auth";
import {Spin, message} from 'antd';
import Auth from '../containers/Auth'

initializeApp({
  "apiKey": process.env.REACT_APP_FIREBASE_API_KEY,
  "authDomain": process.env.REACT_APP_FIREBASE_AUTH_DOMAIN,
  "projectId": process.env.REACT_APP_FIREBASE_PROJECT_ID,
  "measurementId": process.env.REACT_APP_FIREBASE_MEASUREMENT_ID
});


interface AuthContextData {
  user: any;
  loading: boolean;

  login(): void;

  logout(): void;
}

const initial = {
  user: null,
};

const AuthContext = React.createContext<AuthContextData>(initial as AuthContextData);
const auth = getAuth()

function AuthProvider({children}: any) {
  const [loading, setLoading] = useState<boolean>(true);
  const [bootstrapping, setBootstrapping] = useState<boolean>(true);
  const [user, setUser] = useState<any>(null);

  function login() {
    setLoading(true);
    const provider = new GoogleAuthProvider();
    signInWithPopup(auth, provider).then(function (result) {
      setUser(result.user);
      setLoading(false)
    }).catch(function (error: any) {
      console.log(error.message);
      message.error("Unable to sign in");
      setLoading(false)
    });
  }

  function logout() {
    signOut(auth).then(function () {
      // Sign-out successful.
    }).catch(function (error: any) {
      console.log(error)
    });
  }

  /** ======================
   * Hooks
   ---------------------- */
  useEffect(() => {
    setBootstrapping(true);
    setLoading(true);
    onAuthStateChanged(auth, (user: any) => {
      setUser(user);
      setBootstrapping(false);
      setLoading(false)
    });
  }, []);

  return (
    <AuthContext.Provider value={
      {
        user: user,
        loading: loading,
        login: login,
        logout: logout
      }
    }>
      {
        user ? children : bootstrapping ?
          <Spin spinning={loading} size="large" style={{
            width: "100%",
            height: "100vh",
            lineHeight: "100vh"
          }}/> :
          <Auth/>
      }
    </AuthContext.Provider>
  )
}

const useAuth = () => React.useContext(AuthContext);
export {AuthProvider, useAuth}

SumuProvider is a wrapper around SUMU functions. It always renders the main application and provides it with the connect, disconnect, ping, send and publish functions, in addition to the connecting, connected and connectedUsers states:

// app/client/src/context/SumuProvider.tsx
import React, {useState, useEffect} from 'react'
import {message, notification} from 'antd';
import Sockette from "sockette";

import {useAuth} from "./AuthProvider";

interface SumuContextData {
  connecting: boolean;
  connected: boolean;
  connectedUsers: any;

  connect(): void;

  disconnect(raise?: boolean): void;

  ping(): void;

  send(type: string, message: {}): void;

  publish(type: string, message: {}): void;
}

const initial = {
  connecting: false,
  connected: false
};

const SumuContext = React.createContext<SumuContextData>(initial as SumuContextData);
let keepAliveInterval: any = null;

function SumuProvider({children}: any) {

  const [connected, setConnected] = useState<boolean>(false);
  const [connecting, setConnecting] = useState<boolean>(false);
  const [ws, setWS] = useState<Sockette | null>(null);
  const {user} = useAuth();
  const [connectedUsers, setConnectedUsers] = useState(new Set());

  function connect(): void {
    if (connected || connecting) {
      message.error("Already connected!")
    } else {
      // Initiate connection through react hook
      setConnecting(true);
    }
  }

  function disconnect(raise = true): void {
    if (ws && connected && !connecting) {
      clearInterval(keepAliveInterval);
      console.log("Closing connections");
      ws.close()
    } else {
      if (raise) {
        message.error("Already disconnected!")
      }
    }
  }

  function send(type: string, msg: {}): void {
    if (ws && connected) {
      console.log("Send message");
      ws.json({
        action: 'send',
        message: {
          type: type,
          message: msg
        }
      });
      message.success("Message sent!");
    } else message.error("Not yet connected!")
  }

  function publish(type: string, msg: {}): void {
    if (ws && connected) {
      console.log("Publish message");
      ws.json({
        action: 'publish',
        message: {
          type: type,
          message: msg
        }
      });
      message.success("Message published!");
    } else message.error("Not yet connected!")
  }

  function ping() {
    if (ws && connected) {
      console.log("Send ping")
      ws.json({action: 'ping'});
    } else message.error("Not yet connected!")
  }

  function keepAlive() {
    if (ws && connected) {
      console.log("Keep alive")
      let interval = 3 * 60 * 1000 // Every 3 minutes
      clearInterval(keepAliveInterval)
      keepAliveInterval = setInterval(ping, interval)
    } else message.error("Not yet connected!")
  }

  /** ======================
   * Hooks
   ---------------------- */
  useEffect(() => {
    if (connected && !connecting) {
      keepAlive();
      publish("get_connected_users", {usage: "contact"});
    }
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [connected, connecting]);


  useEffect(() => {
    return () => {
      if (ws) {
        console.log("Tear down")
        clearInterval(keepAliveInterval);
        ws.close();
      }
    };
  }, [ws]);

  useEffect(() => {
    if (connecting) {
      user.getIdToken().then((accessToken: string) => {
        let endpoint = `${process.env.REACT_APP_WEBSOCKET_URL}?authorization=${accessToken}`;
        let sumuWebsocket = new Sockette(
          endpoint,
          {
            timeout: 5e3,
            maxAttempts: 5,
            onopen: e => {
              notification.success({
                message: "Connected",
                placement: 'bottomLeft'
              });
              setConnected(true)
              setConnecting(false)
            },
            onmessage: messageHandler,
            onreconnect: e => {
              notification.warning({
                message: "Reconnecting...",
                placement: "bottomLeft"
              });
            },
            onmaximum: e => {
              notification.error({
                message: "Could not connect to server, stop attempting!",
                placement: "bottomLeft"
              });
              setConnected(false)
            },
            onclose: e => {
              console.log("Closed!", e);
              notification.error({
                message: "Disconnected!",
                placement: 'bottomLeft'
              });
              setConnected(false)
            },
            onerror: e => {
              console.log("Error:", e);
              setConnected(false)
            },
          }
        );
        setWS(sumuWebsocket)
      });
    }
  }, [connecting])

  const messageHandler = (e: any) => {
    let payload = JSON.parse(e.data);
    let m = payload.message;
    switch (payload.type) {
      case "connected_users":
        setConnectedUsers(new Set(m["users"]))
        break;
      case "message":
        notification.warning({
          message: "New message",
          description: m.text,
          placement: "topRight"
        });
        break;
      case "presence":
        let presence = `${m.user_id} is ${m.status}`;
        if (m.status === "OFFLINE") {
          notification.error({
            message: "Presence",
            description: presence,
            placement: "topRight"
          });
          const newConnected = new Set(connectedUsers);
          newConnected.delete(m.user_id)
          setConnectedUsers(newConnected);
        } else if (m.status === "ONLINE") {
          notification.success({
            message: "Presence",
            description: presence,
            placement: "topRight"
          });
          const newConnected = new Set(connectedUsers).add(m.user_id)
          console.log(newConnected)
          setConnectedUsers(newConnected);
        }
        break;
      case "pong":
        notification.warning({
          message: "Keep Alive",
          description: "Received Pong from API Gateway",
          placement: "bottomLeft"
        });
        break;
      default:
        break;
    }
  }

  return (
    <SumuContext.Provider value={
      {
        connecting: connecting,
        connected: connected,
        connectedUsers: connectedUsers,
        connect: connect,
        disconnect: disconnect,
        ping: ping,
        send: send,
        publish: publish
      }
    }>
      {
        children
      }
    </SumuContext.Provider>
  )
}

const useSumu = () => React.useContext(SumuContext);
export {SumuProvider, useSumu}

Finally, we have the main application that uses all the elements provided by SumuProvider and AuthProvider:

import React, {useEffect, useState} from 'react';
import {Col, Row, Layout, Button, Card, Radio, Input, Typography, Divider, message, Select} from "antd";
import {CheckCircleOutlined, CloseCircleOutlined, SyncOutlined} from "@ant-design/icons";
import './App.less';

import {useAuth} from "./context/AuthProvider";
import {useSumu} from "./context/SumuProvider";

import Code from "./containers/Code";

const {Content, Header} = Layout;
const {TextArea} = Input;
const {Paragraph, Text} = Typography;

const App = (props: any) => {

  const {logout} = useAuth();
  const {connect, disconnect, publish, send, ping, connected, connecting, connectedUsers} = useSumu()
  const [pushMode, setPushMode] = useState<string>("UNICAST");
  const [msg, setMsg] = useState<string>("");
  const [users, setUsers] = useState<string[] | undefined>(undefined);

  useEffect(() => {
    connect()
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, []);

  function doPublish() {
    if (pushMode === "MULTICAST" && !(users && users.length !== 0)) {
      message.error("Push mode is multicast, please select users!")
      return
    }

    if (msg) {
      publish("message", {
        text: msg,
        push_mode: pushMode,
        users: users
      })
      setMsg("")
    } else
      message.error("Please enter the message!")
  }

  function doSend() {
    if (pushMode === "MULTICAST" && !(users && users.length !== 0)) {
      message.error("Push mode is multicast, please select users!")
      return
    }
    if (msg) {
      send("message", {
        text: msg,
        push_mode: pushMode,
        users: users
      })
      setMsg("")
    } else
      message.error("Please enter the message!")
  }

  function doPing() {
    ping()
  }

  function doConnect() {
    connect()
  }

  function doDisconnect() {
    disconnect()
  }

  function stateColor() {
    if (connected && !connecting) return "#00BFA6";
    else if (!connected && !connecting) return "#F50057";
    else if (connecting) return "#00B0FF";
  }

  function connectedUsersOptions() {
    const options: any = []
    connectedUsers.forEach((user: any) => {
      options.push({
        label: user,
        value: user
      })
    });
    return options;
  }

  return (<>
    <Layout>
      <Header
        style={{
          backgroundColor: "#141414",
          borderBottom: `1px solid ${stateColor()}`
        }}
      >
        <Row align={"middle"} justify={"space-between"} style={{width: "100%", height: "100%"}}>
          {connected && !connecting && <CheckCircleOutlined style={{fontSize: 25, color: stateColor()}}/>}
          {!connected && !connecting && <CloseCircleOutlined style={{fontSize: 25, color: stateColor()}}/>}
          {connecting && <SyncOutlined spin style={{fontSize: 25, color: stateColor()}}/>}

          {connectedUsers.size > 0 && <Text style={{color: "#00BFA6"}}>{`${connectedUsers.size}`} Users Connected</Text>}
          {connectedUsers.size === 0 && <Text style={{color: "#F50057"}}>No User Connected</Text>}

          <Button
            onClick={e => logout()}
            type="primary"
            ghost
          >
            Logout
          </Button>
        </Row>
      </Header>

      <Content
        style={{
          padding: '0 50px',
          paddingTop: 10,
          backgroundColor: "#282c34"
        }}
      >
        <Row>
          <Row style={{width: "100%", marginBottom: 16}} align={"middle"} gutter={16}>

            <Col span={8} style={{display: 'flex'}}>
              <Code
                title={"Publish message"} code={"publish"}
                onClick={doPublish}
                disabled={!connected}
              />
            </Col>

            <Col span={8} style={{display: 'flex'}}>
              <Card title={"Message Params"} style={{width: "100%"}}>
                <Row style={{width: "100%"}} justify={"center"}>
                  <Radio.Group
                    defaultValue="UNICAST"
                    onChange={e => setPushMode(e.target.value)}
                    style={{marginTop: 16, marginBottom: 16}}
                  >
                    <Radio.Button value="UNICAST">UNICAST</Radio.Button>
                    <Radio.Button value="MULTICAST">MULTICAST</Radio.Button>
                    <Radio.Button value="BROADCAST">BROADCAST</Radio.Button>
                  </Radio.Group>
                </Row>
                <Row>
                  {pushMode === "UNICAST" &&
                  <Paragraph type={"secondary"}>The message will be sent to <Text strong>just
                    you!</Text></Paragraph>}
                  {pushMode === "MULTICAST" &&
                  <Paragraph type={"secondary"}>The message will be sent to <Text strong>selected
                    users!</Text></Paragraph>}
                  {pushMode === "BROADCAST" &&
                  <Paragraph type={"secondary"}>The message will be sent to <Text strong>all
                    users except you!</Text></Paragraph>}
                  <TextArea rows={2}
                            maxLength={150}
                            allowClear
                            showCount
                            autoSize={{minRows: 3, maxRows: 3}}
                            placeholder={"Your message"}
                            onChange={e => setMsg(e.target.value)}
                            style={{width: "100%"}}
                            value={msg}
                  />
                </Row>
                {pushMode === "MULTICAST" &&
                <>
                  <Divider/>
                  <Select
                    mode="multiple"
                    maxTagCount="responsive"
                    placeholder="Select users..."
                    options={connectedUsersOptions()}
                    value={users}
                    style={{width: "100%"}}
                    onChange={(selectedUsers: string[]) => {setUsers(selectedUsers);}}
                  />
                </>
                }
              </Card>
            </Col>

            <Col span={8} style={{display: 'flex'}}>
              <Code
                title={"Send message"} code={"send"}
                onClick={doSend}
                disabled={!connected}
              />
            </Col>
          </Row>

          <Row style={{width: "100%"}} gutter={16}>
            <Col span={8} style={{display: 'flex'}}>
              <Code
                title={"Connect"} code={"connect"}
                onClick={doConnect}
                loading={connecting}
                disabled={connected}
              />
            </Col>

            <Col span={8} style={{display: 'flex'}}>
              <Code
                title={"Disconnect"} code={"disconnect"}
                onClick={doDisconnect}
                disabled={!connected}
              />
            </Col>

            <Col span={8} style={{display: 'flex'}}>
              <Code
                title={"Ping"} code={"ping"}
                onClick={doPing}
                disabled={!connected}
              />
            </Col>
          </Row>
        </Row>
      </Content>
    </Layout>
  </>)
}

export default App;

What’s next?


Throughout this article we’ve seen how we can leverage AWS serverless technologies to build a reusable websocket stack.

We’ve also built a realtime, reactive demo web application that leverages SUMU and implements all the features it provides.

We didn’t cover how we’ve deployed the demo client web application; that will be the topic of the next article (it’s not deployed on Netlify 😉).

Share if you like the article and stay tuned for the next article!
