112 lines
4.2 KiB
Python
112 lines
4.2 KiB
Python
"""
|
|
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
|
|
NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
|
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
|
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
|
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
|
|
|
|
This function is designed to send a reminder through SNS
|
|
when a Secrets Manager automatic rotation event arrives. It
|
|
does not rotate any secret.
|
|
|
|
Secrets Manager initiates rotation in 4 steps [1], and some of
|
|
these steps must be implemented even if the purpose of this
|
|
function is not to rotate any secret.
|
|
|
|
[1] https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotate-secrets_lambda-functions.html
|
|
"""
|
|
import boto3
|
|
import os
|
|
|
|
|
|
# Read the SNS topic ARN from the Lambda environment variable
|
|
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
|
|
|
|
# lambda handler
|
|
def lambda_handler(event, context):
    """Route a Secrets Manager rotation event to the matching step handler.

    Only two rotation steps are acted upon: ``createSecret`` (clone the
    secret and send the reminder) and ``finishSecret`` (move AWSCURRENT to
    the cloned version). Any other step is logged and ignored.

    Args:
        event: Rotation event; must contain the keys ``SecretId``,
            ``ClientRequestToken`` and ``Step``.
        context: Lambda context object (unused).

    Raises:
        KeyError: if one of the required keys is missing from ``event``.
    """
    # All three keys are read up front, so a malformed event fails before
    # any step handler runs.
    arn, request_token, rotation_step = (
        event['SecretId'],
        event['ClientRequestToken'],
        event['Step'],
    )

    if rotation_step == "createSecret":
        # send notification and create a new secret version from the existing one
        send_notification(arn, request_token)
        return

    if rotation_step == "finishSecret":
        # label the new secret version as AWSCURRENT
        swap_current_version(arn, request_token)
        return

    print(f"Steps other than createSecret and finishSecret will be ignored: {rotation_step}")
|
|
|
|
def send_notification(secret_id, token):
    """Stage a copy of the current secret as AWSPENDING and send a reminder.

    The rotation workflow requires a new secret version to exist, so the
    AWSCURRENT value is copied unchanged into a new version labelled
    AWSPENDING. Nothing is actually rotated here. A reminder asking the
    operations team to rotate the secret manually is then published to the
    SNS topic named by ``SNS_TOPIC_ARN``.

    Args:
        secret_id: Identifier of the secret. The last ``:``-separated
            field is used as the secret name in the mail subject.
        token: Client request token of the rotation event, passed as the
            ``ClientRequestToken`` of the new version.

    Raises:
        KeyError: if the current version exposes neither ``SecretString``
            nor ``SecretBinary``.
    """
    print(f"Clone secret and send notification for {secret_id}")
    sm_client = boto3.client('secretsmanager')

    # A new secret version is required by the rotation workflow.
    # We simply copy the existing version to a new version and
    # label it AWSPENDING.
    current = sm_client.get_secret_value(
        SecretId=secret_id,
        VersionStage='AWSCURRENT'
    )
    # A secret stores either text or binary data; copy whichever is present
    # so that binary secrets can be staged as well.
    value_key = 'SecretString' if 'SecretString' in current else 'SecretBinary'

    response = sm_client.put_secret_value(
        SecretId=secret_id,
        ClientRequestToken=token,
        VersionStages=['AWSPENDING'],
        **{value_key: current[value_key]}
    )
    print(f"Retrieved existing secret and saved it as AWSPENDING. Version id is {response['VersionId']}")

    # Taking the last field equals field 6 of a full secret ARN, and also
    # works when only a bare secret name is supplied.
    secret_name = secret_id.split(":")[-1]
    # SNS rejects subjects of 100 characters or more, so cap the subject
    # instead of failing the publish for long secret names.
    subject = ('Secret rotation reminder for ' + secret_name)[:99]

    message = f"""Hello Cloud Operation team,

The following secret is due for update. Please perform the following steps to rotate the secret:

{secret_id}

1. Open the secret on secretsmanager.
2. Click the Retrieve secret value botton to reveal the Edit button.
3. Click on Edit and change the secret string to a new one. Click save to commit your change.
4. Update the password for the underlying resource (e.g. redis or rds)
5. Optionally update your application configuration with the new credential if it does not fetch from secretsmanager automatically

"""

    # Send out reminder about secret rotation
    sns_client = boto3.client('sns')
    sns_client.publish(
        TopicArn=SNS_TOPIC_ARN,
        Message=message,
        Subject=subject
    )
    print(f"Notification sent to {SNS_TOPIC_ARN}")
|
|
|
|
def swap_current_version(secret_id, token):
    """Promote the AWSPENDING version of a secret to AWSCURRENT.

    The version labelled AWSPENDING is located from the secret's metadata,
    AWSCURRENT is moved onto it, and the AWSPENDING label is then removed.
    The function tolerates being run again after a partial or complete
    earlier run: if AWSCURRENT already sits on the pending version only
    the AWSPENDING label is removed, and if no version is labelled
    AWSPENDING nothing is changed.

    Args:
        secret_id: Identifier of the secret whose labels are updated.
        token: Client request token of the rotation event. It is not
            consulted; the pending version is found by its label.
    """
    sm_client = boto3.client('secretsmanager')
    metadata = sm_client.describe_secret(SecretId=secret_id)

    current_version_id = None
    pending_version_id = None
    # The two labels are checked independently because a single version
    # may carry both of them.
    for version, stages in metadata["VersionIdsToStages"].items():
        if "AWSCURRENT" in stages:
            current_version_id = version
        if "AWSPENDING" in stages:
            pending_version_id = version

    if pending_version_id is None:
        # Nothing is staged, so there is nothing to promote.
        print(f"No AWSPENDING version found for {secret_id}, nothing to finish")
        return

    if pending_version_id != current_version_id:
        print(f"Remove {current_version_id} from AWSCURRENT and point AWSCURRENT to {pending_version_id}")
        move_args = {
            "SecretId": secret_id,
            "VersionStage": "AWSCURRENT",
            "MoveToVersionId": pending_version_id,
        }
        # Only name a source version when one actually holds AWSCURRENT;
        # boto3 rejects None for this parameter.
        if current_version_id is not None:
            move_args["RemoveFromVersionId"] = current_version_id
        sm_client.update_secret_version_stage(**move_args)

    print(f"Remove AWSPENDING from {pending_version_id}")
    sm_client.update_secret_version_stage(
        SecretId=secret_id,
        VersionStage='AWSPENDING',
        RemoveFromVersionId=pending_version_id
    )