119 lines
4.3 KiB
Python
119 lines
4.3 KiB
Python
"""
|
|
# The MIT License
|
|
Copyright (c) 2025 Nobody
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy of this software
|
|
and associated documentation files (the “Software”), to deal in the Software without restriction,
|
|
including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so,
|
|
subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in all copies or substantial
|
|
portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
|
|
NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
|
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
|
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
|
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
|
|
|
|
This function is designed to send a reminder through SNS
|
|
when a Secrets Manager automatic rotation event arrives. It
|
|
does not rotate any secret.
|
|
|
|
Secrets Manager initiates rotation in 4 steps [1], and some of
|
|
these steps must be implemented even if the purpose of this
|
|
function is not to rotate any secret.
|
|
|
|
This lambda function requires the following permission:
|
|
{
|
|
"Effect": "Allow",
|
|
"Action": [
|
|
"SNS:Publish",
|
|
"secretsmanager:DescribeSecret",
|
|
"secretsmanager:ListSecretVersionIds",
|
|
"secretsmanager:UpdateSecretVersionStage",
|
|
"secretsmanager:GetSecretValue",
|
|
"secretsmanager:PutSecretValue"
|
|
],
|
|
"Resource": "*"
|
|
}
|
|
|
|
[1] https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotate-secrets_lambda-functions.html
|
|
"""
|
|
import boto3
|
|
import os
|
|
|
|
|
|
# The SNS topic that reminders are published to comes from the Lambda
# environment; a missing variable raises KeyError as soon as the module loads.
SNS_TOPIC_ARN: str = os.environ["SNS_TOPIC_ARN"]
|
|
|
|
# lambda handler
|
|
def lambda_handler(event, context):
    """Dispatch one Secrets Manager rotation event.

    Reads ``SecretId``, ``ClientRequestToken`` and ``Step`` from *event*.
    Only two steps are acted on:

    * ``createSecret`` -- clone the secret and send the reminder.
    * ``finishSecret`` -- point AWSCURRENT at the cloned version.

    Every other step is logged and ignored.

    Args:
        event: Rotation event dict carrying the three keys above.
        context: Lambda context object (unused).

    Returns:
        Always ``True``.

    Raises:
        KeyError: If *event* lacks one of the three expected keys.
    """
    secret_ref, request_token, rotation_step = (
        event['SecretId'],
        event['ClientRequestToken'],
        event['Step'],
    )

    if rotation_step == "createSecret":
        # Send the notification and create a new version from the existing one.
        send_notification(secret_ref, request_token)
        return True

    if rotation_step == "finishSecret":
        # Label the new version AWSCURRENT.
        swap_current_version(secret_ref, request_token)
        return True

    print(f"Steps other than createSecret and finishSecret will be ignored: {rotation_step}")
    return True
|
|
|
|
def _notification_subject(secret_id: str) -> str:
    """Build the SNS subject line for the reminder about *secret_id*."""
    # In an ARN (arn:aws:secretsmanager:<region>:<account>:secret:<name>) the
    # last colon-separated field is the secret name. A bare name contains no
    # colon and is used unchanged instead of raising IndexError.
    secret_name = secret_id.rsplit(":", 1)[-1]
    # SNS requires Subject to be shorter than 100 characters, so truncate.
    return ('Secret rotation reminder for ' + secret_name)[:99]


def send_notification(secret_id: str, token: str) -> None:
    """Clone the current secret value as AWSPENDING and publish a reminder.

    The rotation workflow requires a new secret version to exist, so the
    AWSCURRENT value (string or binary) is copied unchanged into a new version
    identified by *token* and labelled AWSPENDING. A reminder is then
    published to the SNS topic named by ``SNS_TOPIC_ARN``.

    Args:
        secret_id: ARN or name of the secret being "rotated".
        token: ClientRequestToken of the rotation event; becomes the version
            id of the cloned secret value.
    """
    print(f"Clone secret and send notification for {secret_id}")
    sm_client = boto3.client('secretsmanager')

    current = sm_client.get_secret_value(
        SecretId=secret_id,
        VersionStage='AWSCURRENT',
    )
    # A secret holds either SecretString or SecretBinary; forward whichever
    # one is present so binary secrets do not fail with KeyError.
    payload = {
        field: current[field]
        for field in ('SecretString', 'SecretBinary')
        if current.get(field) is not None
    }

    sm_client.put_secret_value(
        SecretId=secret_id,
        ClientRequestToken=token,
        VersionStages=['AWSPENDING'],
        **payload,
    )

    # Send out the reminder about secret rotation.
    sns_client = boto3.client('sns')
    sns_client.publish(
        TopicArn=SNS_TOPIC_ARN,
        Message=f'Your secret {secret_id} is due for update. Please change it on secretsmanager and on your applications.',
        Subject=_notification_subject(secret_id),
    )
|
|
|
|
def swap_current_version(secret_id: str, token: str) -> None:
    """Promote version *token* to AWSCURRENT and drop its AWSPENDING label.

    The staging labels are read once via ``describe_secret`` and each update
    is issued only if that snapshot shows it is still needed. This makes a
    retried ``finishSecret`` invocation safe to repeat.

    Args:
        secret_id: ARN or name of the secret.
        token: Version id (ClientRequestToken) of the version to promote.
    """
    sm_client = boto3.client('secretsmanager')
    stages_by_version = sm_client.describe_secret(SecretId=secret_id)["VersionIdsToStages"]

    # The version currently labelled AWSCURRENT, or None if no version is.
    current_version = next(
        (
            version
            for version, stages in stages_by_version.items()
            if "AWSCURRENT" in stages
        ),
        None,
    )

    if current_version == token:
        # An earlier invocation already moved the label; moving it from a
        # version onto itself would be rejected.
        print("AWSCURRENT already points to new secret version")
    else:
        print("Point AWSCURRENT to new secret version")
        move_args = {
            "SecretId": secret_id,
            "VersionStage": "AWSCURRENT",
            "MoveToVersionId": token,
        }
        # RemoveFromVersionId must be a string; omit it when no version
        # holds the label rather than passing None.
        if current_version is not None:
            move_args["RemoveFromVersionId"] = current_version
        sm_client.update_secret_version_stage(**move_args)

    # Remove AWSPENDING only while the version still carries it.
    if "AWSPENDING" in stages_by_version.get(token, ()):
        print("Remove AWSPENDING staging label")
        sm_client.update_secret_version_stage(
            SecretId=secret_id,
            VersionStage='AWSPENDING',
            RemoveFromVersionId=token,
        )
|