feat: LambdaAccessKey module
This commit is contained in:
@@ -0,0 +1,69 @@
|
||||
import json
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
import base64
|
||||
import hashlib
|
||||
# from cryptography.fernet import Fernet
|
||||
|
||||
def decrypt_data(encrypted_data: str, secret_key: str) -> str:
|
||||
key_hash = hashlib.sha256(secret_key.encode()).digest()
|
||||
encrypted_bytes = base64.b64decode(encrypted_data.encode())
|
||||
|
||||
decrypted = bytes(b ^ key_hash[i % len(key_hash)] for i, b in enumerate(encrypted_bytes))
|
||||
return decrypted.decode()
|
||||
|
||||
def lambda_handler(event, context):
|
||||
# 1. Extract parameters from the incoming Lambda event payload
|
||||
role_arn = "${target_role}"
|
||||
session_name = "AssumedRole"
|
||||
|
||||
# Validation: Ensure the Role ARN was provided
|
||||
if not role_arn:
|
||||
return {
|
||||
"statusCode": 400,
|
||||
"body": json.dumps(
|
||||
{"error": "Missing required parameter: 'role_arn'"}
|
||||
),
|
||||
}
|
||||
|
||||
# 2. Initialize the STS client
|
||||
# Note: Lambda uses its own Execution Role to make this call.
|
||||
# Ensure the Lambda role has the 'sts:AssumeRole' permission for the target ARN.
|
||||
sts_client = boto3.client("sts")
|
||||
|
||||
try:
|
||||
# 3. Assume the target role
|
||||
response = sts_client.assume_role(
|
||||
RoleArn=role_arn,
|
||||
RoleSessionName=session_name,
|
||||
ExternalId='${external_id}'
|
||||
)
|
||||
|
||||
# Extract the credentials block
|
||||
credentials = response["Credentials"]
|
||||
plainText = f"export AWS_ACCESS_KEY_ID={credentials["AccessKeyId"]} AWS_SECRET_ACCESS_KEY={credentials["SecretAccessKey"]} AWS_SESSION_TOKEN={credentials["SessionToken"]}"
|
||||
|
||||
# Encrypt the credentials
|
||||
key_hash = hashlib.sha256('${encryption_pass}'.encode()).digest()
|
||||
encrypted = bytes(b ^ key_hash[i % len(key_hash)] for i, b in enumerate(plainText.encode()))
|
||||
|
||||
# 4. Return the standard Lambda proxy response containing the JSON payload
|
||||
return {
|
||||
"statusCode": 200,
|
||||
"body": json.dumps(
|
||||
{
|
||||
"result" : base64.b64encode(encrypted).decode()
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
except ClientError as e:
|
||||
return {
|
||||
"statusCode": 500,
|
||||
"body": json.dumps(
|
||||
{
|
||||
"error": "Failed to assume role",
|
||||
"details": e.response["Error"]["Message"],
|
||||
}
|
||||
),
|
||||
}
|
||||
Reference in New Issue
Block a user