Amazon announced the availability of Amazon Managed Workflows for Apache Airflow (MWAA), a fully managed service that makes it easy to run Apache Airflow on AWS and to build data processing workflows in the cloud.
Apache Airflow is an open-source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as “workflows”.
Amazon Personalize enables developers to build applications with the same machine learning (ML) technology used by Amazon.com for real-time personalized recommendations — no ML expertise required.
This article shows how we can build and manage an ML workflow using Google BigQuery, Amazon MWAA, and Amazon Personalize. We’ll build a session-based recommender system to predict the most popular items for an e-commerce website based on the traffic data of the product pages tracked by Google Analytics.
BigQuery is an enterprise data warehouse that enables super-fast SQL queries over large datasets using the processing power of Google’s infrastructure.
We’ll start by extracting and transforming the data, then build and train a solution version (a trained Amazon Personalize recommendation model), and finally deploy a campaign.
These tasks will be plugged into a workflow that can be orchestrated and automated through Apache Airflow’s integration with Amazon Personalize and Google BigQuery.
The diagram below represents the workflow we’ll implement for building the recommender system:
The workflow consists of the following tasks:
- Export Google Analytics event data from BigQuery to Amazon S3.
- Check that the dataset file exists in S3.
- Initialize the Amazon Personalize resources (dataset group, schema, IAM role, and S3 bucket policy) if they don’t already exist.
- Import the interaction dataset into Personalize.
- Create or update the solution and train a new solution version.
- Create or update the campaign.
Before implementing the solution, you have to create an Airflow environment using Amazon MWAA. The following “extra packages” should be included in the requirements.txt used when creating the environment. Note that the BigQuery client is not listed here; we will install it in the next step:
requirements.txt
boto >= 2.49.0
httplib2
awswrangler
google-api-python-client
When the new Airflow environment is ready to be used, attach the Personalize policies to your environment’s IAM execution role by running the following CLI command:
$aws iam put-role-policy --role-name AmazonMWAA-MyAirflowEnvironment-em53Wv --policy-name AirflowPersonalizePolicy --policy-document file://airflowPersonalizePolicy.json
airflowPersonalizePolicy.json
{
    "Version": "2012-10-17",
    "Id": "AirflowPersonalizePolicy",
    "Statement": [
        {
            "Sid": "PersonalizeAccessPolicy",
            "Effect": "Allow",
            "Action": [
                "personalize:*"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Sid": "S3BucketAccessPolicy",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutBucketPolicy"
            ],
            "Resource": [
                "arn:aws:s3:::airflow-demo-personalise",
                "arn:aws:s3:::airflow-demo-personalise/*"
            ]
        },
        {
            "Sid": "IAMPolicy",
            "Effect": "Allow",
            "Action": [
                "iam:CreateRole",
                "iam:AttachRolePolicy"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:GetRole",
                "iam:PassRole"
            ],
            "Resource": "arn:aws:iam::*:role/PersonalizeS3Role-*"
        }
    ]
}
MWAA currently doesn’t support the Google Cloud BigQuery client (google-cloud-bigquery) or pandas-gbq with grpc > 1.20. We are not able to install the BigQuery client through requirements.txt; if you add the above dependencies to requirements.txt, pip will fail to install any of the dependencies listed there, and you will get the error “No module named ‘httplib2’” when running the DAG. To resolve this issue, we can package the dependencies in a local virtual environment, zip the resulting site-packages, upload the archive to S3, and load it dynamically inside the DAG tasks.
I created a bash file and requirements.txt for the above steps; run the following command:
$bash setup.sh
setup.sh
#!/bin/bash
virtualenv -p python3.7 venv
source venv/bin/activate
pip install -r requirements.txt
cd venv/lib/python3.7
zip -r site-packages.zip site-packages/
mv site-packages.zip ../../../site-packages.zip
cd ../../../
deactivate
aws s3 cp site-packages.zip s3://airflow-demo-personalise/
requirements.txt
cachetools==4.2.0
certifi==2020.12.5
cffi==1.14.4
chardet==3.0.4
google-api-core==1.24.0
google-auth==1.24.0
google-auth-oauthlib==0.4.2
google-cloud-bigquery==2.6.1
google-cloud-bigquery-storage==2.1.0
google-cloud-core==1.5.0
google-crc32c==1.1.0
google-resumable-media==1.2.0
googleapis-common-protos==1.52.0
grpcio==1.34.0
idna==2.10
libcst==0.3.15
mypy-extensions==0.4.3
numpy==1.19.4
oauthlib==3.1.0
pandas==1.1.5
pandas-gbq==0.14.1
proto-plus==1.13.0
protobuf==3.14.0
pyarrow==2.0.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.20
pydata-google-auth==1.1.0
python-dateutil==2.8.1
pytz==2020.4
PyYAML==5.3.1
requests==2.25.0
requests-oauthlib==1.3.0
rsa==4.6
six==1.15.0
typing-extensions==3.7.4.3
typing-inspect==0.6.0
urllib3==1.26.2
Then copy the following code into the DAG task to import the Google BigQuery modules dynamically:
s3 = boto3.resource('s3')
logger.info(
    "Download google bigquery and google client dependencies from S3")
s3.Bucket(BUCKET_NAME).download_file(LIB_KEY, '/tmp/site-packages.zip')
with zipfile.ZipFile('/tmp/site-packages.zip', 'r') as zip_ref:
    zip_ref.extractall('/tmp/python3.7/site-packages')
sys.path.insert(1, "/tmp/python3.7/site-packages/site-packages/")

# Import google bigquery and google client dependencies
import pyarrow
from google.cloud import bigquery
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
Next, we will create a Google Cloud connection in the Airflow UI.
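For reference, the connection that the BigQueryHook uses later in the DAG can be set up under Admin > Connections with roughly the following fields (the connection id must match the bigquery_conn_id used in the code; the project id and service-account key are your own values):
Conn Id: bigquery_default
Conn Type: Google Cloud Platform
Project Id: your GCP project id
Keyfile JSON: contents of your service account key file
Scopes: https://www.googleapis.com/auth/bigquery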
Now we are able to use Google BigQuery from the Amazon MWAA workers, so let’s begin creating the workflow tasks.
First, export session and hit data from a Google Analytics 360 account to BigQuery, then use SQL to query the Analytics data into a Pandas data frame in the format Personalize expects.
To prepare an interaction dataset for Personalize, we need to extract the following data from the Google Analytics data in BigQuery:
- USER_ID: In this example, we don’t have user data for the e-commerce website, and there is no user interaction data from the website database. However, we can use the client ID provided by Google Analytics. The client ID (cid) is a unique identifier for a browser–device pair that helps Google Analytics link user actions on a site; by default, Google Analytics determines unique users using this parameter. The client ID format is a randomly generated 31-bit integer followed by a dot (“.”) followed by the current time in seconds, so we only need the integer before the dot (for example, a cid of 1234567890.1609459200 yields a USER_ID of 1234567890). BigQuery supports regular expressions, so we can use REGEXP_EXTRACT(USER_ID, r'(\d+)\.') AS USER_ID to extract it as a session-based user ID.
- ITEM_ID: Google Analytics provides the page location (page_location), so we can filter to product pages with the WHERE clause page_location LIKE '%/product/%' and extract the product slug as the item ID.
- TIMESTAMP: Timestamp data must be in UNIX epoch time format. In the query, the Analytics event_timestamp is truncated to the minute with TIMESTAMP_TRUNC(TIMESTAMP_MICROS(event_timestamp), MINUTE) and then converted to epoch seconds with UNIX_SECONDS.
- device.category AS DEVICE
- geo.country AS LOCATION
- event_name AS EVENT_NAME
The SQL query in BigQuery is as follows:
BQ_SQL = """
SELECT REGEXP_EXTRACT(USER_ID, r'(\d+)\.') AS USER_ID, UNIX_SECONDS(EVENT_DATE) AS TIMESTAMP, REGEXP_EXTRACT(page_location,r'product/([^?&#]*)') as ITEM_ID, LOCATION, DEVICE, EVENT_NAME AS EVENT_TYPE
FROM
(
SELECT user_pseudo_id AS USER_ID, (SELECT value.string_value FROM UNNEST(event_params)
WHERE key = "page_location") as page_location, TIMESTAMP_TRUNC(TIMESTAMP_MICROS(event_timestamp),
MINUTE) AS EVENT_DATE, device.category AS DEVICE, geo.country AS LOCATION, event_name AS EVENT_NAME
FROM `lively-metrics-295911.analytics_254171871.events_intraday_*`
WHERE
_TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
)
WHERE page_location LIKE '%/product/%'
GROUP BY USER_ID, EVENT_DATE, page_location, LOCATION, DEVICE,EVENT_NAME
ORDER BY EVENT_DATE DESC
"""
Next, write the data frame as a CSV file to S3 using AWS Data Wrangler.
The following PythonOperator snippet in the DAG defines the BigQuery to S3 task:
def bq_to_s3():
    s3 = boto3.resource('s3')
    logger.info(
        "Download google bigquery and google client dependencies from S3")
    s3.Bucket(BUCKET_NAME).download_file(LIB_KEY, '/tmp/site-packages.zip')
    with zipfile.ZipFile('/tmp/site-packages.zip', 'r') as zip_ref:
        zip_ref.extractall('/tmp/python3.7/site-packages')
    sys.path.insert(1, "/tmp/python3.7/site-packages/site-packages/")

    # Import google bigquery and google client dependencies
    import pyarrow
    from google.cloud import bigquery
    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    bq_hook = BigQueryHook(
        bigquery_conn_id="bigquery_default", use_legacy_sql=False)
    bq_client = bigquery.Client(project=bq_hook._get_field(
        "project"), credentials=bq_hook._get_credentials())
    events_df = bq_client.query(BQ_SQL).result().to_dataframe(
        create_bqstorage_client=False)
    logger.info(
        f'google analytics events dataframe head - {events_df.head()}')
    wr.s3.to_csv(events_df, OUTPUT_PATH, index=False)


t_export_bq_to_s3 = PythonOperator(task_id='export_bq_to_s3',
                                   python_callable=bq_to_s3,
                                   dag=dag,
                                   retries=1)
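Note that the snippets in this article assume a handful of module-level imports and constants at the top of the DAG file, which aren’t shown in the listings. The following is a rough sketch of that header; the names (BUCKET_NAME, OUTPUT_PATH, LIB_KEY, the suffix, and the dataset group, schema, solution, and campaign names) are taken from the snippets, but the values shown here are placeholders, and the import paths target the Airflow 1.10 environment MWAA provided at the time:
import json
import logging
import sys
import time
import zipfile
from datetime import timedelta

import boto3
import awswrangler as wr
from botocore.exceptions import ClientError
from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

logger = logging.getLogger(__name__)

# AWS service clients used by the tasks
personalize = boto3.client('personalize')
iam = boto3.client('iam')

# Bucket holding the zipped Google client libraries and the exported dataset
BUCKET_NAME = 'airflow-demo-personalise'                         # placeholder
LIB_KEY = 'site-packages.zip'                                    # placeholder
OUTPUT_PATH = f's3://{BUCKET_NAME}/ga-events/interactions.csv'   # placeholder

# Personalize resource names
suffix = 'demo'                                                  # placeholder
DATASET_GROUP_NAME = 'DEMO-dataset-group'                        # placeholder
INTERCATION_SCHEMA_NAME = 'DEMO-interaction-schema'              # placeholder
SOLUTION_NAME = 'DEMO-popularity-solution'                       # placeholder
CAMPAIGN_NAME = 'DEMO-popularity-campaign'                       # placeholder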
In this section, we will build a Personalize solution to identify the most popular items for an e-commerce website integrated with Google Analytics.
We will use the Popularity-Count recipe for training our model. Although Personalize supports importing interactions incrementally, we will retrain the model on the daily interaction data to get more relevant recommendations.
What we’ll cover:
- check_s3_for_key (S3KeySensor): check if the dataset CSV file exists.
- t_check_dataset_group (BranchPythonOperator): check if the Personalize dataset group exists. If yes, trigger t_init_personalize, else trigger t_skip_init_personalize.
- t_init_personalize (DummyOperator): trigger parallel tasks if the dataset group doesn’t exist (t_create_dataset_group, t_create_schema, t_put_bucket_policies, t_create_iam_role).
- t_create_dataset_group (PythonOperator): create a Personalize dataset group if it doesn’t exist.
- t_create_schema (PythonOperator): create an Interaction schema for our data if the schema doesn’t exist.
- t_put_bucket_policies (PythonOperator): attach an Amazon S3 policy to your Amazon Personalize role if it doesn’t exist.
- t_create_iam_role (PythonOperator): create a Personalize role with the right permissions if it doesn’t exist.
- t_create_dataset_type (PythonOperator): create an ‘Interactions’ dataset type if it doesn’t exist.
- t_skip_init_personalize (DummyOperator): downstream task of the BranchPythonOperator task.
- t_create_import_dataset_job (PythonOperator): create the dataset import jobs.
- t_update_solution (PythonOperator): create / update the solution.
- t_update_campagin (PythonOperator): create / update the campaign.
def create_dataset_group(**kwargs):
    create_dg_response = personalize.create_dataset_group(
        name=DATASET_GROUP_NAME
    )
    dataset_group_arn = create_dg_response["datasetGroupArn"]
    status = None
    max_time = time.time() + 2*60*60  # 2 hours
    while time.time() < max_time:
        describe_dataset_group_response = personalize.describe_dataset_group(
            datasetGroupArn=dataset_group_arn
        )
        status = describe_dataset_group_response["datasetGroup"]["status"]
        logger.info(f"DatasetGroup: {status}")
        if status == "ACTIVE" or status == "CREATE FAILED":
            break
        time.sleep(20)
    if status == "ACTIVE":
        kwargs['ti'].xcom_push(key="dataset_group_arn",
                               value=dataset_group_arn)
    if status == "CREATE FAILED":
        raise AirflowFailException(
            f"DatasetGroup {DATASET_GROUP_NAME} create failed")
def check_dataset_group(**kwargs):
    dg_response = personalize.list_dataset_groups(
        maxResults=100
    )
    demo_dg = next((datasetGroup for datasetGroup in dg_response["datasetGroups"]
                    if datasetGroup["name"] == DATASET_GROUP_NAME), False)
    if not demo_dg:
        return "init_personalize"
    else:
        kwargs['ti'].xcom_push(key="dataset_group_arn",
                               value=demo_dg["datasetGroupArn"])
        return "skip_init_personalize"
def create_schema():
    schema_response = personalize.list_schemas(
        maxResults=100
    )
    interaction_schema = next((schema for schema in schema_response["schemas"]
                               if schema["name"] == INTERCATION_SCHEMA_NAME), False)
    if not interaction_schema:
        create_schema_response = personalize.create_schema(
            name=INTERCATION_SCHEMA_NAME,
            schema=json.dumps({
                "type": "record",
                "name": "Interactions",
                "namespace": "com.amazonaws.personalize.schema",
                "fields": [
                    {
                        "name": "USER_ID",
                        "type": "string"
                    },
                    {
                        "name": "ITEM_ID",
                        "type": "string"
                    },
                    {
                        "name": "TIMESTAMP",
                        "type": "long"
                    },
                    {
                        "name": "LOCATION",
                        "type": "string",
                        "categorical": True
                    },
                    {
                        "name": "DEVICE",
                        "type": "string",
                        "categorical": True
                    },
                    {
                        "name": "EVENT_TYPE",
                        "type": "string"
                    }
                ]
            }))
        logger.info(json.dumps(create_schema_response, indent=2))
        schema_arn = create_schema_response["schemaArn"]
        return schema_arn
    return interaction_schema["schemaArn"]
def put_bucket_policies():
    s3 = boto3.client("s3")
    policy = {
        "Version": "2012-10-17",
        "Id": "PersonalizeS3BucketAccessPolicy",
        "Statement": [
            {
                "Sid": "PersonalizeS3BucketAccessPolicy",
                "Effect": "Allow",
                "Principal": {
                    "Service": "personalize.amazonaws.com"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::{}".format(BUCKET_NAME),
                    "arn:aws:s3:::{}/*".format(BUCKET_NAME)
                ]
            }
        ]
    }
    s3.put_bucket_policy(Bucket=BUCKET_NAME, Policy=json.dumps(policy))
def create_iam_role(**kwargs):
    role_name = f"PersonalizeS3Role-{suffix}"
    assume_role_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "personalize.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    try:
        create_role_response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(assume_role_policy_document)
        )
        iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
        )
        role_arn = create_role_response["Role"]["Arn"]
        # sometimes need to wait a bit for the role to be created
        time.sleep(30)
        return role_arn
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            role_arn = iam.get_role(RoleName=role_name)['Role']['Arn']
            time.sleep(30)
            return role_arn
        else:
            raise AirflowFailException(f"PersonalizeS3Role create failed")
def create_dataset_type(**kwargs):
    ti = kwargs['ti']
    schema_arn = ti.xcom_pull(key="return_value", task_ids='create_schema')
    print(schema_arn)
    dataset_group_arn = ti.xcom_pull(key="dataset_group_arn",
                                     task_ids='create_dataset_group')
    dataset_type = "INTERACTIONS"
    create_dataset_response = personalize.create_dataset(
        datasetType=dataset_type,
        datasetGroupArn=dataset_group_arn,
        schemaArn=schema_arn,
        name=f"DEMO-metadata-dataset-interactions-{suffix}"
    )
    interactions_dataset_arn = create_dataset_response['datasetArn']
    logger.info(json.dumps(create_dataset_response, indent=2))
    return interactions_dataset_arn
def import_dataset(**kwargs):
    ti = kwargs['ti']
    interactions_dataset_arn = ti.xcom_pull(key="return_value",
                                            task_ids='create_dataset_type')
    role_arn = ti.xcom_pull(key="return_value",
                            task_ids='create_iam_role')
    create_dataset_import_job_response = personalize.create_dataset_import_job(
        jobName="DEMO-dataset-import-job-"+suffix,
        datasetArn=interactions_dataset_arn,
        dataSource={
            "dataLocation": OUTPUT_PATH
        },
        roleArn=role_arn
    )
    dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
    logger.info(json.dumps(create_dataset_import_job_response, indent=2))
    status = None
    max_time = time.time() + 2*60*60  # 2 hours
    while time.time() < max_time:
        describe_dataset_import_job_response = personalize.describe_dataset_import_job(
            datasetImportJobArn=dataset_import_job_arn
        )
        dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
        if "latestDatasetImportJobRun" not in dataset_import_job:
            status = dataset_import_job["status"]
            logger.info("DatasetImportJob: {}".format(status))
        else:
            status = dataset_import_job["latestDatasetImportJobRun"]["status"]
            logger.info("LatestDatasetImportJobRun: {}".format(status))
        if status == "ACTIVE" or status == "CREATE FAILED":
            break
        time.sleep(60)
    if status == "ACTIVE":
        return dataset_import_job_arn
    if status == "CREATE FAILED":
        raise AirflowFailException(
            f"Dataset import job create failed")
def update_solution(**kwargs):
    recipe_arn = "arn:aws:personalize:::recipe/aws-popularity-count"
    ti = kwargs['ti']
    dataset_group_arn = ti.xcom_pull(key="dataset_group_arn",
                                     task_ids='create_dataset_group')
    list_solutions_response = personalize.list_solutions(
        datasetGroupArn=dataset_group_arn,
        maxResults=100
    )
    demo_solution = next((solution for solution in list_solutions_response["solutions"]
                          if solution["name"] == SOLUTION_NAME), False)
    if not demo_solution:
        create_solution_response = personalize.create_solution(
            name=SOLUTION_NAME,
            datasetGroupArn=dataset_group_arn,
            recipeArn=recipe_arn
        )
        solution_arn = create_solution_response['solutionArn']
        logger.info(json.dumps(create_solution_response, indent=2))
    else:
        solution_arn = demo_solution["solutionArn"]
    kwargs['ti'].xcom_push(key="solution_arn",
                           value=solution_arn)
    create_solution_version_response = personalize.create_solution_version(
        solutionArn=solution_arn,
        trainingMode='FULL'
    )
    solution_version_arn = create_solution_version_response['solutionVersionArn']
    status = None
    max_time = time.time() + 2*60*60  # 2 hours
    while time.time() < max_time:
        describe_solution_version_response = personalize.describe_solution_version(
            solutionVersionArn=solution_version_arn
        )
        status = describe_solution_version_response["solutionVersion"]["status"]
        logger.info(f"SolutionVersion: {status}")
        if status == "ACTIVE" or status == "CREATE FAILED":
            break
        time.sleep(60)
    if status == "ACTIVE":
        return solution_version_arn
    if status == "CREATE FAILED":
        raise AirflowFailException(
            f"Solution version create failed")
def update_campagin(**kwargs):
    ti = kwargs['ti']
    solution_version_arn = ti.xcom_pull(key="return_value",
                                        task_ids='update_solution')
    solution_arn = ti.xcom_pull(key="solution_arn",
                                task_ids='update_solution')
    list_campagins_response = personalize.list_campaigns(
        solutionArn=solution_arn,
        maxResults=100
    )
    demo_campaign = next((campaign for campaign in list_campagins_response["campaigns"]
                          if campaign["name"] == CAMPAIGN_NAME), False)
    if not demo_campaign:
        create_campaign_response = personalize.create_campaign(
            name=CAMPAIGN_NAME,
            solutionVersionArn=solution_version_arn,
            minProvisionedTPS=2,
        )
        campaign_arn = create_campaign_response['campaignArn']
        logger.info(json.dumps(create_campaign_response, indent=2))
    else:
        campaign_arn = demo_campaign["campaignArn"]
        personalize.update_campaign(
            campaignArn=campaign_arn,
            solutionVersionArn=solution_version_arn,
            minProvisionedTPS=2
        )
    status = None
    max_time = time.time() + 2*60*60  # 2 hours
    while time.time() < max_time:
        describe_campaign_response = personalize.describe_campaign(
            campaignArn=campaign_arn
        )
        status = describe_campaign_response["campaign"]["status"]
        print("Campaign: {}".format(status))
        if status == "ACTIVE" or status == "CREATE FAILED":
            break
        time.sleep(60)
    if status == "ACTIVE":
        return campaign_arn
    if status == "CREATE FAILED":
        raise AirflowFailException(
            f"Campaign create/update failed")
t_check_dataset_group = BranchPythonOperator(
task_id='check_dataset_group',
provide_context=True,
python_callable=check_dataset_group,
retries=1,
dag=dag,
)
t_init_personalize = DummyOperator(
task_id="init_personalize",
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag,
)
t_create_dataset_group = PythonOperator(
task_id='create_dataset_group',
provide_context=True,
python_callable=create_dataset_group,
retries=1,
dag=dag,
)
t_create_schema = PythonOperator(
task_id='create_schema',
python_callable=create_schema,
retries=1,
dag=dag,
)
t_put_bucket_policies = PythonOperator(
task_id='put_bucket_policies',
python_callable=put_bucket_policies,
retries=1,
dag=dag,
)
t_create_iam_role = PythonOperator(
task_id='create_iam_role',
provide_context=True,
python_callable=create_iam_role,
retries=1,
dag=dag,
)
t_create_dataset_type = PythonOperator(
task_id='create_dataset_type',
provide_context=True,
python_callable=create_dataset_type,
trigger_rule=TriggerRule.ALL_SUCCESS,
retries=1,
dag=dag,
)
t_create_import_dataset_job = PythonOperator(
task_id='import_dataset',
provide_context=True,
python_callable=import_dataset,
retries=1,
dag=dag,
)
t_skip_init_personalize = DummyOperator(
task_id="skip_init_personalize",
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag,
)
t_init_personalize_done = DummyOperator(
task_id="init_personalize_done",
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag,
)
t_update_solution = PythonOperator(
task_id='update_solution',
provide_context=True,
python_callable=update_solution,
trigger_rule=TriggerRule.ALL_SUCCESS,
retries=1,
dag=dag,
)
t_update_campagin = PythonOperator(
task_id='update_campagin',
provide_context=True,
python_callable=update_campagin,
trigger_rule=TriggerRule.ALL_SUCCESS,
retries=1,
dag=dag,
)
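The check_s3_for_key task referenced in the task list and in the dependencies below is not shown in the listing above; a minimal sketch using Airflow’s S3KeySensor might look like the following (the key points at the same OUTPUT_PATH written by the export task, and the polling and timeout values are illustrative):
from airflow.sensors.s3_key_sensor import S3KeySensor

# Wait until the exported CSV is available in S3 before touching Personalize
check_s3_for_key = S3KeySensor(
    task_id='check_s3_for_key',
    bucket_key=OUTPUT_PATH,   # full s3:// path of the exported CSV
    bucket_name=None,         # bucket name is taken from the full S3 URL above
    wildcard_match=False,
    poke_interval=60,         # check every minute
    timeout=60 * 30,          # give up after 30 minutes
    dag=dag,
)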
In the next section, we’ll see how all these tasks are stitched together to form a workflow in an Airflow DAG.
Different tasks are created in the sections above using operators like PythonOperator, which runs generic Python code on demand or at a scheduled interval. Now let’s set up the DAG with its parameters; a DAG is simply a Python script that contains a set of tasks and their dependencies.
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'ml-pipeline',
default_args=default_args,
description='A simple ML data pipeline DAG',
schedule_interval='@daily',
)
Next, specify task dependencies:
t_export_bq_to_s3 >> check_s3_for_key >> t_check_dataset_group
t_check_dataset_group >> t_init_personalize
t_check_dataset_group >> t_skip_init_personalize >> t_init_personalize_done
t_init_personalize >> [
t_create_dataset_group,
t_create_schema,
t_put_bucket_policies,
t_create_iam_role
] >> t_create_dataset_type
t_create_dataset_type >> t_init_personalize_done
t_init_personalize_done >> t_create_import_dataset_job >> t_update_solution
t_update_solution >> t_update_campagin
After triggering the DAG on demand or on a schedule, we can monitor the DAG and its task executions and interact with them directly through the Airflow UI.
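For instance, to trigger the ml-pipeline DAG on demand from a local machine, you can call the Airflow CLI through MWAA’s CLI endpoint. The following is a rough sketch using boto3 and requests; the environment name is a placeholder for the MWAA environment created earlier:
import boto3
import requests

mwaa = boto3.client('mwaa')
# 'MyAirflowEnvironment' is a placeholder for your MWAA environment name
token = mwaa.create_cli_token(Name='MyAirflowEnvironment')

resp = requests.post(
    f"https://{token['WebServerHostname']}/aws_mwaa/cli",
    headers={
        'Authorization': f"Bearer {token['CliToken']}",
        'Content-Type': 'text/plain',
    },
    data='trigger_dag ml-pipeline',  # Airflow 1.10 CLI syntax
)
print(resp.status_code, resp.text)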
In the Airflow UI, we can also see a graph view of the DAG, which gives a clear representation of how the tasks are executed:
In this article, I introduced how we can build an ML workflow using MWAA and BigQuery. You can extend the workflow by customizing the DAGs, such as extending the dataset by merging daily data over a certain period (weekly, monthly, etc.), creating parallel tasks that use different recipes, and retraining models on a schedule or on a trigger from the S3 key sensor.
I hope you have found this article useful. You can find the complete project in my GitHub repo.