In today's world, where data is power, traditional centralized designs often become chokepoints that impede both access to data and innovation. Data Mesh is a modern approach that decentralizes ownership and treats data as a product managed by domain teams. AWS Lake Formation (LF) simplifies secure, cross-account sharing and governance: it controls what users can do across accounts and tracks compliance with regulations and standards. By combining Lake Formation with AWS Lambda, Amazon SQS, IAM, and S3, an organization can implement a truly scalable Data Mesh architecture that fosters self-serve analytics, interoperability, and federated governance without compromising security.
The architecture follows a Data Mesh design spanning two AWS accounts: Account A (Producer) and Account B (Consumer). The objective is to securely share AWS Glue Data Catalog tables from Account A with Account B using AWS Lake Formation (LF), S3, Lambda, and Amazon SQS.
Here, a manifest.json file is the central configuration artifact: it spells out who gets access to what, including the requesting role, database name, account ID, and the permissions to be granted. Within our firm, service intake runs through ServiceNow. A requester raises a ServiceNow (SNOW) ticket with each relevant piece of information captured in structured forms. Once the ticket is approved in our backend systems, the resulting manifest.json file is generated and placed into an S3 bucket in Account A.
An AWS Lambda function (producer.py) is triggered by an S3 event notification whenever a manifest.json file is dropped into the Account A S3 bucket; a sketch of the trigger configuration follows.
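Wiring this trigger is a standard S3 event notification configuration. The snippet below is a minimal sketch: the bucket name and function ARN are placeholders, and it assumes the Lambda's resource policy already allows invocation by s3.amazonaws.com.

import boto3

s3 = boto3.client('s3')

# Hypothetical bucket and function ARN; replace with your own
s3.put_bucket_notification_configuration(
    Bucket='manifest-intake-bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:111111111111:function:producer',
            'Events': ['s3:ObjectCreated:*'],
            # Only fire when a manifest.json object is dropped
            'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': 'manifest.json'}]}}
        }]
    }
)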
The producer Lambda validates the request before acting on it; a sketch of typical checks is shown below.
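The checks themselves are not spelled out in producer.py, so the helper below is only a sketch of what such validation might cover; the function name and rule set are illustrative assumptions, not part of the shipped code.

# Illustrative only: a minimal manifest validation helper
REQUIRED_KEYS = {'AccessType', 'Principal', 'Table', 'Permissions', 'Cross_account'}
VALID_PERMISSIONS = {'SELECT', 'INSERT', 'DELETE', 'DESCRIBE', 'ALTER', 'DROP', 'ALL'}

def validate_record(record, glue_client):
    missing = REQUIRED_KEYS - record.keys()
    if missing:
        raise ValueError(f"Manifest is missing required keys: {missing}")
    if record['AccessType'] not in ('grant', 'revoke'):
        raise ValueError(f"Unknown AccessType: {record['AccessType']}")
    if not set(record['Permissions']) <= VALID_PERMISSIONS:
        raise ValueError(f"Unsupported permissions: {record['Permissions']}")
    # Confirm the table really exists in the Glue Data Catalog;
    # get_table raises EntityNotFoundException otherwise
    glue_client.get_table(DatabaseName=record['Table']['DatabaseName'],
                          Name=record['Table']['Name'])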
Once validation is complete, the producer Lambda sends a message to an Amazon SQS queue in Account B.
This message includes the ARN of the pending AWS Resource Access Manager (RAM) resource share, which facilitates cross-account resource sharing, along with the grant details the consumer needs; a sample payload is shown below.
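The exact payload format is not shown above; the sample below uses the field names that consumer.py reads from each SQS record, with illustrative values.

{
    "access_type": "grant",
    "ram_url": "arn:aws:ram:us-east-1:111111111111:resource-share/example-share-id",
    "cross_role": "arn:aws:iam::112233445566:role/consumer-analyst",
    "db_name": "sales",
    "table_name": "orders",
    "permissions": ["SELECT"],
    "wild_card": false
}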
Together, these pieces provide an automated, scalable architecture for secure, governed, and efficient data sharing across AWS accounts, with compliance enforced through AWS Lake Formation, IAM roles, and the AWS Glue Data Catalog.
Below are the sample manifest JSON, producer code, and consumer code that can be used to build and run the architecture described above.
Manifest - manifest.json
{
    "Records": [{
        "AccessType": "grant",
        "Principal": "arn of IAM user/role in Account A (if granting within the same account)",
        "Table": {
            "DatabaseName": "database name",
            "Name": "table name",
            "Wildcard": false
        },
        "Permissions": ["SELECT"],
        "Cross_account": true,
        "AccountID": "112233445566",
        "Cross_role": "arn of cross-account IAM user/role (if granting cross-account)"
    }]
}
Producer Code - producer.py
import boto3
import json
import logging
import os
from urllib.parse import unquote_plus
from botocore.exceptions import ClientError
# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Environment Variables
ADMIN_ROLE_ARN = os.environ['ADMIN_ROLE_ARN']
LF_REGISTER_ROLE = os.environ['LF_ROLE']
QUEUE_URL = os.environ['CROSS_QUEUE_URL']
S3_POLICY_ARN = os.environ['S3_POLICY_ARN']
S3_POLICY_NAME = os.environ['S3_POLICY_NAME']
def get_ram_invite(ram_client):
    """Retrieve the ARN of the pending RAM resource share created by a cross-account grant."""
    try:
        # A fresh cross-account Lake Formation grant leaves the share's principal
        # association in ASSOCIATING state until the consumer accepts it
        response = ram_client.get_resource_share_associations(
            associationType='PRINCIPAL', associationStatus='ASSOCIATING'
        )
        return response['resourceShareAssociations'][0]['resourceShareArn']
    except Exception as e:
        logger.error(f"Error retrieving RAM invite: {e}")
        raise
def delete_oldest_policy_version(iam_client, policy_arn):
    """Deletes the oldest non-default version of an IAM policy (max 5 versions allowed)."""
    try:
        versions = iam_client.list_policy_versions(PolicyArn=policy_arn)['Versions']
        non_default_versions = [v['VersionId'] for v in versions if not v['IsDefaultVersion']]
        if non_default_versions:
            # Version IDs look like 'v3'; compare numerically so 'v10' sorts after 'v9'
            oldest_version = min(non_default_versions, key=lambda v: int(v.lstrip('v')))
            iam_client.delete_policy_version(PolicyArn=policy_arn, VersionId=oldest_version)
    except Exception as e:
        logger.error(f"Error deleting old policy version: {e}")
        raise
def update_lf_s3_policy(iam_client, iam_resource, bucket_name):
    """Adds the bucket's S3 paths to the Lake Formation register role's IAM policy."""
    try:
        policy = iam_resource.Policy(S3_POLICY_ARN)
        policy_json = policy.default_version.document
        s3_arn = f'arn:aws:s3:::{bucket_name}'
        updated = False
        for statement in policy_json['Statement']:
            # Check for the object-level ARN that is actually appended, otherwise
            # the same path would be re-added on every invocation
            if f'{s3_arn}/*' not in statement['Resource']:
                statement['Resource'].append(f'{s3_arn}/*')
                updated = True
        if updated:
            # IAM allows at most 5 versions per policy, so prune the oldest first
            delete_oldest_policy_version(iam_client, S3_POLICY_ARN)
            iam_client.create_policy_version(
                PolicyArn=S3_POLICY_ARN,
                PolicyDocument=json.dumps(policy_json),
                SetAsDefault=True
            )
    except Exception as e:
        logger.error(f"Error updating LF S3 policy: {e}")
        raise
def register_s3_location(iam_client, s3_client, glue_client, lf_client, database, table, iam_resource, is_table=True):
"""Registers an S3 location with Lake Formation."""
try:
s3_location = glue_client.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']['Location'] if is_table else \
glue_client.get_database(Name=database)['Database']['LocationUri']
bucket_name = s3_location.split('/')[2]
registered_buckets = [res['ResourceArn'].split(':::')[1] for res in lf_client.list_resources()['ResourceInfoList']]
if bucket_name not in registered_buckets:
lf_client.register_resource(
ResourceArn=f'arn:aws:s3:::{bucket_name}',
UseServiceLinkedRole=False,
RoleArn=LF_REGISTER_ROLE
)
update_lf_s3_policy(iam_client, iam_resource, bucket_name)
except ClientError as e:
logger.error(f"Error registering S3 location: {e}")
raise
def grant_data_location_permissions(lf_client, glue_client, principal, database, table, is_table=True):
"""Grants Data Location Permissions to a principal."""
try:
s3_location = glue_client.get_table(DatabaseName=database, Name=table)['Table']['StorageDescriptor']['Location'] if is_table else \
glue_client.get_database(Name=database)['Database']['LocationUri']
bucket_name = s3_location.split('/')[2]
lf_client.grant_permissions(
Principal={'DataLakePrincipalIdentifier': principal},
Resource={'DataLocation': {'ResourceArn': f'arn:aws:s3:::{bucket_name}'}},
Permissions=['DATA_LOCATION_ACCESS'],
PermissionsWithGrantOption=['DATA_LOCATION_ACCESS']
)
except ClientError as e:
logger.error(f"Error granting Data Location Permissions: {e}")
def create_resource(database, table=None, wildcard=False):
"""Creates a resource dictionary for granting permissions."""
if database and table:
return {'Table': {'DatabaseName': database, 'Name': table}}
elif database and wildcard:
return {'Table': {'DatabaseName': database, 'TableWildcard': {}}}
elif database:
return {'Database': {'Name': database}}
return None
def revoke_permission(lf_client, principal, permissions, database, table, wildcard):
"""Revokes permissions from a principal."""
try:
resource = create_resource(database, table, wildcard)
lf_client.revoke_permissions(
Principal={'DataLakePrincipalIdentifier': principal},
Resource=resource,
Permissions=permissions
)
except Exception as e:
logger.error(f"Error revoking permissions for {principal}: {e}")
raise
def lambda_handler(event, context):
"""Lambda function to process S3 event and manage Lake Formation permissions."""
try:
sts_client = boto3.client('sts')
assume_role_response = sts_client.assume_role(
RoleArn=ADMIN_ROLE_ARN,
RoleSessionName='LFSession'
)
aws_session = boto3.session.Session(
aws_access_key_id=assume_role_response['Credentials']['AccessKeyId'],
aws_secret_access_key=assume_role_response['Credentials']['SecretAccessKey'],
aws_session_token=assume_role_response['Credentials']['SessionToken']
)
s3_client = aws_session.client("s3")
        bucket_name = event['Records'][0]['s3']['bucket']['name']
        # S3 event object keys arrive URL-encoded
        file_key = unquote_plus(event['Records'][0]['s3']['object']['key'])
obj = s3_client.get_object(Bucket=bucket_name, Key=file_key)
json_content = json.loads(obj["Body"].read().decode('utf-8'))
# Extracting manifest file details
        record = json_content['Records'][0]
        access_type = record['AccessType']
        principal = record['Principal']
        database = record['Table']['DatabaseName']
        table = record['Table']['Name']
        wildcard = record['Table'].get('Wildcard', False)
        permissions = record['Permissions']
        cross_account = record['Cross_account']
glue_client = aws_session.client('glue')
lf_client = aws_session.client('lakeformation')
iam_client = aws_session.client('iam')
iam_resource = aws_session.resource('iam')
        if access_type == 'revoke':
            revoke_permission(lf_client, principal, permissions, database, table, wildcard)
            ram_arn = None
        else:
            register_s3_location(iam_client, s3_client, glue_client, lf_client, database, table, iam_resource)
            grant_data_location_permissions(lf_client, glue_client, principal, database, table)
            ram_arn = None
            if cross_account:
                # A cross-account grant to the consumer account ID is what creates
                # the pending RAM resource share that Account B must accept
                lf_client.grant_permissions(
                    Principal={'DataLakePrincipalIdentifier': record['AccountID']},
                    Resource=create_resource(database, table, wildcard),
                    Permissions=permissions,
                    PermissionsWithGrantOption=permissions
                )
                ram_arn = get_ram_invite(aws_session.client('ram'))
        if cross_account:
            # Forward the request (and pending RAM share ARN, if any) to the SQS
            # queue in Account B; the field names match what consumer.py reads
            aws_session.client('sqs').send_message(
                QueueUrl=QUEUE_URL,
                MessageBody=json.dumps({
                    'access_type': access_type,
                    'ram_url': ram_arn,
                    'cross_role': record.get('Cross_role'),
                    'db_name': database,
                    'table_name': table,
                    'permissions': permissions,
                    'wild_card': wildcard
                })
            )
except Exception as e:
logger.error(f"Lambda execution error: {e}")
raise
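To exercise the producer locally, a trimmed S3 event like the one below can be passed straight to lambda_handler. This is a sketch: it assumes the environment variables above are set and that your credentials can assume ADMIN_ROLE_ARN; the bucket and key names are placeholders.

# Minimal smoke test for producer.py (names are placeholders)
import producer

test_event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "manifest-intake-bucket"},
            "object": {"key": "manifest.json"}
        }
    }]
}

producer.lambda_handler(test_event, None)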
Consumer Code - consumer.py
import boto3
import json
import logging
import os
from botocore.exceptions import ClientError
# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Environment Variables
ACCOUNT_A = os.environ['SRC_ACC_NUM']
ADMIN_ROLE_ARN = os.environ['ADMIN_ROLE_ARN']
def assume_role(role_arn):
"""Assume AWS IAM Role and return a session with temporary credentials."""
sts_client = boto3.client('sts')
try:
response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="LFSession")
return boto3.session.Session(
aws_access_key_id=response['Credentials']['AccessKeyId'],
aws_secret_access_key=response['Credentials']['SecretAccessKey'],
aws_session_token=response['Credentials']['SessionToken']
)
except ClientError as e:
logger.error(f"Error assuming role {role_arn}: {e}")
raise
def get_ram_invite(ram_client, ram_arn):
"""Retrieve a Resource Access Manager (RAM) invitation."""
try:
response = ram_client.get_resource_share_invitations(resourceShareArns=[ram_arn])
return response['resourceShareInvitations'][0]['resourceShareInvitationArn']
except ClientError as e:
logger.error(f"Error retrieving RAM invite: {e}")
raise
def accept_ram_invite(ram_client, ram_invite):
"""Accept a RAM invitation."""
try:
ram_client.accept_resource_share_invitation(resourceShareInvitationArn=ram_invite)
except ClientError:
logger.info("RAM invite already accepted")
def create_database(glue_client, database_name):
"""Create a Glue database if it does not already exist."""
try:
glue_client.create_database(DatabaseInput={'Name': database_name})
logger.info(f"Created database: {database_name}")
except ClientError:
logger.info(f"Database {database_name} already exists")
def create_resource_link_database(glue_client, rl_name, source_db, account_id):
"""Create a resource link for a shared Glue database."""
try:
glue_client.create_database(DatabaseInput={
'Name': rl_name,
"TargetDatabase": {
"CatalogId": account_id,
"DatabaseName": source_db
}
})
logger.info(f"Created resource link database: {rl_name}")
except ClientError:
logger.info(f"Resource link {rl_name} already exists")
def create_resource_link_table(glue_client, rl_db, rl_table, source_db, source_table, account_id):
"""Create a resource link for a shared Glue table."""
try:
glue_client.create_table(
DatabaseName=rl_db,
TableInput={
"Name": rl_table,
"TargetTable": {
"CatalogId": account_id,
"DatabaseName": source_db,
"Name": source_table
}
}
)
logger.info(f"Created resource link table: {rl_table}")
except ClientError:
logger.info(f"Resource link table {rl_table} already exists")
def grant_permissions(lf_client, principal, resource, permissions):
"""Grant permissions to a principal on a specified resource."""
try:
lf_client.grant_permissions(
Principal={"DataLakePrincipalIdentifier": principal},
Resource=resource,
Permissions=permissions,
PermissionsWithGrantOption=permissions
)
except ClientError as e:
logger.error(f"Error granting permissions to {principal}: {e}")
raise
def revoke_permissions(lf_client, principal, resource, permissions):
"""Revoke permissions from a principal."""
try:
lf_client.revoke_permissions(
Principal={"DataLakePrincipalIdentifier": principal},
Resource=resource,
Permissions=permissions
)
except ClientError as e:
logger.error(f"Error revoking permissions from {principal}: {e}")
raise
def construct_resource(database, table=None, wildcard=False, catalog_id=None):
    """Construct the resource dictionary for permissions."""
    catalog = {"CatalogId": catalog_id} if catalog_id else {}
    if table:
        return {"Table": {"DatabaseName": database, "Name": table, **catalog}}
    elif wildcard:
        return {"Table": {"DatabaseName": database, "TableWildcard": {}, **catalog}}
    else:
        return {"Database": {"Name": database, **catalog}}
def lambda_handler(event, context):
"""Lambda function to process SQS messages and manage Lake Formation permissions."""
try:
records = [json.loads(record["body"]) for record in event['Records']]
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"Error processing event data: {e}")
return
aws_session = assume_role(ADMIN_ROLE_ARN)
# AWS Clients
lf_client = aws_session.client('lakeformation')
glue_client = aws_session.client('glue')
ram_client = aws_session.client('ram')
for record in records:
ram_arn = record.get('ram_url')
principal = record.get('cross_role')
database = record.get('db_name')
table = record.get('table_name')
permissions = record.get('permissions', [])
wildcard = record.get('wild_card', False)
access_type = record.get('access_type')
        rl_database = f'rl_{database}'
        db_target = f'{database}_shared'
        table_target = f'rl_{table}' if table else None
if access_type == 'grant':
try:
ram_invite = get_ram_invite(ram_client, ram_arn)
accept_ram_invite(ram_client, ram_invite)
except Exception as e:
logger.error(f"Error accepting RAM invite: {e}")
# Handle Database/Table Creation
if database and table:
create_database(glue_client, db_target)
create_resource_link_table(glue_client, db_target, table_target, database, table, ACCOUNT_A)
elif database:
create_resource_link_database(glue_client, rl_database, database, ACCOUNT_A)
        # Handle Permissions
        try:
            if database and table:
                resource_db = construct_resource(db_target)
                resource_link = construct_resource(db_target, table_target)
                # Data permissions (e.g. SELECT) must target the source table in
                # Account A's catalog; a resource link itself only accepts
                # DESCRIBE/DROP grants
                resource_target = construct_resource(database, table, catalog_id=ACCOUNT_A)
                if access_type == 'grant':
                    grant_permissions(lf_client, principal, resource_db, ['ALL'])
                    grant_permissions(lf_client, principal, resource_link, ['DESCRIBE'])
                    grant_permissions(lf_client, principal, resource_target, permissions)
                else:
                    revoke_permissions(lf_client, principal, resource_db, ['ALL'])
                    revoke_permissions(lf_client, principal, resource_link, ['DESCRIBE'])
                    revoke_permissions(lf_client, principal, resource_target, permissions)
            elif database:
                resource_link_db = construct_resource(rl_database)
                # Table-wildcard data permissions likewise target the source database
                resource_wildcard = construct_resource(database, wildcard=True, catalog_id=ACCOUNT_A)
                if access_type == 'grant':
                    grant_permissions(lf_client, principal, resource_link_db, ['DESCRIBE'])
                    grant_permissions(lf_client, principal, resource_wildcard, permissions)
                else:
                    revoke_permissions(lf_client, principal, resource_link_db, ['DESCRIBE'])
                    revoke_permissions(lf_client, principal, resource_wildcard, permissions)
        except Exception as e:
            logger.error(f"Error modifying permissions: {e}")
For the producer script: configure the Lambda environment variables read at the top of producer.py (ADMIN_ROLE_ARN, LF_ROLE, CROSS_QUEUE_URL, S3_POLICY_ARN, and S3_POLICY_NAME).
For the consumer script: configure SRC_ACC_NUM (the Account A account ID) and ADMIN_ROLE_ARN.
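Once a grant has been processed end to end, the consumer principal can query the resource link like any local table. The Athena snippet below is a sketch assuming a shared database named sales and a table named orders, so the resource link created by consumer.py would be sales_shared.rl_orders; the results bucket is a placeholder.

import boto3

athena = boto3.client('athena')

# Query the resource link created by consumer.py (names are illustrative)
response = athena.start_query_execution(
    QueryString='SELECT * FROM "sales_shared"."rl_orders" LIMIT 10',
    ResultConfiguration={'OutputLocation': 's3://my-athena-results-bucket/'}
)
print(response['QueryExecutionId'])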
Implementing a Data Mesh with AWS Lake Formation, the Glue Data Catalog, IAM, and S3 decentralizes data ownership in a way that is flexible, secure, and closely governed. With Lambda, SQS, and AWS Resource Access Manager (RAM) automating cross-account sharing, organizations can control access centrally while letting domain teams manage their own data products. This setup enables self-serve analytics that still satisfies compliance and security requirements. As the data landscape keeps evolving, embracing a unified, well-governed approach like this makes data more accessible, boosts collaboration, and leads to better decisions throughout a company.