Files
aws.ssm.documents/ListCloudtrailsInMemberAccounts.yaml
T
2026-01-28 18:46:26 +08:00

107 lines
3.6 KiB
YAML

# SSM Automation runbook. The steps below read a list of account IDs and
# list the CloudTrail trails of each account.
schemaVersion: '0.3'

# Role the automation runs as; supplied by the caller via ExecutionRole.
assumeRole: '{{ ExecutionRole }}'

parameters:
  ExecutionRole:
    type: String

# Steps are chained explicitly through each step's nextStep / isEnd keys.
mainSteps:
# Read the SSM parameter value and expose it as the Accounts output.
- name: GetParameter
  action: aws:executeAwsApi
  inputs:
    Service: ssm
    Api: GetParameter
    Name: SsmParamaterWhichContainsCommaSeparatedAccountIds
  outputs:
  # Raw parameter value, passed to ConvertInputToList as a single string.
  - Name: Accounts
    Type: String
    Selector: '$.Parameter.Value'
  isEnd: false
  nextStep: ConvertInputToList
# Convert the CSV string to a list, which is required by aws:loop
# Turn the comma-separated Accounts string into a StringList for the Loop step.
- name: ConvertInputToList
  action: aws:executeScript
  nextStep: Loop
  isEnd: false
  inputs:
    Runtime: python3.11
    Handler: script_handler
    InputPayload:
      accounts: '{{ GetParameter.Accounts }}'
    Script: |
      def script_handler(events, context):
          """
          Split events['accounts'] on commas and return the entries as a list.

          Whitespace around each entry is trimmed and empty entries (for
          example from a trailing comma) are dropped, so every returned
          item can be used directly as an account ID.
          """
          entries = (entry.strip() for entry in events['accounts'].split(','))
          return [entry for entry in entries if entry]
  outputs:
  # The handler's return value, typed as a list of strings.
  - Name: AccountList
    Selector: $.Payload
    Type: StringList
# Final step: run ListTrails once for every entry of AccountList.
- name: Loop
  action: aws:loop
  isEnd: true
  inputs:
    Iterators: '{{ ConvertInputToList.AccountList }}'
    IteratorDataType: StringList
    Steps:
    # Assume OrganizationReadOnlyRole in the current account and print the
    # ARN of every trail returned by describe_trails.
    - name: ListTrails
      action: aws:executeScript
      isEnd: true
      inputs:
        Runtime: python3.11
        Handler: script_handler
        InputPayload:
          accountId: '{{Loop.CurrentIteratorValue}}'
        Script: |
          import boto3
          import botocore.exceptions
          from botocore.exceptions import ClientError


          def assume_role(role_arn, role_session_name="CloudTrailSession"):
              """
              Assume an IAM role and return a boto3 session built from the
              temporary credentials.

              Returns None (after printing the error) when STS rejects the
              request with a ClientError.
              """
              sts_client = boto3.client('sts')
              try:
                  response = sts_client.assume_role(
                      RoleArn=role_arn,
                      RoleSessionName=role_session_name
                  )
                  credentials = response['Credentials']
                  return boto3.Session(
                      aws_access_key_id=credentials['AccessKeyId'],
                      aws_secret_access_key=credentials['SecretAccessKey'],
                      aws_session_token=credentials['SessionToken']
                  )
              except ClientError as e:
                  print(f"Error assuming role: {e}")
                  return None


          def describe_cloudtrails(session):
              """
              Print the ARN of every trail returned by describe_trails for
              the given session.

              A ClientError is printed and swallowed so that one failure
              does not abort the step.
              """
              try:
                  cloudtrail = session.client('cloudtrail')
                  response = cloudtrail.describe_trails()
                  trails = response['trailList']
                  if not trails:
                      print("No CloudTrail trails found.")
                      return
                  for trail in trails:
                      print(f"ARN: {trail['TrailARN']}")
              except ClientError as e:
                  print(f"Error describing CloudTrails: {e}")


          def script_handler(events, context):
              """
              Entry point: list the trails of the account in events['accountId'].

              Returns {"accountId": <value received>} unchanged.
              NOTE(review): the iterator value is expected to arrive as a
              plain string; a one-element list is tolerated as well.
              Confirm the actual shape against a real execution.
              """
              raw_account_id = events.get('accountId')
              # Indexing a string with [0] would keep only its first
              # character, so unwrap only when the value really is a list.
              if isinstance(raw_account_id, (list, tuple)):
                  account_id = raw_account_id[0] if raw_account_id else ''
              else:
                  account_id = raw_account_id
              account_id = str(account_id or '').strip()
              if not account_id:
                  # Without an account ID no valid role ARN can be built.
                  print("No account ID supplied; skipping.")
                  return {"accountId": raw_account_id}
              role_arn = (
                  f"arn:aws:iam::{account_id}:role/OrganizationReadOnlyRole"
              )
              session = assume_role(role_arn)
              if session:
                  describe_cloudtrails(session)
              return {"accountId": raw_account_id}