Personalise your emails with recommenders (part 1).

This is the first post of our Smarter Segments series, where we will explore and implement the stepping stones needed to integrate personalised product recommendations into existing email marketing campaigns. We will use AWS Personalize to create the recommendations and AWS Pinpoint to deliver the emails, but the techniques here should work with any ESP.

In order to create a recommendation engine that you can use for your email marketing campaigns, we need to 1) set up the recommendation engine and 2) query it.

What are we doing?

A multi-part series building a minimum viable ML pipeline that will satisfy a basic use case:

The Use Case

You are an e-commerce store with 100+ daily active users who have made, in total, 10K+ interactions with your products (purchased, viewed, clicked_for_info).

You’re about to send your monthly newsletter to your users. Only this time you want to add a section to the email displaying a list of products that you’ll recommend to them specifically – making your emails more personalised and therefore driving better conversions.

The main components that constitute your recommender engine are the Dataset Group, which will manage access to your data, and the Recommender, which will contain the analytical method used to achieve different types of recommendation.

The steps needed

  • Data acquisition: we will begin by acquiring the data; specifically, we will need three sets of data.
    • The interactions data: this is the most important set. It will contain columns such as the user ID, item ID, and interaction type.
    • The items data: this is where we keep the details of the items.
    • The user data: this is where we store the users' details.
  • Data cleaning and processing: we will add and remove columns as appropriate before training.
  • Creating the recommender: this engine will ingest the prepared data and do the ML number crunching.
  • Querying the recommender: once we have the recommender, we need it hosted somewhere we can query it to get recommendations for our users.
  • Embedding the recommendations: once we obtain the recommendations, we need to embed them in that user's newsletter email before sending it (this will be in part 2).

Data acquisition

We will be using the code examples from the AWS Personalize Samples as a starting point, and we’ll modify them later to customise. The data we will be using is sample data provided by AWS.

Our demo data can be found and accessed using the AWS CLI. Setting up the AWS CLI is beyond the scope of this blog. In a Jupyter Notebook, create a file and add the following:

!aws s3 cp s3://retail-demo-store-us-east-1/csvs/items.csv .
!aws s3 cp s3://retail-demo-store-us-east-1/csvs/interactions.csv .

Let’s inspect the downloaded files (and do the imports):

import boto3
import json
import numpy as np
import pandas as pd
import time
import datetime

df = pd.read_csv('./interactions.csv')

What events have we defined as interactions?

df.EVENT_TYPE.value_counts()

The only housekeeping we’ll do on the interactions data is to drop the DISCOUNT column, since we’re not going to use it.

df = df.drop(columns=['DISCOUNT'])
df.sample(10)
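
Before saving, it’s worth a quick sanity check that the columns AWS Personalize expects in an interactions dataset (USER_ID, ITEM_ID, TIMESTAMP, EVENT_TYPE) are all present, and that TIMESTAMP holds Unix epoch values in seconds rather than date strings. A minimal sketch (the column names match the demo data; adjust for your own):

Python
# Confirm the columns Personalize expects are present
required_columns = {"USER_ID", "ITEM_ID", "TIMESTAMP", "EVENT_TYPE"}
print("Missing columns:", required_columns - set(df.columns) or "none")

# TIMESTAMP should be Unix epoch seconds, and ideally there are no missing values
print(df["TIMESTAMP"].head())
print(df.isnull().sum())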

That’s all the cleaning we’re going to do. We’ve only dropped one column, and we’re now ready to save the cleaned file.

clean_training_data_file_name = "cleaned_training_data.csv"
df.to_csv(clean_training_data_file_name, index=False)

That’s the only file we’re going to upload to S3 for the recommender to train a model for us. We’ll keep items.csv locally and use it to query for product details by ID (the recommender will give us the recommended products by ID).

Let’s now create the S3 bucket to upload our cleaned data to, using our boto3 client.

Python
# Configure the SDK to Personalize:
personalize = boto3.client('personalize')

region = "eu-west-1"
s3 = boto3.client('s3', region_name=region)
account_id = boto3.client('sts').get_caller_identity().get('Account')
bucket_name = account_id + "-" + region + "-" + "user-recommendations"
print('bucket_name:', bucket_name)

try: 
    s3.create_bucket(
        Bucket = bucket_name,
        CreateBucketConfiguration={'LocationConstraint': region})

except s3.exceptions.BucketAlreadyOwnedByYou:
    print("Bucket already exists. Using bucket", bucket_name)

We then assign a policy to the bucket to allow AWS Personalize to access it:

Python
s3 = boto3.client("s3")
policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket_name),
                "arn:aws:s3:::{}/*".format(bucket_name)
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy))

Once the bucket is created we can upload the cleaned_training_data.csv we created earlier. The file will be read by the AWS Personalize service to do the number crunching.

Python
boto3.Session().resource('s3').Bucket(bucket_name).Object(clean_training_data_file_name).upload_file(clean_training_data_file_name)
interactions_s3DataPath = "s3://"+bucket_name+"/"+clean_training_data_file_name

After the data cleaning/housekeeping we’re ready to assemble the AWS components needed to achieve basic recommendations. The main components are the dataset group and the recommender.

core AWS Personalize components

Let’s now put them together, step by step.

Creating the dataset group: we simply state the domain to which the dataset group belongs (different domains require different schemas). We then use the returned ARN to poll until we know that the group has been created.

The Dataset Group

Python
response = personalize.create_dataset_group(
    name='personalize_ecommerce_ds_group',
    domain='ECOMMERCE'
)

dataset_group_arn = response['datasetGroupArn']
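
The dataset group (like most of the Personalize resources we create below) is created asynchronously, so before moving on we poll it with the ARN we just received until it becomes ACTIVE. Here’s a small polling helper for that, a minimal sketch with an arbitrary polling interval, which we’ll reuse for the other resources:

Python
def wait_until_active(describe_fn, arn_key, arn, resource_key, interval=30):
    """Poll a Personalize describe_* call until the resource is ACTIVE or has failed."""
    while True:
        status = describe_fn(**{arn_key: arn})[resource_key]['status']
        print(resource_key, 'status:', status)
        if status in ('ACTIVE', 'CREATE FAILED'):
            return status
        time.sleep(interval)

wait_until_active(personalize.describe_dataset_group,
                  'datasetGroupArn', dataset_group_arn, 'datasetGroup')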

The Schema: The only schema relevant here is the one that describes the interactions data set.

Python
interactions_schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "USER_ID",
            "type": "string"
        },
        {
            "name": "ITEM_ID",
            "type": "string"
        },
        {
            "name": "TIMESTAMP",
            "type": "long"
        },
        {
            "name": "EVENT_TYPE",
            "type": "string"
            
        }
    ],
    "version": "1.0"
}

create_schema_response = personalize.create_schema(
    name = "personalize-ecommerce-interatn_group",
    domain = "ECOMMERCE",
    schema = json.dumps(interactions_schema)
)

interaction_schema_arn = create_schema_response['schemaArn']

Now that we have the Dataset Group and a Schema describing the data, we can create the Dataset. We specify the dataset type that we want to create; in this case it is of type “INTERACTIONS”.

Python
dataset_type = "INTERACTIONS"

create_dataset_response = personalize.create_dataset(
    name = "personalize_ecommerce_demo_interactions",
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = interaction_schema_arn
)

interactions_dataset_arn = create_dataset_response['datasetArn']
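
The dataset also needs to be ACTIVE before we can import data into it; we can reuse the polling helper from earlier (again, just a sketch):

Python
wait_until_active(personalize.describe_dataset,
                  'datasetArn', interactions_dataset_arn, 'dataset')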

Create the Personalize role

Python
iam = boto3.client("iam")

role_name = "PersonalizeRoleEcommerceDemoRecommender"
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}

create_role_response = iam.create_role(
    RoleName = role_name,
    AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)

# AmazonPersonalizeFullAccess provides access to any S3 bucket with a name that includes "personalize" or "Personalize" 
# if you would like to use a bucket with a different name, please consider creating and attaching a new policy
# that provides read access to your bucket or attaching the AmazonS3ReadOnlyAccess policy to the role
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
    RoleName = role_name,
    PolicyArn = policy_arn
)

# Now add S3 support
iam.attach_role_policy(
    PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess',
    RoleName=role_name
)
time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

role_arn = create_role_response["Role"]["Arn"]

Now that we have the role, the dataset, and the S3 bucket where the interaction data is kept, we can create the dataset import job:

Python
create_interactions_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "personalize_ecommerce_demo_interactions_import",
    datasetArn = interactions_dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket_name, interactions_file_path)
    },
    roleArn = role_arn
)

dataset_interactions_import_job_arn = create_interactions_dataset_import_job_response['datasetImportJobArn']
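
The import job runs asynchronously and must finish before the recommender can train on the data, so we wait for it with the same helper (imports of this size typically take several minutes):

Python
wait_until_active(personalize.describe_dataset_import_job,
                  'datasetImportJobArn', dataset_interactions_import_job_arn,
                  'datasetImportJob', interval=60)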

Create the recommender

Python
create_recommender_response = personalize.create_recommender(
  name = 'viewed_x_also_viewed_demo',
  recipeArn = 'arn:aws:personalize:::recipe/aws-ecomm-customers-who-viewed-x-also-viewed',
  datasetGroupArn = dataset_group_arn
)
viewed_x_also_viewed_arn = create_recommender_response["recommenderArn"]
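
Creating a recommender trains a model under the hood, so it can take a while (often an hour or more). The same polling helper tells us when it is ready to use:

Python
wait_until_active(personalize.describe_recommender,
                  'recommenderArn', viewed_x_also_viewed_arn,
                  'recommender', interval=60)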

Querying the recommender: once the recommender has been created, we’re finally able to use it.

Let’s create a helper function to get the description of a product based on its ID.

Python
# reading the original data in order to have a dataframe that has both item_ids 
# and the corresponding titles to make our recommendations easier to read.
items_df = pd.read_csv('./items.csv')
items_df.sample(10)
def get_item_by_id(item_id, item_df=items_df):
    """
    This takes an item_id from a recommendation in string format and looks
    it up in a default or specified dataframe, returning the item
    description.

    A really broad try/except clause was added in case anything goes wrong.

    Feel free to add more debugging or filtering here to improve results if
    you hit an error.
    """
    try:
        return item_df.loc[item_df["ITEM_ID"] == str(item_id)]['PRODUCT_DESCRIPTION'].values[0]
    except Exception:
        print(item_id)
        return "Error obtaining item description"

Querying the “Customers who viewed X also viewed” Recommender:

Python
# The runtime client is used to fetch recommendations
personalize_runtime = boto3.client('personalize-runtime')

# First pick a user
test_user_id = "777"

# Select a random item
test_item_id = "8fbe091c-f73c-4727-8fe7-d27eabd17bea" # a random item: 8fbe091c-f73c-4727-8fe7-d27eabd17bea

# Get recommendations for the user for this item
get_recommendations_response = personalize_runtime.get_recommendations(
    recommenderArn = viewed_x_also_viewed_arn,
    itemId = test_item_id,
    userId = test_user_id,
    numResults = 10
)

# Build a new dataframe for the recommendations
item_list = get_recommendations_response['itemList']
recommendation_list = []

for item in item_list:
    item = get_item_by_id(item['itemId'], items_df)
    recommendation_list.append(item)

user_recommendations_df = pd.DataFrame(recommendation_list, columns = [get_item_by_id(test_item_id, items_df)])

pd.options.display.max_rows =10
display(user_recommendations_df)

And there you have it: we can now query our engine to get recommendations for a specific user.

Clean up your resources on AWS

Make sure you clean up the resources that you used so as not to incur unnecessary costs.
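
For reference, a minimal clean-up sketch covering the resources created above. Deletions are asynchronous, so delete them roughly in this order and wait for each to finish; the S3 bucket must be emptied before it can be removed:

Python
# Delete the Personalize resources (recommender first, dataset group last)
personalize.delete_recommender(recommenderArn=viewed_x_also_viewed_arn)
personalize.delete_dataset(datasetArn=interactions_dataset_arn)
personalize.delete_schema(schemaArn=interaction_schema_arn)
personalize.delete_dataset_group(datasetGroupArn=dataset_group_arn)

# Detach the managed policies and delete the IAM role
iam.detach_role_policy(RoleName=role_name,
                       PolicyArn="arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess")
iam.detach_role_policy(RoleName=role_name,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess")
iam.delete_role(RoleName=role_name)

# Empty and delete the S3 bucket
bucket = boto3.resource('s3').Bucket(bucket_name)
bucket.objects.all().delete()
bucket.delete()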