Note: a version of this article was published on the Equinox tech blog in Aug 2019.
Every day, across the 100 (and counting!) Equinox Fitness locations, our members generate hundreds of thousands of user events, including facility check-ins and participation in Personal Training, Pilates, and Group Fitness classes. We love seeing them enjoy our facilities and services and get one step closer to reaching their fitness goals.
To recognize their commitment to themselves and to Equinox, we’ve launched a new Membership Recognition program. The program rewards members for reaching milestones in three activities: coming to the gym, taking 1-on-1 sessions with a personal trainer, and attending Pilates classes. Or, as we call them: Checkins, PT, and Pilates.
Like the loyalty or rewards programs found at other companies, it is not a radical new business strategy. It does, however, present a nice opportunity to show appreciation to our members, motivate their continued engagement, and implement some cool tech solutions in the process!
Here’s a diagram of the PT/Pilates sessions piece of the pipeline that powers this project:
If the icons above are familiar, you can probably tell that this project was built atop AWS services, with Lambda, SQS, and DynamoDB in the starring roles. I’d be lying if I said there was one and only one possible design for such a project, even when limiting oneself to the AWS ecosystem. So among the potential options, what factors drove us to this solution?
First and foremost, we wanted it to be fast. To amplify the reward’s impact, we believed the reward and its accompanying notification should be sent as soon as possible after the action that triggers them. This requirement led us to an event-based streaming architecture that processes events immediately. As we will see, the relatively recent AWS feature that lets SQS queues trigger Lambda functions is an effective way to achieve this.
Abandoning humility for a second: we’re proud of building this project from scratch. A quick Google search will show there are many SaaS rewards-program products out there, but for a team with the requisite capabilities, there’s no substitute for the cost savings and flexibility that come from building something yourself.
With that said, we did outsource one piece of functionality: notifying members of their newly awarded reward via a personalized in-app notification. This feature leverages an integration with Braze, a product that supports, well, the sending of personalized in-app notifications (among other features). These notifications are delivered by campaigns configured in Braze and triggered via calls to a Messages API that Braze exposes.
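For readers new to the SQS trigger: Lambda polls the queue itself and invokes the function with a batch of messages under `event["Records"]`, each carrying the message text in its `body` field. Below is a minimal handler sketch, assuming the publisher places JSON bodies on the queue; `handler` and `process_session` are hypothetical names, not the project's actual code.

```python
import json


def process_session(session):
    """Hypothetical per-message work; here it just returns the member id."""
    return session["member_id"]


def handler(event, context):
    """Entry point for a Lambda function triggered by an SQS queue.

    Lambda delivers a batch of SQS messages in event["Records"]; each
    record's "body" holds the raw message text (assumed to be JSON here).
    """
    processed = []
    for record in event["Records"]:
        session = json.loads(record["body"])
        processed.append(process_session(session))
    # Returning normally lets Lambda delete the batch from the queue;
    # raising an exception makes the batch visible again for a retry.
    return processed
```

Because a failed batch is retried as a whole, messages that were already handled can be processed again, which is one reason idempotency comes up later in the pipeline.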
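For a sense of what such a call carries, here is a sketch of the request body for an API-triggered campaign, assuming Braze's `/campaigns/trigger/send` endpoint. The campaign id, the `milestone_name` trigger property, and the use of the member id as Braze's `external_user_id` are all hypothetical; the REST host and authentication depend on your Braze instance, so check Braze's documentation before relying on this shape.

```python
import json


def build_campaign_trigger_payload(campaign_id, member_id, milestone_name):
    """Request body for triggering a Braze campaign for one member.

    Assumes the campaign reads `milestone_name` from trigger_properties to
    personalize the in-app message.
    """
    return {
        "campaign_id": campaign_id,
        "recipients": [
            {
                # Assumption: members are identified in Braze by their member id
                "external_user_id": str(member_id),
                "trigger_properties": {"milestone_name": milestone_name},
            }
        ],
    }


payload = build_campaign_trigger_payload("example-campaign-id", 1031, "100 PT Sessions")
request_body = json.dumps(payload)  # POST this to <braze-rest-host>/campaigns/trigger/send
```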
So how do we know when member No. 1031 has just completed their 100th PT session and should be sent a gift card to use at one of our apparel shops? There are several critical steps to perform in real time:
- Subscribe to a stream of PT & Pilates session events
- Calculate a member’s lifetime number of sessions
- Make POST requests to Equinox’s internal Rewards Service and the Braze API
Let’s dive into some of the details that make this pipeline tick, shall we?
Step #1 – Building a Sessions Publisher
Unfortunately, no publisher of PT & Pilates session events existed, so we were tasked with creating our own. The first step was to figure out the best place to source the data from, and there were two options.
Option #1: Go straight to the source SQL Server DB and look for updated records in the transactional data (or subscribe to a write-ahead log stream). This would be the fastest option, but it would also require additional processing to appropriately enrich the raw data.
Option #2: Find updated records in the Sessions Datamart of the Redshift Data Warehouse. This Datamart takes the transactional data from Option #1, enriches and standardizes it, and stores it in a Redshift table for use in reporting and ad-hoc analysis.
Although Option #2 introduces additional latency, since the Datamart refreshes only a few times per hour, the simplification provided by the friendlier data format led us down that route.
“Streaming” Data from Redshift
We’ve found our data source; how do we go about streaming events from it? Streaming isn’t what Redshift was designed for, but that hardly means it cannot be done.
Here’s a diagram of our solution:
As shown in the diagram, the steps to get updated session records onto a queue are:
- Save a snapshot of the relevant columns of the Redshift table to S3 via an UNLOAD query
- Add a partition to the Spectrum snapshot table, leveraging the S3 key format dt=YYYY-MM-DD-HH-MM
- Execute a query (shown below) comparing consecutive snapshots to identify differences
- Group changed records by member, save them in temporary S3 files, and place a message on an SQS queue pointing to those files
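The first two steps boil down to two SQL statements per snapshot. A sketch of generating them follows; the bucket, IAM role, and source table names are hypothetical, and it assumes the Spectrum table was defined to read UNLOAD's default pipe-delimited text output, with a string `dt` partition column. Note that the partition value and the S3 folder name need not share a format: the folder follows dt=YYYY-MM-DD-HH-MM, while the partition value matches what the comparison query filters on.

```python
from datetime import datetime

# Hypothetical names; substitute your own bucket, role, and tables.
BUCKET_PREFIX = "s3://example-bucket/session_utilization_snap"
IAM_ROLE = "arn:aws:iam::123456789012:role/example-redshift-unload"
SOURCE_TABLE = "datamart.session_utilization"
SNAPSHOT_TABLE = "spectrum_table.session_utilization_snap"
COLUMNS = "member_id, session_id, session_date, session_desc, revenue_count"


def snapshot_location(snapshot_time):
    """S3 folder for one snapshot, following the dt=YYYY-MM-DD-HH-MM key format."""
    return f"{BUCKET_PREFIX}/dt={snapshot_time:%Y-%m-%d-%H-%M}/"


def unload_sql(snapshot_time):
    """UNLOAD only the tracked columns into this snapshot's folder."""
    return (
        f"UNLOAD ('SELECT {COLUMNS} FROM {SOURCE_TABLE}') "
        f"TO '{snapshot_location(snapshot_time)}' "
        f"IAM_ROLE '{IAM_ROLE}'"
    )


def add_partition_sql(snapshot_time):
    """Register the new folder as a partition of the Spectrum snapshot table."""
    return (
        f"ALTER TABLE {SNAPSHOT_TABLE} ADD IF NOT EXISTS "
        f"PARTITION (dt='{snapshot_time:%Y-%m-%d %H:%M:%S}') "
        f"LOCATION '{snapshot_location(snapshot_time)}'"
    )


snapshot = datetime(2019, 7, 21, 13, 0)
statements = [unload_sql(snapshot), add_partition_sql(snapshot)]  # run these in order
```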
A basic EXCEPT query is used to find records that changed between two snapshots.
SELECT member_id, session_id, session_date, session_desc, revenue_count
FROM spectrum_table.session_utilization_snap
WHERE dt = '2019-07-21 13:00:00'
EXCEPT
SELECT member_id, session_id, session_date, session_desc, revenue_count
FROM spectrum_table.session_utilization_snap
WHERE dt = '2019-07-21 12:00:00'
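To make the EXCEPT semantics concrete, here is the same comparison on in-memory tuples with made-up rows. A new row and a modified row both come back (the modified one with its new values), while a row deleted between snapshots would not appear at all, since EXCEPT only returns rows from the newer snapshot.

```python
# Each snapshot row: (member_id, session_id, session_date, session_desc, revenue_count)
older = {
    ("123", "s1", "2019-07-20", "PT", 1.0),
    ("123", "s2", "2019-07-21", "PT", 1.0),
}
newer = {
    ("123", "s1", "2019-07-20", "PT", 1.0),       # unchanged: filtered out
    ("123", "s2", "2019-07-21", "PT", 0.5),       # revenue_count changed: returned
    ("456", "s3", "2019-07-21", "Pilates", 1.0),  # new session: returned
}

# Like SQL EXCEPT: distinct rows of the newer snapshot absent from the older one
changed = newer - older
```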
Beyond the basic functionality of finding updated rows in Redshift, we like this process for a few other reasons. First, it identifies changes on a specific subset of the table’s columns rather than the whole table. Next, it allows for easy replaying of records from specific time frames by choosing which dt partitions the comparison query uses. Lastly, the S3 pointer technique makes it resilient to the 256 KB SQS message size limit.
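A sketch of the grouping and S3 pointer step, assuming boto3-style `put_object` and `send_message` calls on clients passed in as parameters. The temporary key prefix and the `{"bucket", "key"}` message shape are hypothetical choices for illustration.

```python
import json
import uuid
from collections import defaultdict


def group_by_member(rows):
    """Group changed rows (dicts) by member_id."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["member_id"]].append(row)
    return dict(grouped)


def publish_with_pointer(rows, s3_client, sqs_client, bucket, queue_url):
    """Write grouped rows to a temporary S3 object and enqueue a pointer to it.

    The SQS message carries only the bucket and key, so it stays tiny no
    matter how many rows changed, well under SQS's 256 KB limit.
    """
    key = f"tmp/session_changes/{uuid.uuid4()}.json"  # hypothetical prefix
    body = json.dumps(group_by_member(rows))
    s3_client.put_object(Bucket=bucket, Key=key, Body=body)
    pointer = json.dumps({"bucket": bucket, "key": key})
    sqs_client.send_message(QueueUrl=queue_url, MessageBody=pointer)
    return key
```

With boto3, the clients would be `boto3.client("s3")` and `boto3.client("sqs")`; the consumer reverses the steps by reading the pointer and fetching the object with `get_object`.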
Step #2 – Lifetime PT/Pilates Session Calculations
Whenever an Equinox member finishes a PT session, we need to be able to determine how many sessions he or she took in the past and whether the new lifetime total crosses one of the defined milestones.
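Once the lifetime totals before and after the new sessions are known, the milestone check itself is a small comparison. A sketch, with invented milestone thresholds: a milestone counts as crossed when the old total was below it and the new total is at or above it, so each milestone fires once even as later sessions push the total further past it. Fractional totals work too, which matters because revenue_count can be less than 1.

```python
# Hypothetical milestone thresholds (lifetime session counts)
PT_MILESTONES = [10, 25, 50, 100]


def crossed_milestones(previous_total, new_total, milestones=PT_MILESTONES):
    """Return the milestones reached by moving from previous_total to new_total."""
    return [m for m in milestones if previous_total < m <= new_total]
```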
Our approach to counting a member’s past sessions was to store every PT & Pilates session ever taken in a DynamoDB table*. Serverless, easily scalable, and, most importantly, queryable via a simple boto3 connection (boto3 comes pre-installed in the Python Lambda environment), Dynamo is an attractive datastore option.
*As a side note, while it may seem unnecessary to use Dynamo given the same data already exists in Redshift, we covered in a previous post the dangers of using Redshift for transactional workloads.
The Dynamo table storing this data is called session_utilization.
We use member_id as the table’s Partition Key so that a member’s lifetime session total can be queried efficiently, and session_id as the Sort Key so that duplicate sessions cannot populate the table (writing the same member_id/session_id pair again overwrites the existing item).
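The key schema above, expressed as `create_table` parameters for a boto3 DynamoDB client. This is a sketch: the string attribute types are an assumption, and `BillingMode="PAY_PER_REQUEST"` reflects the on-demand recommendation made later in this post rather than a documented detail of the real table.

```python
# member_id (partition key) + session_id (sort key); string types assumed
TABLE_PARAMS = {
    "TableName": "session_utilization",
    "KeySchema": [
        {"AttributeName": "member_id", "KeyType": "HASH"},    # partition key
        {"AttributeName": "session_id", "KeyType": "RANGE"},  # sort key
    ],
    "AttributeDefinitions": [
        {"AttributeName": "member_id", "AttributeType": "S"},
        {"AttributeName": "session_id", "AttributeType": "S"},
    ],
    "BillingMode": "PAY_PER_REQUEST",  # on-demand capacity
}


def create_session_table(dynamodb_client):
    """Create the table; pass e.g. boto3.client("dynamodb")."""
    return dynamodb_client.create_table(**TABLE_PARAMS)
```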
From the example data above, member 123 has 1.5 total lifetime PT sessions (summing the revenue_count column). The following code snippet shows how we can query the table to arrive at this answer:
import boto3
from boto3.dynamodb.conditions import Attr, Key

dynamo = boto3.resource("dynamodb")
table = dynamo.Table("session_utilization")

# Member 123's items (partition key), keeping only PT sessions
key_cond_exp = Key("member_id").eq("123")
filter_exp = Attr("session_desc").eq("PT")
response = table.query(KeyConditionExpression=key_cond_exp, FilterExpression=filter_exp)
pt_records = response["Items"]

# revenue_count is returned as a Decimal; sum the fractional counts as floats
lifetime_pt_count = sum(float(pt_rec["revenue_count"]) for pt_rec in pt_records)
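One caveat with a single `query` call: DynamoDB reads at most 1 MB per request and applies the FilterExpression after reading, so a member with a long history could have results split across pages (a page can even come back empty while more remain). A sketch of a paginating variant that follows `LastEvaluatedKey`; the helper names are hypothetical, and it takes the boto3 Table and query arguments as parameters.

```python
def query_all_items(table, **query_kwargs):
    """Call Table.query repeatedly until DynamoDB stops returning LastEvaluatedKey."""
    items = []
    while True:
        response = table.query(**query_kwargs)
        items.extend(response["Items"])
        last_key = response.get("LastEvaluatedKey")
        if last_key is None:
            return items
        # Resume the next page where the previous one stopped
        query_kwargs["ExclusiveStartKey"] = last_key


def lifetime_total(items):
    """Sum revenue_count across items (Decimal or numeric strings)."""
    return sum(float(item["revenue_count"]) for item in items)


# Usage with the objects from the snippet above:
# pt_records = query_all_items(table, KeyConditionExpression=key_cond_exp, FilterExpression=filter_exp)
# lifetime_pt_count = lifetime_total(pt_records)
```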
Updating the session_utilization datastore is the purpose of the first Lambda function in the pipeline, named milestone_session_util. Before the table can be used for lifetime calculations, though, an important first step is to backfill it with all historical sessions. This collection constitutes tens of millions of records.
In the prequel post to this article, we covered loading 200 million(!) historical checkins into a Dynamo table in under an hour. Luckily, the same recursive S3 pointer strategy used to accomplish the checkin backfill also works for the PT & Pilates session data. The details won’t be rehashed here, so if you’re interested, be sure to check out that post.
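The post doesn’t show how milestone_session_util writes sessions. Here is a minimal sketch of one way to do it, assuming boto3’s `Table.batch_writer` and session records shaped like the session_utilization items; the function names are hypothetical. Two details are easy to trip over: boto3 rejects Python floats, so fractional counts like revenue_count need `Decimal`, and a single batch request may not contain the same key twice, which `overwrite_by_pkeys` handles by keeping only the last write per key.

```python
from decimal import Decimal


def to_dynamo_item(session):
    """Convert one session record into a DynamoDB-ready item.

    revenue_count goes through str() so 0.5 becomes Decimal("0.5") exactly.
    """
    item = dict(session)
    item["revenue_count"] = Decimal(str(session["revenue_count"]))
    return item


def write_sessions(table, sessions):
    """Upsert sessions; rewriting the same (member_id, session_id) just overwrites."""
    with table.batch_writer(overwrite_by_pkeys=["member_id", "session_id"]) as batch:
        for session in sessions:
            batch.put_item(Item=to_dynamo_item(session))
```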
Bursty data sources and DynamoDB
A typical chart of events emitted from the PT/Pilates Publisher looks like this:
Every hour a burst of a few hundred events is emitted from the Publisher over the span of several seconds. After those events are processed, everything sits idly until the next hour’s burst.
To prevent throttled requests, one provisioning strategy for a Dynamo table is to statically set the Read & Write Capacity Unit (RCU/WCU) settings at the level of throughput peaks, plus a small cushion for safety. That’s how it was done until two different features were released for DynamoDB allowing for capacity to elastically mirror traffic: auto scaling and on-demand (released in mid 2017 and late 2018 respectively).
Auto scaling offers the best cost efficiency potential but requires setting specific scaling parameters. On-demand meanwhile takes a black box “pay-for-what-you-use” approach, making it an appetizingly low-effort choice at the expense of some extra cost.
Naïve and ambitious, we tried fitting auto scaling to our infrequent, bursty traffic, to less-than-stellar effect:
Here we see auto scaling attempting to adjust the provisioned capacity of the table (red line) to match the traffic (blue line), but failing to respond quickly enough. After a certain number of scaling attempts, the maximum number of scaling activities allowed in a 24-hour period was reached, causing capacity to flatline at 100 units for the rest of the day.
On-Demand to the Rescue
Trying on-demand next, we had better luck handling the traffic bursts. Since on-demand instantly accommodates up to double the previously recorded traffic peak (exactly how far back it looks is a small mystery), we were able to process records without fear of throttling.
In general, we recommend using on-demand as the capacity setting for all Dynamo tables to start. Until your traffic can be observed to fit one of the textbook auto-scaling cases, on-demand will likely save you both money and developer time.
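For an existing provisioned table, switching to on-demand is a single `update_table` call with `BillingMode="PAY_PER_REQUEST"`; new tables can pass the same `BillingMode` to `create_table`. A minimal sketch taking a boto3 DynamoDB client as a parameter (the function name is hypothetical):

```python
def switch_to_on_demand(dynamodb_client, table_name):
    """Move an existing provisioned-capacity table to on-demand billing."""
    return dynamodb_client.update_table(TableName=table_name, BillingMode="PAY_PER_REQUEST")


# Usage: switch_to_on_demand(boto3.client("dynamodb"), "session_utilization")
```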
Step #3 – Granting Rewards, Idempotent Execution, and
Once it’s been calculated that a member has reached a PT/Pilates milestone, the exciting time has come to send a reward! Tragically, the same Lambda-triggered computing strategy used thus far will no longer work.
If there’s one concept to grasp when using Lambda with SQS triggers, it’s this: any message may be processed multiple times due to SQS’s “at-least-once” delivery guarantee. It is therefore imperative to consciously design your architecture to be unaffected by potential duplicate processing.
For example, when storing records of milestones reached by members, one of the keys of the milestone_activity Dynamo table is an MD5 hash of three data points: member id, milestone id, and milestone timestamp (at monthly granularity). The effect of duplicate processing at this step in the pipeline is a harmless overwrite of an existing record in the table, since both will have the same hash.
When making requests to the Rewards and Braze APIs, however, we did not want to place the burden on those services to prevent members from receiving duplicate rewards.
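A sketch of such a deterministic key using `hashlib.md5`. The field order and the `|` delimiter are assumptions; what matters is that the same member, milestone, and month always produce the same hash, so a reprocessed message rewrites the same item.

```python
import hashlib
from datetime import datetime


def milestone_guid(member_id, milestone_id, reached_at):
    """Deterministic id for 'member reached milestone in a given month'."""
    month = reached_at.strftime("%Y-%m")  # monthly granularity
    raw = f"{member_id}|{milestone_id}|{month}"  # '|' delimiter is an assumption
    return hashlib.md5(raw.encode("utf-8")).hexdigest()


guid = milestone_guid("1031", "pt_100", datetime(2019, 7, 21))
```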
Our solution to this problem was to keep a cache of recently processed milestones. The hashkey stored in the cache is simply the milestone_guid hash also used to prevent Dynamo table duplicates.
Consequently, the rewards worker (a Python script, deployed as an ECS service, that continuously polls a queue) follows these steps:
- Perform an EXISTS operation for the milestone_guid hashkey
- If the key exists, assume the reward was already processed and stop processing
- If the key does not exist, store the hashkey in the cache and continue processing to send the reward
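Done literally, the EXISTS check and the later write are two separate commands, so two workers handling the same duplicate message at the same moment could both see the key as missing. A variant that closes that gap, assuming redis-py, swaps in a single `SET` with `nx=True`, which writes only when the key is absent and reports whether it did. The key prefix and the 30-day expiry are hypothetical.

```python
MILESTONE_TTL_SECONDS = 30 * 24 * 3600  # hypothetical: how long to remember a milestone


def claim_milestone(redis_client, guid):
    """Atomically record that this milestone is being rewarded.

    redis-py's set(..., nx=True) returns True when the key was written and
    None when it already existed, so only the first caller gets True.
    """
    was_set = redis_client.set(f"milestone:{guid}", 1, nx=True, ex=MILESTONE_TTL_SECONDS)
    return bool(was_set)


def handle_milestone(redis_client, guid, send_reward):
    """Send the reward only if this worker is the first to claim the milestone."""
    if not claim_milestone(redis_client, guid):
        return False  # already processed: skip the duplicate
    send_reward()
    return True
```

As in the steps above, the key is stored before the reward goes out, so a failure mid-send would require deleting the key to allow a retry.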
With this logic in place, duplicate delivery by the SQS queue is no longer a concern. In general, when no inherent property of a process can be exploited to make it idempotent, this caching strategy is a lightweight method to guarantee idempotency.
Given the possibility of an error that grants a torrent of unearned rewards (costing Equinox money or creating an embarrassing PR situation), we also utilized Redis to implement a few circuit breaker-style fail-safes.
Specifically, for every reward processed, we increment a hashkey in Redis that tracks how many rewards were sent in total during the current day and how many were sent to the specific member. If either value exceeds its threshold, no rewards are given and a CloudWatch alarm is triggered.
While not an essential piece of functionality, it’s a nice way to keep the team sleeping a bit more soundly at night.
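A sketch of those daily counters, assuming redis-py’s `incr` and `expire`. It uses two plain date-stamped counter keys rather than a single Redis hash (a hash with `hincrby` would work similarly), and both limits are invented. When the check fails, the caller would skip the reward and emit whatever signal the CloudWatch alarm watches.

```python
from datetime import date

DAILY_TOTAL_LIMIT = 500     # hypothetical
DAILY_PER_MEMBER_LIMIT = 3  # hypothetical
ONE_DAY_SECONDS = 24 * 3600


def record_reward_and_check(redis_client, member_id, today=None):
    """Increment today's counters; return True if rewards may still be sent."""
    day = (today if today is not None else date.today()).isoformat()
    total_key = f"rewards:{day}:total"
    member_key = f"rewards:{day}:member:{member_id}"
    total = redis_client.incr(total_key)
    per_member = redis_client.incr(member_key)
    # Date-stamped keys reset daily; expiring them keeps old days from piling up
    redis_client.expire(total_key, 2 * ONE_DAY_SECONDS)
    redis_client.expire(member_key, 2 * ONE_DAY_SECONDS)
    return total <= DAILY_TOTAL_LIMIT and per_member <= DAILY_PER_MEMBER_LIMIT
```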
With the groundwork complete for the Membership Recognition program, we’re excited by the new avenues of interaction it opens up with our members. Already, our design team has begun work on a new screen in the Equinox app that displays progress toward the next potential reward.
And that is just the tip of the iceberg for what can be done.