AWS infrastructure for clickstream data analysis

Problem statement


Analysing user behaviour is essential for making the right decisions in application development and maximising business value.

AWS provides many services useful for data streaming, transformation and analysis. In this blog post I describe a proof-of-concept project in which I simulated clickstream activity, ingested it into AWS, and then stored, transformed, aggregated and analysed it.


The proposed solution


The following high-level diagram shows the AWS infrastructure that was built.

Next I will go through every component and explain its role in the system.


Data ingestion


We expect the clickstream events coming into our system to be small pieces of data generated continuously at high speed and volume. We might need to analyse them in near real time, so Amazon Kinesis fits this case perfectly.


Amazon Kinesis Data Streams


Amazon Kinesis Data Streams is a scalable and durable real-time data streaming service that can continuously capture gigabytes of data per second from hundreds of thousands of sources.


We can send events directly to a Kinesis data stream using the Kinesis Client Library or the AWS API/SDK, but we can also make it a bit easier for our customers by exposing Kinesis through Amazon API Gateway, so that they can send ordinary HTTP requests. Moreover, we can take advantage of request validation and protection with AWS Web Application Firewall (WAF). AWS provides an example of such an API.


Amazon API Gateway will automatically scale to handle the amount of traffic your API receives.


I developed a simple Python script that generates a JSON payload with some user information and sends it to the API Gateway endpoint. Each event contains a user id, a randomly generated IP address, and an event_name describing the action (Search, AddToCart, ViewContent, Purchase).

{
  "timestamp": "2021-07-12 12:01:58.732726",
  "user_id": "35",
  "pixel_id": "wjgao4w1oi",
  "click_id": "a5cf179b9c9d483abf6d424d44a293be",
  "insertion_timestamp": "2021-07-12 12:01:58.732754",
  "event_name": "Search",
  "user_ip": "111.33.64.227",
  "additional_data": {
    "time_on_data": 79,
    "percent_viewed": 39.7,
    "product_id": 557246,
    "price": 417.97
  }
}
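
A minimal sketch of such a generator is shown below. The endpoint URL is a placeholder and the value ranges are illustrative, not the exact values used in the project.

# clickstream_generator.py - a minimal sketch of the event generator.
# API_URL is a placeholder, not the real API Gateway endpoint.
import json
import random
import time
import uuid
from datetime import datetime

import requests

API_URL = "https://example.execute-api.eu-west-1.amazonaws.com/prod/clickstream"  # placeholder
EVENTS = ["Search", "AddToCart", "ViewContent", "Purchase"]

def random_ip():
    return ".".join(str(random.randint(1, 254)) for _ in range(4))

def build_event():
    now = datetime.utcnow().isoformat(sep=" ")
    return {
        "timestamp": now,
        "user_id": str(random.randint(1, 50)),
        "pixel_id": uuid.uuid4().hex[:10],
        "click_id": uuid.uuid4().hex,
        "insertion_timestamp": now,
        "event_name": random.choice(EVENTS),
        "user_ip": random_ip(),
        "additional_data": {
            "time_on_data": random.randint(5, 120),
            "percent_viewed": round(random.uniform(0, 100), 1),
            "product_id": random.randint(100000, 999999),
            "price": round(random.uniform(5, 1000), 2),
        },
    }

if __name__ == "__main__":
    while True:
        # Send one event per request; API Gateway forwards it to Kinesis.
        requests.post(API_URL, json=build_event()).raise_for_status()
        time.sleep(0.1)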

The “user_id” field was also used as the partition key for the Kinesis data stream to segregate and route records to different shards.


A single shard can ingest up to 1 MB of data per second (including partition keys) or 1,000 records per second for writes. The maximum size of the data payload of a record before base64-encoding is up to 1 MB.

A Kinesis data stream doesn’t scale automatically, but we can scale the number of shards using CloudWatch and Lambda (the UpdateShardCount API). Scaled to 5,000 shards, a stream can ingest up to 5 GB per second or 5 million records per second.
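
As a rough sketch, a scaling Lambda triggered by a CloudWatch alarm could call UpdateShardCount like this (the stream name and the simple doubling strategy are assumptions for illustration):

# scale_stream.py - sketch of a Lambda that increases the shard count
# when a CloudWatch alarm on IncomingBytes/IncomingRecords fires.
import boto3

kinesis = boto3.client("kinesis")
STREAM_NAME = "clickstream"  # illustrative stream name

def handler(event, context):
    summary = kinesis.describe_stream_summary(StreamName=STREAM_NAME)
    current = summary["StreamDescriptionSummary"]["OpenShardCount"]
    target = min(current * 2, 5000)  # stay within the 5,000-shard ceiling
    kinesis.update_shard_count(
        StreamName=STREAM_NAME,
        TargetShardCount=target,
        ScalingType="UNIFORM_SCALING",
    )
    return {"previous_shards": current, "target_shards": target}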


Amazon Kinesis Data Analytics


Amazon Kinesis Data Analytics is the easiest way to transform and analyze streaming data in real time. We can use either standard SQL or Apache Flink libraries for complex event processing.


For example, let’s catch all “ViewContent” events where "percent_viewed" > 60. Later we can use them to make special offers to a customer who is likely interested in a particular “product_id”.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"COL_timestamp" TIMESTAMP, 
"event_name" VARCHAR(16), 
"percent_viewed" DECIMAL(1,1), 
"product_id" INTEGER,
"price" REAL
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"

SELECT STREAM "COL_timestamp", "event_name", "percent_viewed", "product_id", "price"
FROM "SOURCE_SQL_STREAM_001"
WHERE "event_name"='ViewContent'
AND "percent_viewed">60;

Output data can be sent to another Kinesis data stream, a Kinesis Data Firehose delivery stream, or a Lambda function for further processing.


Amazon Kinesis Data Firehose


Amazon Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores, and analytics services. It can capture, transform, and deliver streaming data to Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and generic HTTP endpoints.

Moreover, Kinesis Data Firehose can transform source records with AWS Lambda, convert the record format, and back up raw data, all of which comes in handy here.

Raw data is stored in JSON format in a dedicated bucket partitioned by year, month, day and hour.

Data is enriched and converted to the Apache Parquet format with Snappy compression by the transformation Lambda and stored in a separate bucket.

Conversion and compression are very important for the cost-effectiveness of storage and the speed of further queries. As we can see, the converted files are several times smaller than the raw ones.


AWS provides a calculation comparing the cost of storing text files vs. Parquet.


Lambda functions and Redis


The “Last access” Lambda is triggered by the Kinesis data stream, reads batches of 10 events, and updates the ElastiCache Redis cluster with each user’s latest access timestamp and location.
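
A simplified sketch of what this function might look like is shown below; the Redis key layout and environment variables are assumptions, not the exact project code.

# last_access.py - simplified sketch of the "Last access" Lambda.
import base64
import json
import os

import redis

r = redis.Redis(host=os.environ["REDIS_HOST"], port=6379, decode_responses=True)

def handler(event, context):
    # The Kinesis trigger is configured with a batch size of 10.
    for record in event["Records"]:
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        user_id = payload["user_id"]
        # Keep the latest access timestamp and the source IP per user
        # (storing the raw IP here is a simplification; the actual location
        # lookup is shown in the enrichment sketch further below).
        r.hset(f"user:{user_id}", mapping={
            "last_access": payload["timestamp"],
            "last_ip": payload["user_ip"],
        })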



The “Data update” Lambda is executed regularly on a schedule defined by a CloudWatch Events rule; it gets the user’s name from MySQL Aurora and sends it to Redis. This Lambda can also be invoked by Aurora when a new user signs up and the information appears in the database.
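
A sketch of this function is shown below, assuming a simple users table; the table, column and environment variable names are hypothetical.

# data_update.py - sketch of the scheduled "Data update" Lambda.
import os

import pymysql
import redis

r = redis.Redis(host=os.environ["REDIS_HOST"], port=6379, decode_responses=True)

def handler(event, context):
    conn = pymysql.connect(
        host=os.environ["AURORA_HOST"],
        user=os.environ["DB_USER"],
        password=os.environ["DB_PASSWORD"],
        database=os.environ["DB_NAME"],
    )
    try:
        with conn.cursor() as cursor:
            # Hypothetical schema: users(user_id, full_name)
            cursor.execute("SELECT user_id, full_name FROM users")
            for user_id, full_name in cursor.fetchall():
                r.hset(f"user:{user_id}", "name", full_name)
    finally:
        conn.close()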


The “Enrichment” Lambda does several things. Data is buffered by Kinesis Data Firehose up to 3 MB or 60 seconds (900 seconds max), whichever occurs first.

  1. The Lambda reads the user's IP address from the payload.

  2. The Lambda needs to get the country, region and city the IP belongs to. First it looks for this data in Redis; if it exists there, the location fields are added to the raw JSON. If the Redis key does not exist, the Lambda calls the https://ipinfo.io/ API, receives the location, saves it in Redis, and adds the “country”, “region” and “city” fields to the output JSON.

  3. The Lambda needs to get the user's full name. Redis already has the name, updated by the “Data update” function earlier, and the full name is also added to the output JSON.
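
For illustration, a simplified sketch of the enrichment logic following the three steps above might look like this. The Redis key layout mirrors the earlier sketches and is an assumption; a production function would also need error handling, retries and an ipinfo.io token.

# enrichment.py - simplified sketch of the Firehose transformation Lambda.
import base64
import json
import os

import redis
import requests

r = redis.Redis(host=os.environ["REDIS_HOST"], port=6379, decode_responses=True)

def lookup_location(ip):
    # Step 2: check the Redis cache first, fall back to ipinfo.io.
    cached = r.hgetall(f"ip:{ip}")
    if cached:
        return cached
    info = requests.get(f"https://ipinfo.io/{ip}/json", timeout=3).json()
    location = {
        "country": info.get("country", ""),
        "region": info.get("region", ""),
        "city": info.get("city", ""),
    }
    r.hset(f"ip:{ip}", mapping=location)
    return location

def handler(event, context):
    output = []
    for record in event["records"]:
        # Step 1: decode the buffered record and read the user's IP.
        payload = json.loads(base64.b64decode(record["data"]))
        payload.update(lookup_location(payload["user_ip"]))
        # Step 3: the full name was put into Redis by the "Data update" Lambda.
        payload["name"] = r.hget(f"user:{payload['user_id']}", "name") or ""
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode((json.dumps(payload) + "\n").encode()).decode(),
        })
    return {"records": output}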


An example of an enriched event is provided below.

{
  "timestamp": "2021-07-12 20:24:49.536462",
  "user_id": "20",
  "pixel_id": "fytil06mrm",
  "click_id": "57ae98d7ffd94dc6a47fd756ab9c1cde",
  "insertion_timestamp": "2021-07-12 20:24:49.536488",
  "event_name": "AddToCart",
  "user_ip": "169.235.214.44",
  "additional_data": {
    "price": 694.83,
    "product_id": 361482,
    "time_on_data": 83,
    "percent_viewed": 87
  },
  "name": "Semyon Ryan",
  "country": "US",
  "region": "California",
  "city": "Riverside"
}


The ElastiCache Redis cluster acts as a caching layer, reducing the number of requests to external resources like “ipinfo” as well as the load on the database server where we store critical user data. All data in Redis is reproducible.


Transformation and analytics


AWS Glue


AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. A few of its capabilities are useful in the current system, in particular the metastore (Data Catalog) and ETL jobs.


Data catalog and crawler

The AWS Glue Data Catalog contains references to data that is used as sources and targets of your ETL jobs in AWS Glue. To create your data warehouse or data lake, you must catalog this data. The AWS Glue Data Catalog is an index to the location, schema, and runtime metrics of your data. You use the information in the Data Catalog to create and monitor your ETL jobs. Information in the Data Catalog is stored as metadata tables, where each table specifies a single data store.

Typically, you run a crawler to take inventory of the data in your data stores, but there are other ways to add metadata tables into your Data Catalog. The following workflow diagram shows how AWS Glue crawlers interact with data stores and other elements to populate the Data Catalog.

The following is the general workflow for how a crawler populates the AWS Glue Data Catalog:

  1. A crawler runs any custom classifiers that you choose to infer the format and schema of your data. You provide the code for custom classifiers, and they run in the order that you specify. The first custom classifier to successfully recognize the structure of your data is used to create a schema. Custom classifiers lower in the list are skipped.

  2. If no custom classifier matches your data's schema, built-in classifiers try to recognize your data's schema. An example of a built-in classifier is one that recognizes JSON.

  3. The crawler connects to the data store. Some data stores require connection properties for crawler access.

  4. The inferred schema is created for your data.

  5. The crawler writes metadata to the Data Catalog. A table definition contains metadata about the data in your data store. The table is written to a database, which is a container of tables in the Data Catalog. Attributes of a table include classification, which is a label created by the classifier that inferred the table schema.

When creating the crawler, I point it to the S3 bucket with the enriched data and run it.
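
For reference, the same crawler could be created and started with boto3 roughly as follows; the crawler, role, database and bucket names are placeholders.

# create_crawler.py - an equivalent of the console steps using boto3.
import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="clickstream-enriched-crawler",
    Role="AWSGlueServiceRole-clickstream",  # placeholder IAM role
    DatabaseName="clickstream",
    Targets={"S3Targets": [{"Path": "s3://my-enriched-data-bucket/"}]},  # placeholder bucket
)
glue.start_crawler(Name="clickstream-enriched-crawler")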


As a result I get a table with an automatically generated schema.


Record format conversion in Kinesis Data Firehose requires a Glue table as the schema to determine how to interpret the data.
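
For illustration, the relevant piece of the Firehose configuration (the DataFormatConversionConfiguration passed inside ExtendedS3DestinationConfiguration when creating or updating the delivery stream) might look roughly like this; the database, table and role names are placeholders.

# Sketch of the record format conversion settings referencing the Glue table.
data_format_conversion = {
    "Enabled": True,
    "SchemaConfiguration": {
        "DatabaseName": "clickstream",  # Glue database (placeholder)
        "TableName": "enriched",        # table created by the crawler (placeholder)
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-glue-role",  # placeholder
    },
    # Incoming records are JSON, outgoing files are Parquet with Snappy compression.
    "InputFormatConfiguration": {"Deserializer": {"OpenXJsonSerDe": {}}},
    "OutputFormatConfiguration": {"Serializer": {"ParquetSerDe": {"Compression": "SNAPPY"}}},
}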