How to Build a Scalable Data Mesh in AWS with Lake Formation

by Digvijay Waghela, February 17th, 2025


In today's world, where data is power, traditional centralized architectures often become chokepoints that impede both data access and innovation. Data Mesh is a modern approach that decentralizes ownership and treats data as a product managed by domain teams. AWS Lake Formation (LF) simplifies secure cross-account sharing and governance: it provides fine-grained control over what users can do across accounts and keeps access auditable for compliance with regulations and standards. By combining Lake Formation with AWS Lambda, SQS, IAM, and S3, an organization can implement a truly scalable Data Mesh architecture that fosters self-serve analytics, interoperability, and federated governance without compromising security.

Architecture Overview


Figure: Data Mesh using Lake Formation


The architecture follows a Data Mesh design spanning two AWS accounts: Account A (Producer) and Account B (Consumer). The objective is to share AWS Glue Data Catalog tables from Account A with Account B in a secure manner using AWS Lake Formation (LF), S3, Lambda, and Amazon SQS.


Intake Process and Manifest File

The manifest.json file is the single most important configuration file in this system: it spells out who gets access to what, including the principal's role, the database and table names, the account ID, and the permissions to be granted. Within our firm, the service intake procedure runs through ServiceNow. A requester raises a ServiceNow (SNOW) ticket with all the relevant details captured in structured forms. Once the ticket is approved in our backend systems, the resulting manifest.json file is generated and placed into an S3 bucket in Account A, as sketched below.
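
As a rough illustration, the backend step that turns an approved ticket into a manifest could look like the sketch below. The bucket name, object key, and ticket field names are hypothetical placeholders; only the manifest structure matches the sample shown later in this article.

import json
import boto3

# Hypothetical names -- substitute your own intake bucket and key layout.
MANIFEST_BUCKET = "account-a-intake-bucket"
MANIFEST_KEY = "manifests/manifest.json"


def publish_manifest(ticket: dict) -> None:
    """Render an approved intake ticket into manifest.json and drop it in Account A's bucket."""
    manifest = {
        "Records": [{
            "AccessType": ticket["access_type"],            # "grant" or "revoke"
            "Principal": ticket["principal_arn"],            # same-account IAM user/role ARN
            "Table": {
                "DatabaseName": ticket["database_name"],
                "Name": ticket["table_name"],
                "Wildcard": ticket.get("wildcard", False),
            },
            "Permissions": ticket.get("permissions", ["SELECT"]),
            "Cross_account": ticket.get("cross_account", False),
            "AccountID": ticket.get("consumer_account_id", ""),
            "Cross_role": ticket.get("cross_role_arn", ""),
        }]
    }
    s3 = boto3.client("s3")
    # Uploading the object fires the S3 event notification that invokes producer.py.
    s3.put_object(
        Bucket=MANIFEST_BUCKET,
        Key=MANIFEST_KEY,
        Body=json.dumps(manifest).encode("utf-8"),
        ContentType="application/json",
    )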

Data Sharing Process

  1. Producer Lambda in Account A
    • An AWS Lambda function (producer.py) is triggered whenever the manifest.json file is dropped into the Account A S3 bucket.

    • The producer Lambda validates the request and verifies the following:

      • Whether the request is for same-account or cross-account access.
      • Whether the S3 bucket is registered in AWS Lake Formation (LF).
    • Once validation is complete, the producer Lambda sends a message to an Amazon SQS queue in Account B.

    • This message includes details of the AWS Resource Access Manager (RAM) share, which facilitates cross-account resource sharing.


  2. Consumer Lambda in Account B
    • Upon receiving the SQS message, a consumer Lambda function (consumer.py) in Account B is triggered.
    • It processes the request and grants the necessary Lake Formation permissions so that Account B's role can access the shared Glue Data Catalog tables from Account A.
    • Once access is granted, the consumer Lambda creates corresponding resource-link database/table entries in Account B so that users can query the shared data.


This yields an automated, scalable architecture for secure, governed, and efficient data sharing across AWS accounts, with compliance enforced through AWS Lake Formation, IAM roles, and the AWS Glue Data Catalog. The event plumbing between the two accounts can be wired up as sketched below.
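
The following is a rough, one-time setup sketch for that plumbing: an S3 notification that invokes the producer Lambda when manifest.json lands, a queue policy that lets Account A publish to Account B's queue, and an event source mapping that feeds the queue into the consumer Lambda. All ARNs, account IDs, and names below are hypothetical placeholders.

import json
import boto3

# Hypothetical placeholders -- replace with your own values.
ACCOUNT_A_ID = "111111111111"
MANIFEST_BUCKET = "account-a-intake-bucket"
PRODUCER_LAMBDA_ARN = "arn:aws:lambda:us-east-1:111111111111:function:producer"
CONSUMER_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/222222222222/lf-share-requests"
CONSUMER_QUEUE_ARN = "arn:aws:sqs:us-east-1:222222222222:lf-share-requests"
CONSUMER_LAMBDA_NAME = "consumer"

# 1. Run with Account A credentials: invoke producer.py whenever manifest.json is created.
#    (The producer Lambda also needs a resource-based policy allowing S3 to invoke it,
#     e.g. via the Lambda add_permission API.)
boto3.client("s3").put_bucket_notification_configuration(
    Bucket=MANIFEST_BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [{
            "LambdaFunctionArn": PRODUCER_LAMBDA_ARN,
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": "manifest.json"}]}},
        }]
    },
)

# 2. Run with Account B credentials: allow Account A to send messages to the consumer queue.
boto3.client("sqs").set_queue_attributes(
    QueueUrl=CONSUMER_QUEUE_URL,
    Attributes={"Policy": json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": ACCOUNT_A_ID},
            "Action": "sqs:SendMessage",
            "Resource": CONSUMER_QUEUE_ARN,
        }],
    })},
)

# 3. Run with Account B credentials: drive the consumer Lambda from the queue.
boto3.client("lambda").create_event_source_mapping(
    EventSourceArn=CONSUMER_QUEUE_ARN,
    FunctionName=CONSUMER_LAMBDA_NAME,
    BatchSize=1,
)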


Below are the sample manifest JSON, producer code, and consumer code that can be used to build and run the above architecture.


Sample Manifest - manifest.json

{
    "Records": [{
        "AccessType": "grant",
        "Principal": "arn of IAM user/role in account a (if granting same account)",
        "Table": {
            "DatabaseName": "database name",
            "Name": "table name",
            "Wildcard": false
        },
        "Permissions": ["SELECT"],
        "Cross_account": true,
        "AccountID": "112233445566",
        "Cross_role": "arn of cross account IAM user/role (if granting cross account)"
    }]
}
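
The producer also understands "AccessType": "revoke". Assuming the same field layout, a same-account revocation request for that table could look like this:

{
    "Records": [{
        "AccessType": "revoke",
        "Principal": "arn of IAM user/role in account a",
        "Table": {
            "DatabaseName": "database name",
            "Name": "table name",
            "Wildcard": false
        },
        "Permissions": ["SELECT"],
        "Cross_account": false,
        "AccountID": "",
        "Cross_role": ""
    }]
}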


Producer Code - producer.py

import boto3
import json
import logging
import os
from botocore.exceptions import ClientError

# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Environment Variables
ADMIN_ROLE_ARN = os.environ['ADMIN_ROLE_ARN']
LF_REGISTER_ROLE = os.environ['LF_ROLE']
QUEUE_URL = os.environ['CROSS_QUEUE_URL']
S3_POLICY_ARN = os.environ['S3_POLICY_ARN']
S3_POLICY_NAME = os.environ['S3_POLICY_NAME']


def get_ram_invite(ram_client):
    """Retrieve Resource Access Manager (RAM) invite for cross-account sharing."""
    try:
        response = ram_client.get_resource_share_associations(
            associationType='PRINCIPAL', associationStatus='ASSOCIATING'
        )
        return response['resourceShareAssociations'][0]['resourceShareArn']
    except Exception as e:
        logger.error(f"Error retrieving RAM invite: {e}")
        raise


def delete_oldest_policy_version(iam_client, policy_arn):
    """Deletes the oldest version of an IAM policy (max 5 versions allowed)."""
    try:
        versions = iam_client.list_policy_versions(PolicyArn=policy_arn)['Versions']
        non_default_versions = [v['VersionId'] for v in versions if not v['IsDefaultVersion']]

        if non_default_versions:
            # Version IDs look like "v1", "v2", ...; compare numerically so "v10" sorts after "v2"
            oldest_version = min(non_default_versions, key=lambda v: int(v.lstrip('v')))
            iam_client.delete_policy_version(PolicyArn=policy_arn, VersionId=oldest_version)
    except Exception as e:
        logger.error(f"Error deleting old policy version: {e}")
        raise


def update_lf_s3_policy(iam_client, iam_resource, bucket_name):
    """Modifies the Lake Formation IAM policy to include S3 paths."""
    try:
        account_id = boto3.client('sts').get_caller_identity()['Account']
        policy_arn = f'arn:aws:iam::{account_id}:policy/{S3_POLICY_NAME}'
        policy = iam_resource.Policy(policy_arn)
        policy_json = policy.default_version.document

        s3_arn = f'arn:aws:s3:::{bucket_name}'
        updated = False

        for statement in policy_json['Statement']:
            # Check for the object-path ARN that is actually appended, so re-runs don't add duplicates
            if f'{s3_arn}/*' not in statement['Resource']:
                statement['Resource'].append(f'{s3_arn}/*')
                updated = True

        if updated:
            delete_oldest_policy_version(iam_client, S3_POLICY_ARN)
            iam_client.create_policy_version(
                PolicyArn=policy_arn,
                PolicyDocument=json.dumps(policy_json),
                SetAsDefault=True
            )
    except Exception as e:
        logger.error(f"Error updating LF S3 policy: {e}")
        raise


def register_s3_location(iam_client, s3_client, glue_client, lf_client, database, table, iam_resource, is_table=True):
    """Registers an S3 location with Lake Formation."""
    try:
        s3_location = glue_client.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']['Location'] if is_table else \
                      glue_client.get_database(Name=database)['Database']['LocationUri']

        bucket_name = s3_location.split('/')[2]
        registered_buckets = [res['ResourceArn'].split(':::')[1] for res in lf_client.list_resources()['ResourceInfoList']]

        if bucket_name not in registered_buckets:
            lf_client.register_resource(
                ResourceArn=f'arn:aws:s3:::{bucket_name}',
                UseServiceLinkedRole=False,
                RoleArn=LF_REGISTER_ROLE
            )
            update_lf_s3_policy(iam_client, iam_resource, bucket_name)
    except ClientError as e:
        logger.error(f"Error registering S3 location: {e}")
        raise


def grant_data_location_permissions(lf_client, glue_client, principal, database, table, is_table=True):
    """Grants Data Location Permissions to a principal."""
    try:
        s3_location = glue_client.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']['Location'] if is_table else \
                      glue_client.get_database(Name=database)['Database']['LocationUri']

        bucket_name = s3_location.split('/')[2]

        lf_client.grant_permissions(
            Principal={'DataLakePrincipalIdentifier': principal},
            Resource={'DataLocation': {'ResourceArn': f'arn:aws:s3:::{bucket_name}'}},
            Permissions=['DATA_LOCATION_ACCESS'],
            PermissionsWithGrantOption=['DATA_LOCATION_ACCESS']
        )
    except ClientError as e:
        logger.error(f"Error granting Data Location Permissions: {e}")


def create_resource(database, table=None, wildcard=False):
    """Creates a resource dictionary for granting permissions."""
    if database and table:
        return {'Table': {'DatabaseName': database, 'Name': table}}
    elif database and wildcard:
        return {'Table': {'DatabaseName': database, 'TableWildcard': {}}}
    elif database:
        return {'Database': {'Name': database}}
    return None


def revoke_permission(lf_client, principal, permissions, database, table, wildcard):
    """Revokes permissions from a principal."""
    try:
        resource = create_resource(database, table, wildcard)
        lf_client.revoke_permissions(
            Principal={'DataLakePrincipalIdentifier': principal},
            Resource=resource,
            Permissions=permissions
        )
    except Exception as e:
        logger.error(f"Error revoking permissions for {principal}: {e}")
        raise


def lambda_handler(event, context):
    """Lambda function to process S3 event and manage Lake Formation permissions."""
    try:
        sts_client = boto3.client('sts')
        assume_role_response = sts_client.assume_role(
            RoleArn=ADMIN_ROLE_ARN,
            RoleSessionName='LFSession'
        )

        aws_session = boto3.session.Session(
            aws_access_key_id=assume_role_response['Credentials']['AccessKeyId'],
            aws_secret_access_key=assume_role_response['Credentials']['SecretAccessKey'],
            aws_session_token=assume_role_response['Credentials']['SessionToken']
        )

        s3_client = aws_session.client("s3")
        bucket_name = event['Records'][0]['s3']['bucket']['name']
        file_key = event['Records'][0]['s3']['object']['key']
        obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)
        json_content = json.loads(obj["Body"].read().decode('utf-8'))

        # Extracting manifest file details
        record = json_content['Records'][0]
        access_type = record['AccessType']
        principal = record['Principal']
        database = record['Table']['DatabaseName']
        table = record['Table']['Name']
        permissions = record['Permissions']
        cross_account = record['Cross_account']

        glue_client = aws_session.client('glue')
        lf_client = aws_session.client('lakeformation')
        iam_client = aws_session.client('iam')
        iam_resource = aws_session.resource('iam')

        if access_type == 'revoke':
            revoke_permission(lf_client, principal, permissions, database, table, wildcard=False)
        else:
            register_s3_location(iam_client, s3_client, glue_client, lf_client, database, table, iam_resource)
            grant_data_location_permissions(lf_client, glue_client, principal, database, table)

    except Exception as e:
        logger.error(f"Lambda execution error: {e}")
        raise
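
As written, the handler above covers the same-account path (registering the S3 location and granting data location permissions); the CROSS_QUEUE_URL environment variable, the cross_account flag, and the get_ram_invite helper are left unused. A minimal sketch of the cross-account branch, under the assumption that the SQS message uses the field names consumer.py reads (ram_url, cross_role, db_name, table_name, permissions, wild_card, access_type), could look like this:

def share_cross_account(aws_session, lf_client, ram_client, record):
    """Sketch: grant the table to the consumer account, then notify its SQS queue."""
    database = record['Table']['DatabaseName']
    table = record['Table']['Name']

    # Granting to the external account ID creates an AWS RAM share behind the scenes
    lf_client.grant_permissions(
        Principal={'DataLakePrincipalIdentifier': record['AccountID']},
        Resource={'Table': {'DatabaseName': database, 'Name': table}},
        Permissions=record['Permissions'],
        PermissionsWithGrantOption=record['Permissions']
    )

    # Reuse the (currently unused) helper to look up the share that was just created
    ram_arn = get_ram_invite(ram_client)

    # Tell Account B's consumer Lambda about the share, using the field names consumer.py expects
    message = {
        'ram_url': ram_arn,
        'cross_role': record['Cross_role'],
        'db_name': database,
        'table_name': table,
        'permissions': record['Permissions'],
        'wild_card': record['Table'].get('Wildcard', False),
        'access_type': record['AccessType']
    }
    aws_session.client('sqs').send_message(QueueUrl=QUEUE_URL, MessageBody=json.dumps(message))

Inside lambda_handler, this branch would be invoked when record['Cross_account'] is true, after register_s3_location and grant_data_location_permissions, with ram_client = aws_session.client('ram').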



Consumer Code - consumer.py

import boto3
import json
import logging
import os
from botocore.exceptions import ClientError

# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Environment Variables
ACCOUNT_A = os.environ['SRC_ACC_NUM']
ADMIN_ROLE_ARN = os.environ['ADMIN_ROLE_ARN']


def assume_role(role_arn):
    """Assume AWS IAM Role and return a session with temporary credentials."""
    sts_client = boto3.client('sts')
    try:
        response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="LFSession")
        return boto3.session.Session(
            aws_access_key_id=response['Credentials']['AccessKeyId'],
            aws_secret_access_key=response['Credentials']['SecretAccessKey'],
            aws_session_token=response['Credentials']['SessionToken']
        )
    except ClientError as e:
        logger.error(f"Error assuming role {role_arn}: {e}")
        raise


def get_ram_invite(ram_client, ram_arn):
    """Retrieve a Resource Access Manager (RAM) invitation."""
    try:
        response = ram_client.get_resource_share_invitations(resourceShareArns=[ram_arn])
        return response['resourceShareInvitations'][0]['resourceShareInvitationArn']
    except ClientError as e:
        logger.error(f"Error retrieving RAM invite: {e}")
        raise


def accept_ram_invite(ram_client, ram_invite):
    """Accept a RAM invitation."""
    try:
        ram_client.accept_resource_share_invitation(resourceShareInvitationArn=ram_invite)
    except ClientError:
        logger.info("RAM invite already accepted")


def create_database(glue_client, database_name):
    """Create a Glue database if it does not already exist."""
    try:
        glue_client.create_database(DatabaseInput={'Name': database_name})
        logger.info(f"Created database: {database_name}")
    except ClientError:
        logger.info(f"Database {database_name} already exists")


def create_resource_link_database(glue_client, rl_name, source_db, account_id):
    """Create a resource link for a shared Glue database."""
    try:
        glue_client.create_database(DatabaseInput={
            'Name': rl_name,
            "TargetDatabase": {
                "CatalogId": account_id,
                "DatabaseName": source_db
            }
        })
        logger.info(f"Created resource link database: {rl_name}")
    except ClientError:
        logger.info(f"Resource link {rl_name} already exists")


def create_resource_link_table(glue_client, rl_db, rl_table, source_db, source_table, account_id):
    """Create a resource link for a shared Glue table."""
    try:
        glue_client.create_table(
            DatabaseName=rl_db,
            TableInput={
                "Name": rl_table,
                "TargetTable": {
                    "CatalogId": account_id,
                    "DatabaseName": source_db,
                    "Name": source_table
                }
            }
        )
        logger.info(f"Created resource link table: {rl_table}")
    except ClientError:
        logger.info(f"Resource link table {rl_table} already exists")


def grant_permissions(lf_client, principal, resource, permissions):
    """Grant permissions to a principal on a specified resource."""
    try:
        lf_client.grant_permissions(
            Principal={"DataLakePrincipalIdentifier": principal},
            Resource=resource,
            Permissions=permissions,
            PermissionsWithGrantOption=permissions
        )
    except ClientError as e:
        logger.error(f"Error granting permissions to {principal}: {e}")
        raise


def revoke_permissions(lf_client, principal, resource, permissions):
    """Revoke permissions from a principal."""
    try:
        lf_client.revoke_permissions(
            Principal={"DataLakePrincipalIdentifier": principal},
            Resource=resource,
            Permissions=permissions
        )
    except ClientError as e:
        logger.error(f"Error revoking permissions from {principal}: {e}")
        raise


def construct_resource(database, table=None, wildcard=False, catalog_id=None):
    """Construct the resource dictionary for permissions."""
    if table:
        return {"Table": {"DatabaseName": database, "Name": table, **({"CatalogId": catalog_id} if catalog_id else {})}}
    elif wildcard:
        return {"Table": {"DatabaseName": database, "TableWildcard": {}}}
    else:
        return {"Database": {"Name": database}}


def lambda_handler(event, context):
    """Lambda function to process SQS messages and manage Lake Formation permissions."""
    try:
        records = [json.loads(record["body"]) for record in event['Records']]
    except (json.JSONDecodeError, KeyError) as e:
        logger.error(f"Error processing event data: {e}")
        return

    aws_session = assume_role(ADMIN_ROLE_ARN)

    # AWS Clients
    lf_client = aws_session.client('lakeformation')
    glue_client = aws_session.client('glue')
    ram_client = aws_session.client('ram')

    for record in records:
        ram_arn = record.get('ram_url')
        principal = record.get('cross_role')
        database = record.get('db_name')
        table = record.get('table_name')
        permissions = record.get('permissions', [])
        wildcard = record.get('wild_card', False)
        access_type = record.get('access_type')

        rl_database = f'rl_{database}'
        db_target = f'{database}_shared'
        table_target = f'rl_{table}'

        if access_type == 'grant':
            try:
                ram_invite = get_ram_invite(ram_client, ram_arn)
                accept_ram_invite(ram_client, ram_invite)
            except Exception as e:
                logger.error(f"Error accepting RAM invite: {e}")

        # Handle Database/Table Creation
        if database and table:
            create_database(glue_client, db_target)
            create_resource_link_table(glue_client, db_target, table_target, database, table, ACCOUNT_A)
        elif database:
            create_resource_link_database(glue_client, rl_database, database, ACCOUNT_A)

        # Handle Permissions
        try:
            resource_db = construct_resource(db_target)
            resource_table = construct_resource(db_target, table_target)

            if access_type == 'grant':
                if database and table:
                    grant_permissions(lf_client, principal, resource_db, ['ALL'])
                    grant_permissions(lf_client, principal, resource_table, permissions)
                elif database:
                    resource_wildcard = construct_resource(database, wildcard=True)
                    grant_permissions(lf_client, principal, resource_wildcard, permissions)
            else:
                if database and table:
                    revoke_permissions(lf_client, principal, resource_db, ['ALL'])
                    revoke_permissions(lf_client, principal, resource_table, permissions)
                elif database:
                    resource_wildcard = construct_resource(rl_database, wildcard=True)
                    revoke_permissions(lf_client, principal, resource_wildcard, permissions)
        except Exception as e:
            logger.error(f"Error modifying permissions: {e}")


Runtime Notes:

For producer script:

  1. Create an AWS Lambda function and upload the producer.py code
  2. Add an environment variable called ADMIN_ROLE_ARN and add the Data Lake Admin role ARN as the value
  3. Add an environment variable called CROSS_QUEUE_URL and add the consumer queue URL as the value
  4. Add an environment variable called LF_ROLE and add the Lake Formation service role ARN for Account A as the value
  5. Add an environment variable called S3_POLICY_ARN and add the S3 custom policy ARN as the value
  6. Add an environment variable called S3_POLICY_NAME and add the name of that same S3 custom policy as the value (producer.py reads both)


For consumer script:

  1. Create an AWS Lambda function and upload the consumer.py code
  2. Add an environment variable called SRC_ACC_NUM and provide the source (Account A) AWS account number as the value
  3. Add an environment variable called ADMIN_ROLE_ARN and add the Data Lake Admin role ARN as the value
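
If you prefer to script this configuration rather than set it in the console, a sketch like the following applies the environment variables to both functions. The function names and ARN values are hypothetical placeholders.

import boto3

# Run against Account A's credentials
boto3.client('lambda').update_function_configuration(
    FunctionName='producer',
    Environment={'Variables': {
        'ADMIN_ROLE_ARN': 'arn:aws:iam::111111111111:role/DataLakeAdmin',
        'CROSS_QUEUE_URL': 'https://sqs.us-east-1.amazonaws.com/222222222222/lf-share-requests',
        'LF_ROLE': 'arn:aws:iam::111111111111:role/LakeFormationServiceRole',
        'S3_POLICY_ARN': 'arn:aws:iam::111111111111:policy/lf-s3-access',
        'S3_POLICY_NAME': 'lf-s3-access',
    }},
)

# Run against Account B's credentials
boto3.client('lambda').update_function_configuration(
    FunctionName='consumer',
    Environment={'Variables': {
        'SRC_ACC_NUM': '111111111111',
        'ADMIN_ROLE_ARN': 'arn:aws:iam::222222222222:role/DataLakeAdmin',
    }},
)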


Conclusion

Implementing a Data Mesh with AWS Lake Formation, the Glue Data Catalog, IAM, and S3 gives you a way to decentralize data ownership that is both flexible and secure, while keeping governance firmly in place. With the help of Lambda, SQS, and AWS Resource Access Manager (RAM), cross-account data sharing can be fully automated, making it simpler for organizations to control access and let domain teams manage their own data products. This setup enables self-serve analytics while still meeting compliance and security requirements. As the data landscape keeps evolving, embracing an approach like this, decentralized yet well-governed, can make data easier to reach, improve collaboration, and lead to better decisions across the company.

