AWS infrastructure for clickstream data analysis

Problem statement


Analysing user behaviour is very important for making the right decisions in application development and for maximising business value.

AWS provides many services for data streaming, transformation and analysis. In this blog post I describe a proof-of-concept (POC) project in which I simulated clickstream activity, then ingested the data into AWS, stored, transformed, aggregated and analysed it.


The proposed solution


The following high-level diagram shows the AWS infrastructure that was built.

Next, I will go through each component and explain its role in the system.


Data ingestion


We expect the clickstream events arriving in our system to be small pieces of data, generated continuously at high speed and in high volume. We may need to analyse them in near real time, so Amazon Kinesis is a good fit for this case.


Amazon Kinesis Data Streams


Amazon Kinesis Data Streams is a scalable and durable real-time data streaming service that can continuously capture gigabytes of data per second from hundreds of thousands of sources.


We can send events directly to a Kinesis data stream using the Kinesis Producer Library (KPL) or the AWS API/SDK. We can also make things easier for our clients by exposing Kinesis through Amazon API Gateway, so that they can send ordinary HTTP requests; in addition, we can take advantage of request validation and protection with AWS Web Application Firewall (WAF). AWS provides an example of such an API.


Amazon API Gateway will automatically scale to handle the amount of traffic your API receives.


I developed a simple Python script that generates a JSON payload with some user information and sends it to the API Gateway endpoint. Each event has a user ID, a randomly generated IP address and an event_name describing the action (Search, AddToCart, ViewContent or Purchase):

{
  "timestamp": "2021-07-12 12:01:58.732726",
  "user_id": "35",
  "pixel_id": "wjgao4w1oi",
  "click_id": "a5cf179b9c9d483abf6d424d44a293be",
  "insertion_timestamp": "2021-07-12 12:01:58.732754",
  "event_name": "Search",
  "user_ip": "111.33.64.227",
  "additional_data": {
    "time_on_data": 79,
    "percent_viewed": 39.7,
    "product_id": 557246,
    "price": 417.97
  }
}
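The generator script itself is not shown in the post, so here is a minimal sketch of one, using only the Python standard library. The endpoint URL, the pixel ID alphabet and the value ranges are hypothetical. The request body is the raw event JSON; how API Gateway maps that body to a Kinesis record depends on the integration's mapping template, which is also not shown.

```python
import json
import random
import string
import urllib.request
import uuid
from datetime import datetime

# Hypothetical endpoint: replace with your own API Gateway invoke URL.
API_URL = "https://example.execute-api.eu-west-1.amazonaws.com/prod/clickstream"

EVENT_NAMES = ["Search", "AddToCart", "ViewContent", "Purchase"]


def random_ip() -> str:
    """Return a random dotted-quad IPv4 address (reserved ranges not excluded)."""
    return ".".join(str(random.randint(1, 254)) for _ in range(4))


def make_event(user_id: int) -> dict:
    """Build one clickstream event shaped like the example payload."""
    return {
        "timestamp": str(datetime.now()),  # e.g. "2021-07-12 12:01:58.732726"
        "user_id": str(user_id),
        "pixel_id": "".join(random.choices(string.ascii_lowercase + string.digits, k=10)),
        "click_id": uuid.uuid4().hex,
        "insertion_timestamp": str(datetime.now()),
        "event_name": random.choice(EVENT_NAMES),
        "user_ip": random_ip(),
        "additional_data": {
            "time_on_data": random.randint(1, 120),
            "percent_viewed": round(random.uniform(0, 100), 1),
            "product_id": random.randint(100000, 999999),
            "price": round(random.uniform(1, 1000), 2),
        },
    }


def send_event(event: dict, url: str = API_URL) -> int:
    """POST the event as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status
```

A loop such as `for _ in range(1000): send_event(make_event(random.randint(1, 50)))` would then produce a steady stream of events for a small pool of users.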

The “user_id” field is also used as the partition key of the Kinesis data stream; the partition key determines which shard of the stream each record is routed to.
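Kinesis maps a partition key to a shard by taking the MD5 hash of the key as a 128-bit integer and finding the shard whose hash key range contains it. The sketch below illustrates that routing under the assumption that the hash key space is split into equal ranges (as for a freshly created stream), so it can differ from the real shard at range boundaries or after resharding. The stream name passed to `put_record_kwargs` is hypothetical.

```python
import hashlib
import json

HASH_SPACE = 2 ** 128  # Kinesis hash keys are 128-bit unsigned integers


def hash_key(partition_key: str) -> int:
    """MD5 digest of the partition key interpreted as a 128-bit integer."""
    return int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Approximate shard index, assuming evenly split hash key ranges.

    After resharding, use the real ranges returned by ListShards instead.
    """
    return hash_key(partition_key) * shard_count // HASH_SPACE


def put_record_kwargs(stream_name: str, event: dict) -> dict:
    """Arguments for boto3's kinesis.put_record, using user_id as the partition key."""
    return {
        "StreamName": stream_name,
        "Data": json.dumps(event).encode("utf-8"),
        "PartitionKey": event["user_id"],
    }
```

With boto3 installed, `boto3.client("kinesis").put_record(**put_record_kwargs("clickstream", event))` would write one event. Because every event of a given user hashes to the same shard, that user's events stay in order.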


A single shard can ingest up to 1 MB of data per second (including partition keys) or 1,000 records per second for writes. The maximum size of a record's data payload before base64 encoding is 1 MB.

A Kinesis data stream doesn’t scale automatically, but we can change the number of shards using CloudWatch and a Lambda function that calls the UpdateShardCount API. For example, a stream scaled to 5,000 shards can ingest up to 5 GB per second or 5 million records per second.
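The shard count needed for a given load follows from the per-shard write limits: whichever of the two limits (1 MB/s or 1,000 records/s) demands more shards wins. Below is a sketch of that arithmetic plus the UpdateShardCount call such a scaling Lambda might make. How the CloudWatch alarm passes throughput figures to the function is not described in the post, so the figures are plain arguments and the boto3 Kinesis client is passed in.

```python
import math

MB_PER_SHARD = 1.0          # write limit per shard, MB per second
RECORDS_PER_SHARD = 1000    # write limit per shard, records per second


def required_shards(mb_per_second: float, records_per_second: float) -> int:
    """Smallest shard count that satisfies both per-shard write limits."""
    return max(
        1,
        math.ceil(mb_per_second / MB_PER_SHARD),
        math.ceil(records_per_second / RECORDS_PER_SHARD),
    )


def scale_stream(kinesis_client, stream_name: str, target: int) -> dict:
    """Ask Kinesis to reshard the stream uniformly to `target` shards."""
    return kinesis_client.update_shard_count(
        StreamName=stream_name,
        TargetShardCount=target,
        ScalingType="UNIFORM_SCALING",
    )
```

For example, 2.5 MB/s at 1,000 records/s needs 3 shards (the byte limit dominates), while 0.1 MB/s at 4,500 records/s needs 5 (the record limit dominates). Note that a single UpdateShardCount call can at most double or halve the current shard count, so a large jump takes several steps.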


Amazon Kinesis Data Analytics


Amazon Kinesis Data Analytics is the easiest way to transform and analyze streaming data in real time. We can use either standard SQL or Apache Flink libraries for complex event processing.


For example, let’s catch all “ViewContent” events where "percent_viewed" > 60. Later we can use them to make special offers to customers who are likely interested in a particular “product_id”.

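-- Output stream for engaged "ViewContent" events
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"COL_timestamp" TIMESTAMP, 
"event_name" VARCHAR(16), 
-- percent_viewed ranges from 0.0 to 100.0: 4 digits, 1 after the decimal point
"percent_viewed" DECIMAL(4,1), 
"product_id" INTEGER,
"price" REAL
);

-- The pump continuously inserts matching rows from the in-application source stream
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"

SELECT STREAM "COL_timestamp", "event_name", "percent_viewed", "product_id", "price"
FROM "SOURCE_SQL_STREAM_001"
WHERE "event_name"='ViewContent'
AND "percent_viewed">60;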

The output can be sent to another Kinesis data stream, a Kinesis Data Firehose delivery stream or a Lambda function for further processing.
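When testing the generator locally, it can help to mirror the same filter and projection in plain Python, so you can predict which events should reach the destination stream. This is only an illustration of the predicate in the SQL above, not how Kinesis Data Analytics executes it; the function names are hypothetical.

```python
def is_engaged_view(event: dict, threshold: float = 60) -> bool:
    """Mirror of the WHERE clause: ViewContent events with percent_viewed > threshold."""
    return (
        event.get("event_name") == "ViewContent"
        and event.get("additional_data", {}).get("percent_viewed", 0) > threshold
    )


def to_destination_row(event: dict) -> dict:
    """Project an event onto the DESTINATION_SQL_STREAM columns."""
    data = event["additional_data"]
    return {
        "COL_timestamp": event["timestamp"],
        "event_name": event["event_name"],
        "percent_viewed": data["percent_viewed"],
        "product_id": data["product_id"],
        "price": data["price"],
    }
```

Given a list of generated events, `[to_destination_row(e) for e in events if is_engaged_view(e)]` yields the rows the pump is expected to emit.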


Amazon Kinesis Data Firehose


Amazon Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores, and analytics services. It can capture, transform, and deliver streaming data to Amazon S3, Amazon Redshift, Amazon Elasticsearch Service and generic HTTP endpoints.

Moreover, Kinesis Data Firehose can transform source records with AWS Lambda, convert the record format, and back up the raw source data, which comes in handy.

Raw data is stored in JSON format in a dedicated bucket, partitioned by year, month, day and hour.
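By default, Firehose writes objects under a UTC time prefix of the form `YYYY/MM/DD/HH/`, which is what produces this year/month/day/hour layout. A small sketch that computes the prefix a record arriving at a given time would land under (the function name is hypothetical):

```python
from datetime import datetime, timezone


def firehose_default_prefix(arrival: datetime) -> str:
    """S3 key prefix Firehose uses by default: UTC 'YYYY/MM/DD/HH/'.

    Naive datetimes are interpreted as local time by astimezone().
    """
    return arrival.astimezone(timezone.utc).strftime("%Y/%m/%d/%H/")
```

Because the prefix is based on UTC arrival time, an event generated late in the evening in a timezone behind UTC lands in the next day's folder.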

The transformation Lambda enriches the data, and Firehose then converts it to Apache Parquet format with Snappy compression and stores it in a separate bucket.

Conversion and compression are very important for storage cost-effectiveness and for the speed of later queries. As we can see, the converted files are several times smaller than the raw ones.


AWS provides a calculation comparing the cost of storing data as text files versus Parquet.


Lambda functions and Redis


The “Last access” Lambda is triggered by the Kinesis data stream. It reads batches of 10 events and updates the ElastiCache Redis cluster with each user's latest access timestamp and location.
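A minimal sketch of such a handler, assuming redis-py. The `REDIS_HOST` environment variable and the `user:<user_id>` hash key are hypothetical (the 5-character prefix is chosen to match the `SUBSTRING(_key_, 6, 4)` used in the Athena query later), and storing the IP address as the "location" is an assumption; the original function may resolve the location differently.

```python
import base64
import json
import os

_redis = None  # reused across warm Lambda invocations


def get_redis():
    """Create the Redis client lazily; REDIS_HOST is a hypothetical env var."""
    global _redis
    if _redis is None:
        import redis  # redis-py, imported here so the pure logic runs without it
        _redis = redis.Redis(host=os.environ["REDIS_HOST"], port=6379, decode_responses=True)
    return _redis


def update_last_access(records: list, client) -> int:
    """Store the latest timestamp and location per user; return users updated."""
    latest = {}
    for record in records:
        event = json.loads(base64.b64decode(record["kinesis"]["data"]))
        # A user's records share a shard (user_id is the partition key) and
        # arrive in order, so the last one seen in the batch is the newest.
        latest[event["user_id"]] = event
    for user_id, event in latest.items():
        client.hset(f"user:{user_id}", mapping={
            "last_access": event["timestamp"],
            "location": event["user_ip"],  # assumption: IP used as location
        })
    return len(latest)


def handler(event, context):
    """Lambda entry point for a Kinesis event source mapping."""
    return {"updated": update_last_access(event["Records"], get_redis())}
```

Collapsing the batch to one write per user keeps Redis traffic low when a user generates several events in the same batch.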



The “Data update” Lambda runs regularly on a schedule defined by a CloudWatch Events rule. It gets user names from Aurora MySQL and writes them to Redis. This Lambda can also be invoked by Aurora when a new user signs up and their information appears in the database.
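A sketch of the sync step, assuming PyMySQL and redis-py. The `customers` table with `customer_id` and `name` columns and the `datasandbox` database come from the Athena query later in the post; the environment variable names and the `user:<id>` key layout are hypothetical. Writes go through a Redis pipeline so that all names are sent in one round trip.

```python
import os


def fetch_customers(conn):
    """Return (customer_id, name) rows from the customers table."""
    with conn.cursor() as cursor:
        cursor.execute("SELECT customer_id, name FROM customers")
        return cursor.fetchall()


def sync_user_names(rows, client) -> int:
    """Copy (customer_id, name) rows into Redis hashes; return the count written."""
    pipe = client.pipeline()
    count = 0
    for customer_id, name in rows:
        pipe.hset(f"user:{customer_id}", "name", name)
        count += 1
    pipe.execute()
    return count


def handler(event, context):
    """Scheduled entry point; connection settings come from hypothetical env vars."""
    import pymysql
    import redis

    conn = pymysql.connect(
        host=os.environ["DB_HOST"],
        user=os.environ["DB_USER"],
        password=os.environ["DB_PASSWORD"],
        database="datasandbox",
    )
    try:
        rows = fetch_customers(conn)
    finally:
        conn.close()
    client = redis.Redis(host=os.environ["REDIS_HOST"], decode_responses=True)
    return {"synced": sync_user_names(rows, client)}
```

For a large customer table, the query would need paging or a "changed since" filter rather than a full scan on every run.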


The “Enrichment” Lambda does several things. Kinesis Data Firehose buffers incoming data up to 3 MB or 60 seconds (the interval can be raised to 900 seconds), whichever comes first, and then invokes the Lambda with the buffered records.

1. The Lambda reads the user's IP address from the payload.

2. The Lambda needs the country, region and city the IP address belongs to. It first looks for this data in Redis. If the Redis key exists, the “country”, “region” and “city” fields are added to the raw JSON. If it does not, the Lambda calls the https://ipinfo.io/ API, receives the location, saves it in Redis and adds the “country”, “region” and “city” fields to the output JSON.

3. The Lambda needs the user's full name. Redis already holds it, written earlier by the “Data update” function, so the Lambda reads it from there and adds it to the output JSON.
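The steps above can be sketched as a Firehose data-transformation Lambda: Firehose passes base64-encoded `records`, and the function must return each `recordId` with a `result` and re-encoded `data`. The Redis key layouts (`ip:<address>`, `user:<user_id>`) and the `REDIS_HOST` variable are hypothetical, Redis is assumed to use `decode_responses=True`, and the ipinfo call ignores tokens and rate limits.

```python
import base64
import json
import os
import urllib.request

_redis = None  # reused across warm invocations


def ipinfo_lookup(ip: str) -> dict:
    """Query ipinfo.io for an IP address and keep the geo fields we need."""
    with urllib.request.urlopen(f"https://ipinfo.io/{ip}/json", timeout=3) as resp:
        info = json.load(resp)
    return {k: info.get(k, "") for k in ("country", "region", "city")}


def geo_for_ip(ip: str, client, lookup=ipinfo_lookup) -> dict:
    """Cache-aside: read geo fields from Redis, else call the API and cache them."""
    key = f"ip:{ip}"
    cached = client.hgetall(key)
    if cached:
        return cached
    geo = lookup(ip)
    client.hset(key, mapping=geo)
    return geo


def enrich(event: dict, client, lookup=ipinfo_lookup) -> dict:
    """Add country/region/city and, if known, the user's full name."""
    enriched = dict(event)
    enriched.update(geo_for_ip(enriched["user_ip"], client, lookup))
    name = client.hget(f"user:{enriched['user_id']}", "name")
    if name is not None:
        enriched["name"] = name
    return enriched


def transform(firehose_event: dict, client, lookup=ipinfo_lookup) -> dict:
    """Build the response Firehose expects from a transformation Lambda."""
    output = []
    for record in firehose_event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            data = json.dumps(enrich(payload, client, lookup)).encode("utf-8")
            output.append({"recordId": record["recordId"], "result": "Ok",
                           "data": base64.b64encode(data).decode("ascii")})
        except Exception:
            # Hand the original record back as failed instead of dropping it.
            output.append({"recordId": record["recordId"],
                           "result": "ProcessingFailed", "data": record["data"]})
    return {"records": output}


def handler(event, context):
    global _redis
    if _redis is None:
        import redis
        _redis = redis.Redis(host=os.environ["REDIS_HOST"], decode_responses=True)
    return transform(event, _redis)
```

Repeated IP addresses within a batch hit the external API only once, because the first lookup populates the cache for the rest.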


An example of an enriched event is shown below.

{
  "timestamp": "2021-07-12 20:24:49.536462",
  "user_id": "20",
  "pixel_id": "fytil06mrm",
  "click_id": "57ae98d7ffd94dc6a47fd756ab9c1cde",
  "insertion_timestamp": "2021-07-12 20:24:49.536488",
  "event_name": "AddToCart",
  "user_ip": "169.235.214.44",
  "additional_data": {
    "price": 694.83,
    "product_id": 361482,
    "time_on_data": 83,
    "percent_viewed": 87
  },
  "name": "Semyon Ryan",
  "country": "US",
  "region": "California",
  "city": "Riverside"
}


The ElastiCache Redis cluster acts as a caching layer that reduces the number of requests to external resources such as ipinfo, as well as the load on the database server where critical user data is stored. All data in Redis is reproducible, so it can be rebuilt from those sources if needed.


Transformation and analytics


AWS Glue


AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. Several of its features are useful in the current system, for example the metastore (Data Catalog) and ETL jobs.


Data catalog and crawler

The AWS Glue Data Catalog contains references to data that is used as sources and targets of your ETL jobs in AWS Glue. To create your data warehouse or data lake, you must catalog this data. The AWS Glue Data Catalog is an index to the location, schema, and runtime metrics of your data. You use the information in the Data Catalog to create and monitor your ETL jobs. Information in the Data Catalog is stored as metadata tables, where each table specifies a single data store. Typically, you run a crawler to take inventory of the data in your data stores, but there are other ways to add metadata tables into your Data Catalog. The following workflow diagram shows how AWS Glue crawlers interact with data stores and other elements to populate the Data Catalog.

The following is the general workflow for how a crawler populates the AWS Glue Data Catalog:

  1. A crawler runs any custom classifiers that you choose to infer the format and schema of your data. You provide the code for custom classifiers, and they run in the order that you specify. The first custom classifier to successfully recognize the structure of your data is used to create a schema. Custom classifiers lower in the list are skipped.

  2. If no custom classifier matches your data's schema, built-in classifiers try to recognize your data's schema. An example of a built-in classifier is one that recognizes JSON.

  3. The crawler connects to the data store. Some data stores require connection properties for crawler access.

  4. The inferred schema is created for your data.

  5. The crawler writes metadata to the Data Catalog. A table definition contains metadata about the data in your data store. The table is written to a database, which is a container of tables in the Data Catalog. Attributes of a table include classification, which is a label created by the classifier that inferred the table schema.

When creating the crawler, I point it at the S3 bucket with the enriched data and run it.


As a result, I get a table with an automatically generated schema.
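The same crawler can also be created from code. This sketch uses boto3's Glue `create_crawler` and `start_crawler` calls with a client passed in; the crawler name, role ARN and bucket path are hypothetical, and the database name `data-sandbox` is the one referenced by the Redshift Spectrum schema later in the post.

```python
def crawler_definition(name: str, role_arn: str, database: str, s3_path: str) -> dict:
    """Keyword arguments for glue.create_crawler targeting one S3 path."""
    return {
        "Name": name,
        "Role": role_arn,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }


def create_and_run_crawler(glue_client, definition: dict) -> None:
    """Create the crawler, then start its first run."""
    glue_client.create_crawler(**definition)
    glue_client.start_crawler(Name=definition["Name"])
```

Usage with hypothetical names: `create_and_run_crawler(boto3.client("glue"), crawler_definition("enriched-crawler", "arn:aws:iam::123456789012:role/GlueCrawlerRole", "data-sandbox", "s3://example-enriched-bucket/"))`.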


Kinesis Data Firehose record format conversion requires a Glue table as the schema that determines how to interpret the incoming data.


Glue Job

An AWS Glue job encapsulates a script that connects to your source data, processes it, and then writes it out to your data target. Typically, a job runs extract, transform, and load (ETL) scripts. Jobs can also run general-purpose Python scripts (Python shell jobs.) AWS Glue triggers can start jobs based on a schedule or event, or on demand. You can monitor job runs to understand runtime metrics such as completion status, duration, and start time.

You can use scripts that AWS Glue generates or you can provide your own. Given a source schema and target location or schema, the AWS Glue code generator can automatically create an Apache Spark API (PySpark) script. You can use this script as a starting point and edit it to meet your goals.

The simplest and most obvious use case for a Glue job is flattening the enriched nested JSON. First, let's run a simple query in Amazon Athena. The “additional_data” field contains several attributes, and we need them as separate columns.
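To make the transformation concrete, here is a plain-Python sketch of the flattening applied to a single record. It is not the Spark script Glue generates, and promoting the nested keys as-is to column names is an assumption.

```python
def flatten(event: dict, nested_field: str = "additional_data") -> dict:
    """Promote the attributes of the nested struct to top-level columns."""
    flat = {k: v for k, v in event.items() if k != nested_field}
    for key, value in event.get(nested_field, {}).items():
        if key in flat:
            # Refuse to silently overwrite an existing top-level column.
            raise ValueError(f"column name clash: {key}")
        flat[key] = value
    return flat
```

Applied to the enriched example, `flatten` turns `additional_data.price`, `product_id`, `time_on_data` and `percent_viewed` into ordinary columns next to `user_id`, `country` and the rest.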


We can create a Glue job in just a few clicks; no coding is required in this case.

AWS Glue generates an ETL script for us, and we run the job.


The job succeeded.


Next, a new crawler scans the transformed data and creates a new table. Querying the new table:

The “additional_data” field has been split into separate columns.


Redshift Spectrum and data aggregation


Raw or even enriched data is not very useful for making business decisions on its own. First, the data should be aggregated, which helps with further analysis. Aggregation can be done with different tools and AWS services, such as Amazon EMR (Elastic MapReduce), AWS Data Pipeline in some cases, custom batch jobs or Redshift Spectrum.

Using Amazon Redshift Spectrum, you can efficiently query and retrieve structured and semistructured data from files in Amazon S3 without having to load the data into Amazon Redshift tables. Redshift Spectrum queries employ massive parallelism to execute very fast against large datasets. Much of the processing occurs in the Redshift Spectrum layer, and most of the data remains in Amazon S3. Multiple clusters can concurrently query the same dataset in Amazon S3 without the need to make copies of the data for each cluster.

Amazon Redshift Spectrum resides on dedicated Amazon Redshift servers that are independent of your cluster. Redshift Spectrum pushes many compute-intensive tasks, such as predicate filtering and aggregation, down to the Redshift Spectrum layer.

First, we need to create an external schema. It can refer to the existing Glue database.

CREATE EXTERNAL SCHEMA spectrum_schema FROM data catalog
database 'data-sandbox'
iam_role 'arn:aws:iam::394******563:role/RedShiftSpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

Now we can query data and take advantage of the Redshift engine.

SELECT "event_name", count(*) FROM spectrum_schema."2021" GROUP BY "event_name";

We can also UNLOAD the results to an S3 bucket and use them for further analytics.

unload ('SELECT "country", "event_name", count(*) FROM spectrum_schema."2021" GROUP BY "country", "event_name" ORDER BY "country";')
to 's3://ait-aggregation/events_by_country/'
iam_role 'arn:aws:iam::394*******63:role/RedShiftSpectrumRole'
CSV
DELIMITER AS '|'
header
parallel off;
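The unloaded file is pipe-delimited CSV with a header row, so it can be read with Python's `csv` module once downloaded (for example with boto3's `get_object`). This sketch assumes the header is `country|event_name|count`, with `count` being the default name Redshift gives to `count(*)`.

```python
import csv
import io
from collections import defaultdict


def events_by_country(text: str) -> dict:
    """Parse the UNLOAD output into {country: {event_name: count}}."""
    result = defaultdict(dict)
    for row in csv.DictReader(io.StringIO(text), delimiter="|"):
        result[row["country"]][row["event_name"]] = int(row["count"])
    return dict(result)
```

The nested dictionary makes it easy to compare the event mix of one country against another before loading the data into a BI tool.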


Athena and federated queries


Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

The most obvious way of using Athena is querying S3, but we can also run federated queries using Athena Data Source Connectors that run on AWS Lambda. AWS provides open source data source connectors for Amazon DynamoDB, Apache HBase, Amazon DocumentDB, Amazon Redshift, Amazon CloudWatch, Amazon CloudWatch Metrics, and JDBC-compliant relational databases such as MySQL and PostgreSQL.


Let’s query the existing S3 data, Redis and a MySQL database, and JOIN the data to produce a useful report.

First, we need to create data sources for Redis and MySQL.

The required Lambda functions can be found in the AWS Serverless Application Repository.


Once the Lambda functions are deployed and the Athena data sources are created, we can start querying.


WITH s3 AS 
    (SELECT timestamp,
         user_id,
         name,
         event_name
 FROM "data-sandbox"."2021"), redis AS 
    (SELECT SUBSTRING(_key_, 6, 4) AS user_id,
         last_access,
         location
 FROM "data-sandbox-redis"."redis"."redis"), mysql AS 
    (SELECT customer_id AS user_id,
         pixel_id,
         name,
         price_per_click
 FROM "mysql"."datasandbox"."customers")
SELECT s3.user_id, 
       mysql.name,
       location,
       event_name,
       last_access,
       price_per_click
FROM s3
LEFT JOIN redis
 ON redis.user_id = s3.user_id
LEFT JOIN mysql
 ON CAST(mysql.user_id AS VARCHAR(5)) = s3.user_id
WHERE event_name='Search'
ORDER BY  last_access DESC 

This query takes pieces of data from different sources and joins them into a single report.

‘last_access’ and ‘location’ come from Redis, ‘name’ and ‘price_per_click’ come from MySQL, and the other fields are retrieved from S3. The result lists every “Search” event together with the user's last access time, most recent first.
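The join logic can be mirrored in plain Python to show how the keys line up across sources: the Redis key is cut with the equivalent of `SUBSTRING(_key_, 6, 4)` (a 1-based start of 6 is index 5), and the numeric MySQL `customer_id` is compared as a string, like the `CAST` in the query. The `user:<id>` key shape is an assumption, and the sketch assumes at most one Redis and one MySQL row per user, whereas a real LEFT JOIN would repeat rows on duplicates.

```python
def build_report(s3_rows, redis_rows, mysql_rows):
    """LEFT JOIN S3 'Search' events with Redis and MySQL data by user_id."""
    # "user:35"[5:9] == "35", mirroring SUBSTRING(_key_, 6, 4)
    redis_by_user = {r["_key_"][5:9]: r for r in redis_rows}
    mysql_by_user = {str(m["customer_id"]): m for m in mysql_rows}
    report = []
    for row in s3_rows:
        if row["event_name"] != "Search":
            continue
        redis_row = redis_by_user.get(row["user_id"], {})
        mysql_row = mysql_by_user.get(row["user_id"], {})
        report.append({
            "user_id": row["user_id"],
            "name": mysql_row.get("name"),
            "location": redis_row.get("location"),
            "event_name": row["event_name"],
            "last_access": redis_row.get("last_access"),
            "price_per_click": mysql_row.get("price_per_click"),
        })
    # Most recent access first; rows without a Redis match sort last.
    report.sort(key=lambda r: r["last_access"] or "", reverse=True)
    return report
```

Users with no match in Redis or MySQL still appear, with `None` in the missing columns, which is the LEFT JOIN behaviour the query relies on.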


QuickSight and visualization


Amazon QuickSight is a scalable, serverless, embeddable, machine learning-powered business intelligence (BI) service built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include Machine Learning-powered insights.

Amazon QuickSight connects to your data in the cloud and combines data from many different sources.

In the examples below I use Athena as the data source. Amazon QuickSight offers a range of visual types for displaying your data, for example bar charts, donut charts, filled maps, pie charts, line charts, histograms, tables and many more. Let’s build a donut chart showing the ratio of the different events in the system.


A filled map with the number of requests from different US states.


Or even from different cities.


Table with an aggregation of the number of different actions by every customer.


Top 10 regions by the number of requests.

As we can see, Amazon QuickSight has great potential: anybody, even without a technical background, can create visualizations in just a few clicks.


Conclusion


In this POC project I explored a wide variety of AWS data-related services and tried to build a system for ingesting large amounts of real-time data, streaming analysis, enrichment, com