Compare commits

...

57 Commits

Author SHA1 Message Date
xpk 9aa7c7b372 feat: output in markdown table 2026-06-05 09:58:43 +08:00
xpk b48a836ce1 feat: script to create ssm patch report 2026-04-26 22:01:17 +08:00
xpk 0a5de23ae0 feat: changed output format 2026-03-15 23:45:02 +08:00
xpk 64290109a6 feat: cloudfront-usage.py which calculates the usage of past 3 months 2026-03-15 23:15:09 +08:00
xpk a732a5c86f feat: script to delete all SG rules and finally the SG itself 2026-03-05 17:40:49 +08:00
xpk 05d5813ddc chore: moved ssm documents to dedicated repo 2026-01-28 18:43:24 +08:00
xpk 1735d52396 feat: a couple of starter ssm documents which uses aws:loop 2026-01-28 16:10:13 +08:00
xpk a05fee8786 feat: my first step function 2026-01-08 15:55:47 +08:00
xpk 3baa1996c2 feat: Added several demo programs 2025-11-28 08:29:28 +08:00
xpk 9d044c20c9 feat: added examples_tutorials 2025-11-26 23:20:41 +08:00
xpk 96ef5cb42e feat: added password quality check 2025-11-26 13:14:29 +08:00
xpk 953305814e new: script to calculate positions based on golden ratio 2025-11-26 09:02:52 +08:00
xpk ca7f67dfc8 fix: accept -i as short of -insane 2025-11-25 12:27:34 +08:00
xpk 0253bfb482 fix: increased insane password length 2025-11-25 12:25:10 +08:00
xpk fc355b6b55 fix: removed hard-coded length 2025-11-25 12:24:22 +08:00
xpk c223080874 feat: added -insane mode to pwgen.py 2025-11-25 12:22:41 +08:00
xpk 52b20fa9dc feat: making pwgen.py returns longer password 2025-11-25 11:38:16 +08:00
xpk 6eedb471f2 feat: password generator using pet names and random number 2025-11-25 11:23:42 +08:00
xpk 7e6e33397d feat: python loop demo 2025-11-25 10:59:00 +08:00
xpk 6c01a4f55c style: minor changes 2025-11-24 15:40:40 +08:00
xpk 3e5bb0547a feat: added function caching and assertion 2025-11-24 11:10:13 +08:00
xpk e75abcf971 feat: experimenting with ThreadPoolExecutor 2025-11-17 21:45:20 +08:00
xpk db7d1ee205 UPD: Added counter to s3-restore-status.py 2025-11-11 21:33:24 +08:00
xpk 7bc9fd255c UPD: Modified s3-restore-status.py to use csv library 2025-11-11 19:18:30 +08:00
xpk ae71b76fc6 NEW: Script to report s3 deep archive restore status 2025-11-11 19:10:31 +08:00
xpk e474e375a9 NEW: Script to report s3 deep archive restore status 2025-11-11 19:09:22 +08:00
xpk 7020d5dc38 UPD: Increased sleep wait and added retry loop 2025-11-11 17:51:23 +08:00
xpk 30195dd4f2 UPD: Increased sleep wait 2025-11-11 17:37:38 +08:00
xpk 352d06e317 UPD: manually optimizaed code written by cursor 2025-11-11 16:23:23 +08:00
xpk b74c82943b UPD: Minor update on script output messages and documentation 2025-11-11 12:38:56 +08:00
xpk 6284be7394 NEW: Rewritten bash script in python with help from Cursor 2025-11-11 12:33:39 +08:00
xpk 8df2378b57 UPD: Added comments to s3 deep archive restore scripts 2025-11-10 14:43:41 +08:00
xpk d02ac78961 NEW: batch restore deep archive objects 2025-11-10 14:29:03 +08:00
xpk c0e9f7b8c1 NEW: Added if-usage.sh showing various use of the if statement 2025-10-30 09:50:13 +08:00
xpk 4abba069f9 NEW: Some new scripts 2025-10-25 16:52:03 +08:00
xpk 52f8735047 UPD: Added actions to high_risk_actions set 2025-09-18 09:41:23 +08:00
xpk dd3b364013 UPD: Added actions to high_risk_actions set 2025-09-18 09:37:48 +08:00
xpk 4d29313b0d FIX: Lambda and Ec2 role policy check now only scan for allowed actions 2025-09-18 09:29:23 +08:00
xpk 5572f61ab9 FIX: Examination of Lambda and EC2 roles now work 2025-09-17 21:42:24 +08:00
xpk 2942483744 UPD: Added IAM permission check for lambda and ec2 roles 2025-09-17 20:56:57 +08:00
xpk 33a976be9a DOC: updated comment for elasticache-iam-auth.py 2025-08-11 23:07:56 +08:00
xpk d5e24c4825 NEW: Elasticache and IAM auth 2025-08-11 21:21:05 +08:00
xpk 03797d6e8c NEW: various python files 2025-06-18 21:22:41 +08:00
xpk 6c764730d3 NEW: packer files and minor update to aws-assume-role.py 2025-06-10 09:49:00 +08:00
xpk af3d1fe7b7 NEW: Python script for sending secret rotation reminder 2025-05-28 17:58:52 +08:00
xpk f5dafe8cbb DOC: Added readme for RandomPassword
Provided instructions to create native executable
2025-05-13 15:56:50 +08:00
xpk 72f06362f8 NEW: bash-chacha20.sh
Script to encrypt file using chacha20
2025-04-23 09:35:26 +08:00
xpk 244e3a1726 DOC: added comment to aws/aws-inventory-from-awsconfig.py
Commenting limitation of inventory collection using AWS config
2025-04-11 08:18:08 +08:00
xpk 1dd495bab6 UPD: Added sg-delete-all-rules.sh 2025-03-31 16:13:57 +08:00
xpk f039deada8 UPD: Updated AwsEnvReview.py and showing instance name in the report 2025-02-27 09:25:02 +08:00
xpk ad638bb6fb NEW: Script to see if commit in current branch is already in master 2025-02-19 16:58:05 +08:00
xpk 3fc4f0e24b NEW: minor update 2025-02-17 16:26:19 +08:00
xpk f7ff657d47 NEW: dump iam policies 2025-02-17 16:24:54 +08:00
xpk 2a0da86de0 UPD: Added comments and cloudtrail encruption now only scan for current region 2024-12-30 11:52:47 +08:00
xpk 2594fd7b2b NEW: mysql-proxy example 2024-12-19 20:14:00 +08:00
xpk 5919febbb6 UPD: Replacing kms-create-import-key.sh 2024-11-05 22:20:22 +08:00
xpk 546871562f UPD: Added kms-external-km.sh which replaces kms-create-import-key.sh 2024-11-05 22:19:51 +08:00
58 changed files with 2229 additions and 761 deletions
+51
View File
@@ -0,0 +1,51 @@
{
"Comment": "A description of my state machine",
"StartAt": "ListAccounts",
"States": {
"ListAccounts": {
"Type": "Task",
"Parameters": {},
"Resource": "arn:aws:states:::aws-sdk:organizations:listAccounts",
"OutputPath": "$.Accounts[*].Id",
"Next": "Map"
},
"Map": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "Lambda Invoke",
"States": {
"Lambda Invoke": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-southeast-1:111122223333:function:xpk-test:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"End": true
}
}
},
"End": true,
"ItemsPath": "$"
}
},
"QueryLanguage": "JSONPath"
}
+460 -156
View File
@@ -1,4 +1,4 @@
#!/usr/bin/python3
#!/usr/bin/env python3
"""
Review AWS environment based on 6 WAR pillars, namely:
1. Operational Excellence
@@ -10,61 +10,84 @@ Review AWS environment based on 6 WAR pillars, namely:
"""
import boto3
import botocore
from botocore.exceptions import ClientError
import jmespath
import re
from pprint import pprint
from datetime import date
from mdutils.mdutils import MdUtils
import os
import json
import concurrent.futures
import functools
def printTitle(title):
print("\n")
print("=" * len(title))
print(title.upper())
print("=" * len(title))
def printTitle(level: int, title: str):
if level <= 2:
mdFile.new_header(level=level, title=title)
else:
mdFile.new_paragraph(title)
return
def printSubTitle(title):
print("\n" + title + "\n")
return
def getAllRegions(myclient):
return jmespath.search("Regions[*].RegionName", myclient.describe_regions(AllRegions=False))
@functools.cache
def getAllRegions() -> list:
tmpSession = botoSession(os.environ['AWS_DEFAULT_REGION'])
tmpClient = tmpSession.client('ec2')
regions = jmespath.search("Regions[*].RegionName", tmpClient.describe_regions(AllRegions=False))
assert type(regions) == list
return regions
def getAgeFromDate(inputDate):
today = date.today()
delta = today - inputDate.date()
return delta.days
def printResult(content: list, header: str):
header = "Index, " + header
if len(content) <= 0:
print("👍 No issue found.")
else:
print(header)
print("-" * len(header))
for count, row in enumerate(content):
print(count+1, *row, sep=", ")
mdFile.new_paragraph(" No issue found.")
return
header = "Item," + header
table = header.split(",")
tableCol = len(table)
for count, row in enumerate(content):
row.insert(0, count+1)
table.extend(row)
mdFile.new_line()
mdFile.new_table(columns=tableCol, rows=len(content)+1, text=table, text_align='left')
return
@functools.cache
def botoSession(region:str) -> boto3.Session:
session = boto3.Session(region_name=region)
return session
sts = boto3.client("sts")
aid = sts.get_caller_identity().get("Account")
client = boto3.client('ec2', region_name="us-east-1")
regions = getAllRegions(client)
print("AWS Environment Review - " + str(date.today()) + "\n\n")
printTitle("Ec2 service review")
printSubTitle("[Cost Optimization] Instances stopped for over 14 days - Consider backing up and terminate instances "
"or use AutoScalingGroup to spin up and down instances as needed.")
# initialize md file output
mdFile = MdUtils(file_name='AwsReviewReport.md', title='Aws Review ' + str(date.today()))
mdFile.write("-" * 5)
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
print("Script started. It may take 7+ minutes to run. Report will be saved to AwsReviewReport.md. Please be patient...")
printTitle(3, f"Primary region: {os.environ['AWS_DEFAULT_REGION']}\n")
# create sessions
globalSession = botoSession("us-east-1")
localSession = botoSession(os.environ['AWS_DEFAULT_REGION'])
# create clients
sts = globalSession.client("sts")
ec2Client = localSession.client("ec2")
aid = sts.get_caller_identity().get("Account")
"""Check instances stopped for long time"""
printTitle(1, "Ec2 service review")
printTitle(2, "[Cost Optimization] Instances stopped for over 14 days")
printTitle(3, "Consider backing up and terminate instances "
"or use AutoScalingGroup to spin up and down instances as needed.")
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_instances()
if len(response.get("Reservations")) > 0:
for i in jmespath.search("Reservations[*].Instances[*]", response):
@@ -72,35 +95,54 @@ for r in regions:
outTable.append([r, aid, i[0].get("InstanceId"), getAgeFromDate(i[0].get("UsageOperationUpdateTime"))])
printResult(outTable, "Region, AccountID, InstanceId, DaysStopped")
printSubTitle("[Security] Insecure IDMSv1 allowed - Consider requiring IDMSv2. For more information, "
"""Check IMDS version"""
printTitle(2, "[Security] Insecure IDMSv1 allowed")
printTitle(3, "Consider requiring IDMSv2. For more information, "
"see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_instances()
if len(response.get("Reservations")) > 0:
for i in jmespath.search("Reservations[*].Instances[*]", response):
if i[0].get("MetadataOptions").get("HttpTokens") == "optional":
outTable.append([r, aid, i[0].get("InstanceId"), i[0].get("MetadataOptions").get("HttpTokens") ])
outTable.append([
r,
aid,
i[0].get("InstanceId"),
i[0].get("MetadataOptions").get("HttpTokens")
])
printResult(outTable, "Region, AccountID, InstanceId, IDMSv2")
printSubTitle("[Sustainability] Use of previous generation instance type - "
"Consider using current generation instances")
"""Check EC2 instance generation"""
printTitle(2,"[Sustainability] Use of early generation instance type")
printTitle(3, "Consider using current generation instances")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_instances()
if len(response.get("Reservations")) > 0:
for i in jmespath.search("Reservations[*].Instances[*]", response):
if re.search("^(t1|t2|m3|m1|m2|m4|c1|c2|c3|c4|r3|r4|i2)", i[0].get("InstanceType")) is not None:
outTable.append([r, aid, i[0].get("InstanceId"), i[0].get("InstanceType")])
printResult(outTable, "Region, AccountID, InstanceId, InstanceType")
if re.search("^(t1|t2|m3|m1|m2|m4|m5|c1|c2|c3|c4|r3|r4|i2)", i[0].get("InstanceType")) is not None:
outTable.append([
r,
aid,
i[0].get("InstanceId"),
jmespath.search("Tags[?Key =='Name'].Value", i[0])[0],
i[0].get("InstanceType")
])
printResult(outTable, "Region, AccountID, InstanceId, InstanceName, InstanceType")
printSubTitle("[Cost Optimization] Unattached EBS volumes - Consider backing up the volumes and delete them")
"""Check unattached EBS volumes"""
printTitle(2, "[Cost Optimization] Unattached EBS volumes")
printTitle(3, "Consider backing up the volumes and delete them")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_volumes(
Filters=[
{
@@ -113,28 +155,34 @@ for r in regions:
outTable.append([r, aid, i.get("VolumeId"), i.get("Size"), i.get("VolumeType")])
printResult(outTable, "Region, AccountID, VolumeId, Size, VolumeType")
printSubTitle("[Cost Optimization] EBS snapshots more than 365 days old - "
"Consider removing snapshots if no longer needed")
"""Check stale EBS snapshots"""
printTitle(2, "[Cost Optimization] EBS snapshots more than 365 days old")
printTitle(3,"Consider removing snapshots if no longer needed")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_snapshots(
OwnerIds=[aid]
)
for i in response.get("Snapshots"):
if getAgeFromDate(i.get("StartTime")) > 365 and i.get("Description") != "This snapshot is created by the AWS Backup service.":
outTable.append([r, aid, i.get("SnapshotId"), i.get("Description")[:70], getAgeFromDate(i.get("StartTime"))])
if getAgeFromDate(i.get("StartTime")) > 365 and i.get(
"Description") != "This snapshot is created by the AWS Backup service.":
outTable.append(
[r, aid, i.get("SnapshotId"), i.get("Description")[:70], getAgeFromDate(i.get("StartTime"))])
printResult(outTable, "Region, AccountID, SnapshotId, Description, SnapshotAge")
printSubTitle("[Security] Unencrypted EBS volumes - Consider replacing volume with encrypted ones. "
"""Check EBS encryption"""
printTitle(2, "[Security] Unencrypted EBS volumes")
printTitle(3, "Consider replacing volume with encrypted ones. "
"One can do so by stopping the Ec2 instance, creating snapshot for the unencrypted volume, "
"copy the snapshot to a new encrypted snapshot, create a volume from the encrypted snapshot,"
"detach the original volume and attach the encrypted volume. Remember to clean up the volumes"
"and snapshots afterwards.")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_volumes(
Filters=[
{
@@ -151,25 +199,29 @@ for r in regions:
outTable.append([r, aid, i.get("VolumeId"), i.get("Size"), i.get("VolumeType")])
printResult(outTable, "Region, AccountID, VolumeId, Size, VolumeType")
printSubTitle("[Cost Optimization] Unused Elastic IP - Consider deleting unused EIP")
"""Check unused EIP"""
printTitle(2, "[Cost Optimization] Unused Elastic IP")
printTitle(3, "Consider deleting unused EIP")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_addresses()
for i in response.get("Addresses"):
if i.get("AssociationId") is None:
outTable.append([r, aid, i.get("PublicIp")])
printResult(outTable, "Region, AccountID, PublicIp")
printTitle("Security group review")
printSubTitle("[Security] Security group rules allowing ingress from 0.0.0.0/0 - Consider setting more restrictive rules "
"allowing access from specific sources.")
"""Check unsafe security groups"""
printTitle(1, "Security group review")
printTitle(2, "[Security] Security group rules allowing ingress from 0.0.0.0/0")
printTitle(3, "Consider setting more restrictive rules allowing access from specific sources.")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_security_group_rules()
for sgr in jmespath.search("SecurityGroupRules[?IsEgress==`false`]", response):
if (not sgr.get("IsEgress")
@@ -178,15 +230,19 @@ for r in regions:
and sgr.get("ToPort") != 443
and sgr.get("FromPort") != 80
and sgr.get("ToPort") != 80):
outTable.append([r, aid, sgr.get("GroupId"), sgr.get("SecurityGroupRuleId"), sgr.get("FromPort"), sgr.get("ToPort")])
outTable.append(
[r, aid, sgr.get("GroupId"), sgr.get("SecurityGroupRuleId"), sgr.get("FromPort"), sgr.get("ToPort")])
printResult(outTable, "Region, AccountID, SecurityGroup, Rule, FromPort, ToPort")
printTitle("Rds service review")
printSubTitle("[Security] Unencrypted RDS instances - Consider encrypting RDS instances. For more detail, see "
"""Check RDS encryption setting"""
printTitle(1, "Rds service review")
printTitle(2, "[Security] Unencrypted RDS instances")
printTitle(3, "Consider encrypting RDS instances. For more detail, see "
"https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/encrypt-an-existing-amazon-rds-for-postgresql-db-instance.html")
outTable = []
for r in regions:
client = boto3.client('rds', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("rds")
response = client.describe_db_instances()
for i in response.get("DBInstances"):
if i.get("StorageEncrypted") == "False":
@@ -197,12 +253,13 @@ for r in regions:
outTable.append([r, aid, i.get("DBClusterIdentifier"), i.get("Engine")])
printResult(outTable, "Region, AccountID, DBIdentifier, Engine")
printSubTitle("[Reliability] RDS instance running in single availability zone - "
"Consider enabling multi-az for production use.")
"""Check RDS instance in single AZ"""
printTitle(2, "[Reliability] RDS instance running in single availability zone")
printTitle(3, "Consider enabling multi-az for production use.")
outTable = []
for r in regions:
client = boto3.client('rds', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("rds")
response = client.describe_db_instances()
for i in response.get("DBInstances"):
if not i.get("MultiAZ"):
@@ -213,13 +270,15 @@ for r in regions:
outTable.append([r, aid, i.get("DBClusterIdentifier"), i.get("Engine")])
printResult(outTable, "Region, AccountID, DBIdentifier, Engine")
printTitle("Lambda service review")
printSubTitle("[Security] Outdated Lambda runtime - Consider changing to currently supported Lambda runtime versions, "
"""Check outdated lambda runtime"""
printTitle(1, "Lambda service review")
printTitle(2, "[Security] Outdated Lambda runtime")
printTitle(3, "Consider changing to currently supported Lambda runtime versions, "
"listed on https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html")
outTable = []
for r in regions:
client = boto3.client('lambda', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("lambda")
response = client.list_functions()
for i in response.get("Functions"):
if i.get("Runtime") is not None:
@@ -227,12 +286,13 @@ for r in regions:
outTable.append([r, aid, i.get("FunctionName"), i.get("Runtime")])
printResult(outTable, "Region, AccountID, FunctionName, Runtime")
printTitle("Iam service review")
printSubTitle("[Security] Iam user access key not rotated for 180 days - Consider rotating access key")
"""Check IAM access key rotation"""
printTitle(1, "Iam service review")
printTitle(2, "[Security] Iam user access key not rotated for 180 days")
printTitle(3, "Consider rotating access key")
outTable = []
client = boto3.client('iam', region_name="us-east-1")
client = globalSession.client("iam")
listUsers = client.list_users()
users = jmespath.search("Users[*].UserName", listUsers)
for u in users:
@@ -242,12 +302,14 @@ for u in users:
outTable.append([aid, u, i.get("AccessKeyId"), getAgeFromDate(i.get("CreateDate"))])
printResult(outTable, "AccountID, UserName, AccessKeyId, AccessKeyAge")
printSubTitle("[Security] Iam AdministratorAccess policy attached - Consider granting minimum privileges "
"to users/groups/roles. AWS managed policies for job functions are recommended. See "
"https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions.html")
"""Check IAM entity with admin access"""
printTitle(2, "[Security] Iam AdministratorAccess policy attached")
printTitle(3, "Consider granting minimum privileges "
"to users/groups/roles. AWS managed policies for job functions are recommended. See "
"https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions.html")
outTable = []
client = boto3.client('iam', region_name="us-east-1")
client = globalSession.client("iam")
entityResp = client.list_entities_for_policy(
PolicyArn='arn:aws:iam::aws:policy/AdministratorAccess'
)
@@ -260,35 +322,141 @@ for role in jmespath.search("PolicyRoles[*].RoleName", entityResp):
printResult(outTable, "AccountID, Type, Name")
printTitle("Cloudwatch service review")
printSubTitle("[Cost Optimization] Cloudwatch LogGroups without retention period - Consider setting retention")
"""Check Lambda role and IAM instance profile permissions"""
printTitle(2, "[Security] Check permissions of Lamda roles and Ec2 instance roles")
printTitle(3, "Typically these roles should not have admin or iam permissions.")
outTable = []
for r in regions:
client = boto3.client('logs', region_name=r)
# Get a list of roles
client = localSession.client("lambda")
roles = set()
paginator = client.get_paginator('list_functions')
for page in paginator.paginate():
for function in page['Functions']:
role_arn = function.get('Role')
if role_arn:
roles.add(role_arn.split('/')[-1])
client = boto3.client('iam')
paginator = client.get_paginator('list_instance_profiles')
for page in paginator.paginate():
for profile in page['InstanceProfiles']:
profile_name = profile['InstanceProfileName']
instance_roles = profile.get('Roles', [])
for role in instance_roles:
roles.add(role['RoleName'])
# Need to remove non-existent roles
iam_client = globalSession.client("iam")
confirmed_roles = set()
for role in roles:
try:
iam_client.get_role(RoleName=role)
confirmed_roles.add(role)
except ClientError as e:
outTable.append([aid, role, "na", "Role does not exist anymore"])
pass
roles = confirmed_roles
printTitle(3, f"Roles to be examined: {len(confirmed_roles)}")
# predefined actions which should not be granted
high_risk_actions = {
"*",
"iam:*",
"iam:CreateUser",
"iam:AttachUserPolicy",
"iam:CreateRole",
"iam:AttachRolePolicy",
"iam:CreateInstanceProfile",
"iam:CreateLoginProfile",
"iam:CreateOpenIDConnectProvider",
"iam:CreateSAMLProvider",
"iam:CreateServiceLinkedRole"
}
# Check inline policies for each role
client = globalSession.client("iam")
for role in roles:
inline_policy_names = client.list_role_policies(RoleName=role)['PolicyNames']
for policy_name in inline_policy_names:
response = client.get_role_policy(RoleName=role, PolicyName=policy_name)
policy = response['PolicyDocument']
flat_actions = jmespath.search('Statement[?Effect=="Allow"].Action[]', policy)
if flat_actions is None:
print(json.dumps(policy))
outTable.append([aid, role, policy_name, "Single statement policy not supported by this program"])
else:
common = high_risk_actions.intersection(flat_actions)
if len(common) >= 1:
outTable.append([aid, role, policy_name, f"Inline policy contains {common}, please review it"])
# Check managed policies for each role
for role in roles:
attached_policies = client.list_attached_role_policies(RoleName=role)['AttachedPolicies']
for policy in attached_policies:
policy_arn = policy['PolicyArn']
policy_name = policy['PolicyName']
# Get the policy default version
policy_info = client.get_policy(PolicyArn=policy_arn)['Policy']
default_version_id = policy_info['DefaultVersionId']
# Get the policy document of the default version
version = client.get_policy_version(PolicyArn=policy_arn, VersionId=default_version_id)
policy_document = version['PolicyVersion']['Document']
flat_actions = jmespath.search('Statement[?Effect=="Allow"].Action[]', policy_document)
if flat_actions is None:
print(json.dumps(policy_document))
outTable.append([aid, role, policy_name, "Single statement policy not supported by this program"])
else:
common = high_risk_actions.intersection(flat_actions)
if len(common) >= 1:
outTable.append([aid, role, policy_name, f"Managed policy contains {common}, please review it"])
printResult(outTable, "AccountID, RoleName, PolicyName, Issue")
"""Check cloudwatch log group retention"""
printTitle(1, "Cloudwatch service review")
printTitle(2, "[Cost Optimization] Cloudwatch LogGroups without retention period")
printTitle(3, "Consider setting retention")
outTable = []
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("logs")
response = client.describe_log_groups()
for i in response.get("logGroups"):
if i.get("retentionInDays") is None:
outTable.append([r, aid, i.get("logGroupName"), int(round(i.get("storedBytes")/1024/1024,0))])
outTable.append([r, aid, i.get("logGroupName"), int(round(i.get("storedBytes") / 1024 / 1024, 0))])
printResult(outTable, "Region, AccountID, LogGroup, SizeMiB")
printSubTitle("[Security] Cloudwatch LogGroups unencrypted - Consider encrypting LogGroups")
"""Check unencrypted cloudwatch log groups"""
printTitle(2, "[Security] Cloudwatch LogGroups unencrypted")
printTitle(3, "Consider encrypting LogGroups")
outTable = []
for r in regions:
client = boto3.client('logs', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("logs")
response = client.describe_log_groups()
for i in response.get("logGroups"):
if i.get("kmsKeyId") is None:
outTable.append([r, aid, i.get("logGroupName")])
printResult(outTable, "Region, AccountID, LogGroup")
printTitle("Backup service review")
printSubTitle("[Reliability] Ec2/Rds instances found but AWSBackup plan missing - "
"Consider setting up AWSBackup plans to backup AWS resources.")
"""Check AWS Backup plan"""
printTitle(1, "Backup service review")
printTitle(2, "[Reliability] Ec2/Rds instances found but AWSBackup plan missing")
printTitle(3, "Consider setting up AWSBackup plans to backup AWS resources.")
outTable = []
for r in regions:
client = boto3.client('backup', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("backup")
response = client.list_backup_plans()
if len(response.get("BackupPlansList")) <= 0:
ec2client = boto3.client("ec2", region_name=r)
@@ -302,11 +470,13 @@ for r in regions:
outTable.append([r, aid, "AWSBackup plan missing", instanceCount])
printResult(outTable, "Region, AccountID, BackupPlan, Ec2RdsInstances")
printTitle("S3 service review")
printSubTitle("[Security] S3 bucket policy missing - Consider creating bucket policy and restrict access to bucket")
"""Check S3 bucket policy"""
printTitle(1, "S3 service review")
printTitle(2, "[Security] S3 bucket policy missing")
printTitle(3, "Consider creating bucket policy and restrict access to bucket")
outTable = []
client = boto3.client('s3', region_name="us-east-1")
client = globalSession.client("s3")
response = client.list_buckets()
for i in jmespath.search("Buckets[*].Name", response):
try:
@@ -315,25 +485,48 @@ for i in jmespath.search("Buckets[*].Name", response):
outTable.append([aid, i])
printResult(outTable, "AccountID, BucketName")
printTitle("ElastiCache review")
printSubTitle("[Sustainability] ElastiCache instances on x64 platform - Consider Graviton instances "
"such as t4g/r7g to optimize your infrastructure investment.")
"""Check s3 public access block"""
printTitle(2, "[Security] S3 public access block")
printTitle(3, "Blocking public access prevents accidental data leak due to misconfigurations in bucket policy or acl")
# get account id
sts = globalSession.client("sts")
account_id = sts.get_caller_identity()["Account"]
s3control = boto3.client("s3control")
try:
response = s3control.get_public_access_block(AccountId=account_id)
config = response["PublicAccessBlockConfiguration"]
is_blocked = all(config.values())
printTitle(3, "Account-level Public Access blocked.")
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration":
printTitle(3, "Account-level Public Access not blocked.")
"""Check elasticache platform"""
printTitle(1, "ElastiCache review")
printTitle(2, "[Sustainability] ElastiCache instances on x64 platform")
printTitle(3, "Consider Graviton instances such as t4g/r7g to optimize your infrastructure investment.")
outTable = []
for r in regions:
client = boto3.client('elasticache', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("elasticache")
response = client.describe_cache_clusters()
for i in response.get("CacheClusters"):
if re.search("[0-9]g.", i.get("CacheNodeType")) is None:
outTable.append([r, aid, i.get("CacheClusterId"), i.get("CacheNodeType")])
printResult(outTable, "Region, AccountID, CacheClusterId, CacheNodeType")
printTitle("LoadBalancer service review")
printSubTitle("[Cost Optimization] LB Target group without targets - Consider removing empty target groups")
"""Check target group with no target"""
printTitle(1, "LoadBalancer service review")
printTitle(2, "[Cost Optimization] LB Target group without targets")
printTitle(3, "Consider removing empty target groups")
outTable = []
for r in regions:
client = boto3.client('elbv2', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("elbv2")
response = client.describe_target_groups()
for i in response.get("TargetGroups"):
tgResp = client.describe_target_health(TargetGroupArn=i.get("TargetGroupArn"))
@@ -341,14 +534,17 @@ for r in regions:
outTable.append([r, aid, i.get("TargetGroupName")])
printResult(outTable, "Region, AccountID, TargetGroup")
printTitle("KMS service review")
printSubTitle("[Security] Customer Managed Keys do not have auto rotation enabled - "
"Consider enabling auto key rotation. When a key is rotated, previous ones "
"""Check KMS key rotation"""
printTitle(1, "KMS service review")
printTitle(2, "[Security] Customer Managed Keys do not have auto rotation enabled")
printTitle(3, "Consider enabling auto key rotation. When a key is rotated, previous ones "
"are still kept within AWS to allow data retrival.")
outTable = []
for r in regions:
client = boto3.client('kms', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("kms")
response = client.list_keys()
for i in jmespath.search("Keys[*].KeyId", response):
try:
@@ -362,40 +558,48 @@ for r in regions:
pass
printResult(outTable, "Region, AccountID, KeyId")
printTitle("ApiGateway service review")
printSubTitle("[Security] ApiGateway resource policy missing - Consider restricting access to private API with a "
"""Check API gateway resource policy"""
printTitle(1, "ApiGateway service review")
printTitle(2, "[Security] ApiGateway resource policy missing")
printTitle(3, "Consider restricting access to private API with a "
"policy. Private Api should be accessed through Vpc endpoint and a policy ensures the Api cannot "
"be accessed otherwise. For more detail, see "
"https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-resource-policies-examples.html")
outTable = []
for r in regions:
client = boto3.client('apigateway', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("apigateway")
response = client.get_rest_apis()
for i in response.get("items"):
if "PRIVATE" in i.get("endpointConfiguration").get("types") and len(i.get("policy")) <= 0:
outTable.append([r, aid, i.get("name")])
printResult(outTable, "Region, AccountID, PrivateApiName")
printTitle("Cloudtrail service review")
printSubTitle("[Security] Cloudtrail not encrypted - Consider enabling encryption for cloudtrail")
"""Check cloudtrail for encryption"""
printTitle(1, "Cloudtrail service review")
printTitle(2, "[Security] Cloudtrail not encrypted")
printTitle(3, "Consider enabling encryption for cloudtrail")
outTable = []
for r in regions:
client = boto3.client('cloudtrail', region_name=r)
response = client.describe_trails()
for i in response.get("trailList"):
if i.get("KmsKeyId") is None:
outTable.append([r, aid, i.get("Name")])
printResult(outTable, "Region, AccountID, Trail")
#for r in regions:
client = localSession.client('cloudtrail')
response = client.describe_trails()
for i in response.get("trailList"):
if i.get("KmsKeyId") is None:
outTable.append([aid, i.get("TrailARN")])
printResult(outTable, "AccountID, Trail")
printSubTitle("[Security] Multi-Region cloudtrail not enabled - Consider enabling Multi-Region for at least 1 cloudtrail")
"""Check cloudtrail for multi-region logging"""
printTitle(2, "[Security] Multi-Region cloudtrail not enabled")
printTitle(3, "Consider enabling Multi-Region for at least 1 cloudtrail")
outTable = []
multiRegionTrailCount = 0
for r in regions:
client = boto3.client('cloudtrail', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("cloudtrail")
response = client.describe_trails()
for i in response.get("trailList"):
if i.get("IsMultiRegionTrail"):
@@ -405,30 +609,131 @@ for r in regions:
outTable.append([r, aid, "Missing multi region trail"])
printResult(outTable, "Region, AccountID, Status")
printTitle(1, "Vpc service review")
printTitle("Vpc service review")
printSubTitle("[Reliability] Insufficient VPN tunnels - Consider having 2 tunnels for each site VPN connection. "
"""Check VPC flow log"""
printTitle(2, "[Security] VPC flow log not enabled")
printTitle(3, "Consider enabling VPC flowlog for audit purpose or delete the default VPCs if not in use")
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_vpcs()
vpc_ids = [vpc['VpcId'] for vpc in response['Vpcs']]
for vpc_id in vpc_ids:
flow_logs_response = client.describe_flow_logs(
Filters=[
{
'Name': 'resource-id',
'Values': [vpc_id]
}
]
)
flow_logs = flow_logs_response.get('FlowLogs', [])
if not flow_logs:
outTable.append([r, aid, vpc_id])
printResult(outTable, "Region, AccountID, VpcId")
"""Check default VPCs"""
printTitle(2, "[Security] Default VPCs should be removed")
printTitle(3, "Consider deleting the default VPCs if not in use")
outTable = []
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_vpcs(
Filters=[{'Name': 'isDefault', 'Values': ['true']}]
)
vpc_ids = [vpc['VpcId'] for vpc in response['Vpcs']]
for vpc_id in vpc_ids:
outTable.append([r, aid, vpc_id])
printResult(outTable, "Region, AccountID, DefaultVpcId")
"""Check VPC endpoints"""
def check_vpc_endpoint(region: str):
fSession = botoSession(region=region)
fClient = fSession.client("ec2")
fResponse = fClient.describe_vpc_endpoints()
if len(fResponse['VpcEndpoints']) <= 0:
return [region, aid, "Vpc endpoint not found"]
else:
return None
printTitle(2, "[Security] Use of VPC endpoints")
printTitle(3, "Consider deploying VPC endpoints and connect to AWS api endpoints privately")
outTable = []
with concurrent.futures.ThreadPoolExecutor() as executor:
# Map returns results in the same order as regions
results = executor.map(check_vpc_endpoint, getAllRegions())
for result in results:
if result:
outTable.append(result)
printResult(outTable, "Region, AccountID, VpcEndpointCount")
"""Check VPN tunnels"""
printTitle(2, "[Reliability] Insufficient VPN tunnels")
printTitle(3, "Consider having 2 tunnels for each site VPN connection. "
"AWS performs VPN tunnel endpoint maintenance rather frequently. Having 2 tunnel reduces the risk "
"of service interruption.")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_vpn_connections()
for i in response.get("VpnConnections"):
if len(jmespath.search("Options.TunnelOptions[*].OutsideIpAddress", i)) < 2:
outTable.append([r, aid, i.get("VpnConnectionId"), len(jmespath.search("Options.TunnelOptions[*].OutsideIpAddress", i))])
outTable.append([r, aid, i.get("VpnConnectionId"),
len(jmespath.search("Options.TunnelOptions[*].OutsideIpAddress", i))])
printResult(outTable, "Region, AccountID, VpnConnection, TunnelCount")
"""Check CF-ALB which allows public access"""
printTitle(2, "[Security] Cloudfront origins allow public access")
printTitle(3, "Your ALB is exposed to public, which bypass edge and WAF protection. Consider restricting access from Cloudfront only")
outTable = []
printTitle("Eks service review")
printSubTitle("[Sustainability] Eks node running on AmazonLinux2 (AL2) - Consider using AmazonLinux2023. "
client = localSession.client("elbv2")
response = client.describe_load_balancers()
alb_sgs = {}
for alb in response['LoadBalancers']:
dns_name = alb.get('DNSName')
security_groups = alb.get('SecurityGroups', [])
alb_sgs[dns_name] = security_groups
client = boto3.client('cloudfront')
distributions = client.list_distributions()
for dist in distributions.get('DistributionList', {}).get('Items', []):
origins = dist['Origins']['Items']
for origin in origins:
if re.match(".*elb.*", origin['DomainName']):
for sg in alb_sgs[origin['DomainName']]:
ec2client = boto3.client('ec2')
s = ec2client.describe_security_groups(GroupIds=[sg])['SecurityGroups'][0]
for source_ip in jmespath.search('IpPermissions[*].IpRanges[*].CidrIp', s):
if '0.0.0.0/0' in source_ip:
outTable.append([aid, dist['Id'], origin['DomainName'], sg])
break
printResult(outTable, "AccountID, CFDistribution, Origin, SecurityGroup")
"""EKS node OS version"""
printTitle(1, "Eks service review")
printTitle(2, "[Sustainability] Eks node running on AmazonLinux2 (AL2)")
printTitle(3, "Consider using AmazonLinux2023. "
"AL2's end of life date is 2025-06-30. AmazonLinux2023 runs on newer kernel and libraries, "
"which offers better performance and security.")
outTable = []
for r in regions:
client = boto3.client('eks', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("eks")
response = client.list_clusters()
for cluster in response.get("clusters"):
ngsResp = client.list_nodegroups(clusterName=cluster)
@@ -441,14 +746,17 @@ for r in regions:
outTable.append([r, aid, cluster, ng, ngResp.get("nodegroup").get("amiType")])
printResult(outTable, "Region, AccountID, Cluster, NodeGroup, AmiType")
printSubTitle("[Sustainability] Eks control plane version outdated - Consider using upgrading Eks cluster. "
"""Check outdated EKS control plane"""
printTitle(2, "[Sustainability] Eks control plane version outdated")
printTitle(3, "Consider using upgrading Eks cluster. "
"Reference https://docs.aws.amazon.com/eks/latest/userguide/kubernetes-versions.html for a list "
"of current versions. Reference https://docs.aws.amazon.com/eks/latest/userguide/update-cluster.html "
"for upgrade instructions.")
outTable = []
for r in regions:
client = boto3.client('eks', region_name=r)
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("eks")
response = client.list_clusters()
for cluster in response.get("clusters"):
clusterResp = client.describe_cluster(name=cluster)
@@ -456,9 +764,5 @@ for r in regions:
outTable.append([r, aid, cluster, clusterResp.get("cluster").get("version")])
printResult(outTable, "Region, AccountID, Cluster, Version")
# TODO
"""
- config enabled for all regions
- list users/groups/roles with administrator access
"""
mdFile.create_md_file()
print("Report written to AwsReviewReport.md")
-479
View File
@@ -1,479 +0,0 @@
#!/usr/bin/python3
"""
Review AWS environment based on 6 WAR pillars, namely:
1. Operational Excellence
2. Security
3. Reliability
4. Performance Efficiency
5. Cost Optimization
6. Sustainability
"""
import boto3
import botocore
import jmespath
import re
from pprint import pprint
from datetime import date
from mdutils.mdutils import MdUtils
def printTitle(level: int, title: str):
if level <= 2:
mdFile.new_header(level=level, title=title)
else:
mdFile.new_paragraph(title)
return
def getAllRegions(myclient):
return jmespath.search("Regions[*].RegionName", myclient.describe_regions(AllRegions=False))
def getAgeFromDate(inputDate):
today = date.today()
delta = today - inputDate.date()
return delta.days
def printResult(content: list, header: str):
if len(content) <= 0:
mdFile.new_paragraph("👏 No issue found.")
return
header = "Item," + header
table = header.split(",")
tableCol = len(table)
for count, row in enumerate(content):
row.insert(0, count+1)
table.extend(row)
mdFile.new_line()
mdFile.new_table(columns=tableCol, rows=len(content)+1, text=table, text_align='left')
return
mdFile = MdUtils(file_name='AwsReviewReport.md', title='Aws Review ' + str(date.today()))
sts = boto3.client("sts")
aid = sts.get_caller_identity().get("Account")
client = boto3.client('ec2', region_name="us-east-1")
regions = getAllRegions(client)
mdFile.write("-" * 5)
printTitle(1, "Ec2 service review")
printTitle(2, "[Cost Optimization] Instances stopped for over 14 days")
printTitle(3, "Consider backing up and terminate instances "
"or use AutoScalingGroup to spin up and down instances as needed.")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
response = client.describe_instances()
if len(response.get("Reservations")) > 0:
for i in jmespath.search("Reservations[*].Instances[*]", response):
if i[0].get("State").get("Name") == "stopped":
outTable.append([r, aid, i[0].get("InstanceId"), getAgeFromDate(i[0].get("UsageOperationUpdateTime"))])
printResult(outTable, "Region, AccountID, InstanceId, DaysStopped")
printTitle(2, "[Security] Insecure IDMSv1 allowed")
printTitle(3, "Consider requiring IDMSv2. For more information, "
"see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
response = client.describe_instances()
if len(response.get("Reservations")) > 0:
for i in jmespath.search("Reservations[*].Instances[*]", response):
if i[0].get("MetadataOptions").get("HttpTokens") == "optional":
outTable.append([r, aid, i[0].get("InstanceId"), i[0].get("MetadataOptions").get("HttpTokens")])
printResult(outTable, "Region, AccountID, InstanceId, IDMSv2")
printTitle(2,"[Sustainability] Use of early generation instance type")
printTitle(3, "Consider using current generation instances")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
response = client.describe_instances()
if len(response.get("Reservations")) > 0:
for i in jmespath.search("Reservations[*].Instances[*]", response):
if re.search("^(t1|t2|m3|m1|m2|m4|c1|c2|c3|c4|r3|r4|i2)", i[0].get("InstanceType")) is not None:
outTable.append([r, aid, i[0].get("InstanceId"), i[0].get("InstanceType")])
printResult(outTable, "Region, AccountID, InstanceId, InstanceType")
printTitle(2, "[Cost Optimization] Unattached EBS volumes")
printTitle(3, "Consider backing up the volumes and delete them")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
response = client.describe_volumes(
Filters=[
{
'Name': 'status',
'Values': ['available']
}
]
)
for i in response.get("Volumes"):
outTable.append([r, aid, i.get("VolumeId"), i.get("Size"), i.get("VolumeType")])
printResult(outTable, "Region, AccountID, VolumeId, Size, VolumeType")
printTitle(2, "[Cost Optimization] EBS snapshots more than 365 days old")
printTitle(3,"Consider removing snapshots if no longer needed")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
response = client.describe_snapshots(
OwnerIds=[aid]
)
for i in response.get("Snapshots"):
if getAgeFromDate(i.get("StartTime")) > 365 and i.get(
"Description") != "This snapshot is created by the AWS Backup service.":
outTable.append(
[r, aid, i.get("SnapshotId"), i.get("Description")[:70], getAgeFromDate(i.get("StartTime"))])
printResult(outTable, "Region, AccountID, SnapshotId, Description, SnapshotAge")
printTitle(2, "[Security] Unencrypted EBS volumes")
printTitle(3, "Consider replacing volume with encrypted ones. "
"One can do so by stopping the Ec2 instance, creating snapshot for the unencrypted volume, "
"copy the snapshot to a new encrypted snapshot, create a volume from the encrypted snapshot,"
"detach the original volume and attach the encrypted volume. Remember to clean up the volumes"
"and snapshots afterwards.")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
response = client.describe_volumes(
Filters=[
{
'Name': 'encrypted',
'Values': ['false']
},
{
'Name': 'status',
'Values': ['in-use']
}
]
)
for i in response.get("Volumes"):
outTable.append([r, aid, i.get("VolumeId"), i.get("Size"), i.get("VolumeType")])
printResult(outTable, "Region, AccountID, VolumeId, Size, VolumeType")
printTitle(2, "[Cost Optimization] Unused Elastic IP")
printTitle(3, "Consider deleting unused EIP")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
response = client.describe_addresses()
for i in response.get("Addresses"):
if i.get("AssociationId") is None:
outTable.append([r, aid, i.get("PublicIp")])
printResult(outTable, "Region, AccountID, PublicIp")
printTitle(1, "Security group review")
printTitle(2, "[Security] Security group rules allowing ingress from 0.0.0.0/0")
printTitle(3, "Consider setting more restrictive rules allowing access from specific sources.")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
response = client.describe_security_group_rules()
for sgr in jmespath.search("SecurityGroupRules[?IsEgress==`false`]", response):
if (not sgr.get("IsEgress")
and sgr.get("CidrIpv4") == "0.0.0.0/0"
and sgr.get("FromPort") != 443
and sgr.get("ToPort") != 443
and sgr.get("FromPort") != 80
and sgr.get("ToPort") != 80):
outTable.append(
[r, aid, sgr.get("GroupId"), sgr.get("SecurityGroupRuleId"), sgr.get("FromPort"), sgr.get("ToPort")])
printResult(outTable, "Region, AccountID, SecurityGroup, Rule, FromPort, ToPort")
printTitle(1, "Rds service review")
printTitle(2, "[Security] Unencrypted RDS instances")
printTitle(3, "Consider encrypting RDS instances. For more detail, see "
"https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/encrypt-an-existing-amazon-rds-for-postgresql-db-instance.html")
outTable = []
for r in regions:
client = boto3.client('rds', region_name=r)
response = client.describe_db_instances()
for i in response.get("DBInstances"):
if i.get("StorageEncrypted") == "False":
outTable.append([r, aid, i.get("DBInstanceIdentifier"), i.get("Engine")])
response = client.describe_db_clusters()
for i in response.get("DBClusters"):
if i.get("StorageEncrypted") == "False":
outTable.append([r, aid, i.get("DBClusterIdentifier"), i.get("Engine")])
printResult(outTable, "Region, AccountID, DBIdentifier, Engine")
printTitle(2, "[Reliability] RDS instance running in single availability zone")
printTitle(3, "Consider enabling multi-az for production use.")
outTable = []
for r in regions:
client = boto3.client('rds', region_name=r)
response = client.describe_db_instances()
for i in response.get("DBInstances"):
if not i.get("MultiAZ"):
outTable.append([r, aid, i.get("DBInstanceIdentifier"), i.get("Engine")])
response = client.describe_db_clusters()
for i in response.get("DBClusters"):
if not i.get("MultiAZ"):
outTable.append([r, aid, i.get("DBClusterIdentifier"), i.get("Engine")])
printResult(outTable, "Region, AccountID, DBIdentifier, Engine")
printTitle(1, "Lambda service review")
printTitle(2, "[Security] Outdated Lambda runtime")
printTitle(3, "Consider changing to currently supported Lambda runtime versions, "
"listed on https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html")
outTable = []
for r in regions:
client = boto3.client('lambda', region_name=r)
response = client.list_functions()
for i in response.get("Functions"):
if i.get("Runtime") is not None:
if re.search("python2|python3.[678]|java8|nodejs[468]|nodejs1[024]|dotnet6", i.get("Runtime")) is not None:
outTable.append([r, aid, i.get("FunctionName"), i.get("Runtime")])
printResult(outTable, "Region, AccountID, FunctionName, Runtime")
printTitle(1, "Iam service review")
printTitle(2, "[Security] Iam user access key not rotated for 180 days")
printTitle(3, "Consider rotating access key")
outTable = []
client = boto3.client('iam', region_name="us-east-1")
listUsers = client.list_users()
users = jmespath.search("Users[*].UserName", listUsers)
for u in users:
response = client.list_access_keys(UserName=u)
for i in response.get("AccessKeyMetadata"):
if getAgeFromDate(i.get("CreateDate")) > 180:
outTable.append([aid, u, i.get("AccessKeyId"), getAgeFromDate(i.get("CreateDate"))])
printResult(outTable, "AccountID, UserName, AccessKeyId, AccessKeyAge")
printTitle(2, "[Security] Iam AdministratorAccess policy attached")
printTitle(3, "Consider granting minimum privileges "
"to users/groups/roles. AWS managed policies for job functions are recommended. See "
"https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions.html")
outTable = []
client = boto3.client('iam', region_name="us-east-1")
entityResp = client.list_entities_for_policy(
PolicyArn='arn:aws:iam::aws:policy/AdministratorAccess'
)
for group in jmespath.search("PolicyGroups[*].GroupName", entityResp):
outTable.append([aid, "Group", group])
for user in jmespath.search("PolicyUsers[*].UserName", entityResp):
outTable.append([aid, "User", user])
for role in jmespath.search("PolicyRoles[*].RoleName", entityResp):
outTable.append([aid, "Role", role])
printResult(outTable, "AccountID, Type, Name")
printTitle(1, "Cloudwatch service review")
printTitle(2, "[Cost Optimization] Cloudwatch LogGroups without retention period")
printTitle(3, "Consider setting retention")
outTable = []
for r in regions:
client = boto3.client('logs', region_name=r)
response = client.describe_log_groups()
for i in response.get("logGroups"):
if i.get("retentionInDays") is None:
outTable.append([r, aid, i.get("logGroupName"), int(round(i.get("storedBytes") / 1024 / 1024, 0))])
printResult(outTable, "Region, AccountID, LogGroup, SizeMiB")
printTitle(2, "[Security] Cloudwatch LogGroups unencrypted")
printTitle(3, "Consider encrypting LogGroups")
outTable = []
for r in regions:
client = boto3.client('logs', region_name=r)
response = client.describe_log_groups()
for i in response.get("logGroups"):
if i.get("kmsKeyId") is None:
outTable.append([r, aid, i.get("logGroupName")])
printResult(outTable, "Region, AccountID, LogGroup")
printTitle(1, "Backup service review")
printTitle(2, "[Reliability] Ec2/Rds instances found but AWSBackup plan missing")
printTitle(3, "Consider setting up AWSBackup plans to backup AWS resources.")
outTable = []
for r in regions:
client = boto3.client('backup', region_name=r)
response = client.list_backup_plans()
if len(response.get("BackupPlansList")) <= 0:
ec2client = boto3.client("ec2", region_name=r)
ec2resp = ec2client.describe_instances()
ec2instances = jmespath.search("Reservations[*].Instances[*]", ec2resp)
rdsclient = boto3.client("rds", region_name=r)
rdsresp = rdsclient.describe_db_instances()
rdsinstances = rdsresp.get("DBInstances")
instanceCount = len(ec2instances) + len(rdsinstances)
if instanceCount >= 1:
outTable.append([r, aid, "AWSBackup plan missing", instanceCount])
printResult(outTable, "Region, AccountID, BackupPlan, Ec2RdsInstances")
printTitle(1, "S3 service review")
printTitle(2, "[Security] S3 bucket policy missing")
printTitle(3, "Consider creating bucket policy and restrict access to bucket")
outTable = []
client = boto3.client('s3', region_name="us-east-1")
response = client.list_buckets()
for i in jmespath.search("Buckets[*].Name", response):
try:
policyResp = client.get_bucket_policy(Bucket=i)
except:
outTable.append([aid, i])
printResult(outTable, "AccountID, BucketName")
printTitle(1, "ElastiCache review")
printTitle(2, "[Sustainability] ElastiCache instances on x64 platform")
printTitle(3, "Consider Graviton instances such as t4g/r7g to optimize your infrastructure investment.")
outTable = []
for r in regions:
client = boto3.client('elasticache', region_name=r)
response = client.describe_cache_clusters()
for i in response.get("CacheClusters"):
if re.search("[0-9]g.", i.get("CacheNodeType")) is None:
outTable.append([r, aid, i.get("CacheClusterId"), i.get("CacheNodeType")])
printResult(outTable, "Region, AccountID, CacheClusterId, CacheNodeType")
printTitle(1, "LoadBalancer service review")
printTitle(2, "[Cost Optimization] LB Target group without targets")
printTitle(3, "Consider removing empty target groups")
outTable = []
for r in regions:
client = boto3.client('elbv2', region_name=r)
response = client.describe_target_groups()
for i in response.get("TargetGroups"):
tgResp = client.describe_target_health(TargetGroupArn=i.get("TargetGroupArn"))
if len(jmespath.search("TargetHealthDescriptions[*].Target", tgResp)) == 0:
outTable.append([r, aid, i.get("TargetGroupName")])
printResult(outTable, "Region, AccountID, TargetGroup")
printTitle(1, "KMS service review")
printTitle(2, "[Security] Customer Managed Keys do not have auto rotation enabled")
printTitle(3, "Consider enabling auto key rotation. When a key is rotated, previous ones "
"are still kept within AWS to allow data retrival.")
outTable = []
for r in regions:
client = boto3.client('kms', region_name=r)
response = client.list_keys()
for i in jmespath.search("Keys[*].KeyId", response):
try:
keyResp = client.describe_key(KeyId=i)
if (keyResp.get("KeyMetadata").get("Enabled") == "True"
and keyResp.get("KeyMetadata").get("KeyManager") == "CUSTOMER"):
krResp = client.get_key_rotation_status(KeyId=i)
if krResp.get("KeyRotationEnabled") != "False":
outTable.append([r, aid, i])
except:
pass
printResult(outTable, "Region, AccountID, KeyId")
printTitle(1, "ApiGateway service review")
printTitle(2, "[Security] ApiGateway resource policy missing")
printTitle(3, "Consider restricting access to private API with a "
"policy. Private Api should be accessed through Vpc endpoint and a policy ensures the Api cannot "
"be accessed otherwise. For more detail, see "
"https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-resource-policies-examples.html")
outTable = []
for r in regions:
client = boto3.client('apigateway', region_name=r)
response = client.get_rest_apis()
for i in response.get("items"):
if "PRIVATE" in i.get("endpointConfiguration").get("types") and len(i.get("policy")) <= 0:
outTable.append([r, aid, i.get("name")])
printResult(outTable, "Region, AccountID, PrivateApiName")
printTitle(1, "Cloudtrail service review")
printTitle(2, "[Security] Cloudtrail not encrypted")
printTitle(3, "Consider enabling encryption for cloudtrail")
outTable = []
for r in regions:
client = boto3.client('cloudtrail', region_name=r)
response = client.describe_trails()
for i in response.get("trailList"):
if i.get("KmsKeyId") is None:
outTable.append([r, aid, i.get("Name")])
printResult(outTable, "Region, AccountID, Trail")
printTitle(2, "[Security] Multi-Region cloudtrail not enabled")
printTitle(3, "Consider enabling Multi-Region for at least 1 cloudtrail")
outTable = []
multiRegionTrailCount = 0
for r in regions:
client = boto3.client('cloudtrail', region_name=r)
response = client.describe_trails()
for i in response.get("trailList"):
if i.get("IsMultiRegionTrail"):
multiRegionTrailCount += 1
if multiRegionTrailCount <= 0:
outTable.append([r, aid, "Missing multi region trail"])
printResult(outTable, "Region, AccountID, Status")
printTitle(1, "Vpc service review")
printTitle(2, "[Reliability] Insufficient VPN tunnels")
printTitle(3, "Consider having 2 tunnels for each site VPN connection. "
"AWS performs VPN tunnel endpoint maintenance rather frequently. Having 2 tunnel reduces the risk "
"of service interruption.")
outTable = []
for r in regions:
client = boto3.client('ec2', region_name=r)
response = client.describe_vpn_connections()
for i in response.get("VpnConnections"):
if len(jmespath.search("Options.TunnelOptions[*].OutsideIpAddress", i)) < 2:
outTable.append([r, aid, i.get("VpnConnectionId"),
len(jmespath.search("Options.TunnelOptions[*].OutsideIpAddress", i))])
printResult(outTable, "Region, AccountID, VpnConnection, TunnelCount")
printTitle(1, "Eks service review")
printTitle(2, "[Sustainability] Eks node running on AmazonLinux2 (AL2)")
printTitle(3, "Consider using AmazonLinux2023. "
"AL2's end of life date is 2025-06-30. AmazonLinux2023 runs on newer kernel and libraries, "
"which offers better performance and security.")
outTable = []
for r in regions:
client = boto3.client('eks', region_name=r)
response = client.list_clusters()
for cluster in response.get("clusters"):
ngsResp = client.list_nodegroups(clusterName=cluster)
for ng in ngsResp.get("nodegroups"):
ngResp = client.describe_nodegroup(
clusterName=cluster,
nodegroupName=ng
)
if re.search("^AL2_", ngResp.get("nodegroup").get("amiType")):
outTable.append([r, aid, cluster, ng, ngResp.get("nodegroup").get("amiType")])
printResult(outTable, "Region, AccountID, Cluster, NodeGroup, AmiType")
printTitle(2, "[Sustainability] Eks control plane version outdated")
printTitle(3, "Consider using upgrading Eks cluster. "
"Reference https://docs.aws.amazon.com/eks/latest/userguide/kubernetes-versions.html for a list "
"of current versions. Reference https://docs.aws.amazon.com/eks/latest/userguide/update-cluster.html "
"for upgrade instructions.")
outTable = []
for r in regions:
client = boto3.client('eks', region_name=r)
response = client.list_clusters()
for cluster in response.get("clusters"):
clusterResp = client.describe_cluster(name=cluster)
if float(jmespath.search("cluster.version", clusterResp)) < 1.28:
outTable.append([r, aid, cluster, clusterResp.get("cluster").get("version")])
printResult(outTable, "Region, AccountID, Cluster, Version")
mdFile.create_md_file()
print("Report written to AwsReviewReport.md")
# TODO
"""
- config enabled for all regions
"""
+1
View File
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
// limitation: config recorder only provides resource id and name
import boto3
import pandas as pd
+28
View File
@@ -0,0 +1,28 @@
#!/bin/bash
# This script was developed by AWS Support
echo "Exporting customer-managed IAM policies"
policies=$(aws iam list-policies --scope Local --query 'Policies[*].[PolicyName,Arn]' --output text)
while read -r policy_name policy_arn; do
echo "Policy Name: $policy_name"
echo "Policy ARN: $policy_arn"
echo ""
echo "Policy Content:"
aws iam get-policy-version --policy-arn "$policy_arn" --version-id $(aws iam get-policy --policy-arn "$policy_arn" --query 'Policy.DefaultVersionId' --output text) --query 'PolicyVersion.Document' --output json
echo ""
echo "Attached Entities:"
# List attached users
aws iam list-entities-for-policy --policy-arn "$policy_arn" --entity-filter User --query 'PolicyUsers[*].UserName' --output text | tr '\t' '\n' | sed 's/^/User: /'
# List attached groups
aws iam list-entities-for-policy --policy-arn "$policy_arn" --entity-filter Group --query 'PolicyGroups[*].GroupName' --output text | tr '\t' '\n' | sed 's/^/Group: /'
# List attached roles
aws iam list-entities-for-policy --policy-arn "$policy_arn" --entity-filter Role --query 'PolicyRoles[*].RoleName' --output text | tr '\t' '\n' | sed 's/^/Role: /'
echo "----------------------------------------"
done <<< "$policies"
---------------------------
+57
View File
@@ -0,0 +1,57 @@
#!/usr/bin/env python3
r"""
Documentation
License: This program is released under the MIT License
"""
# Imports
import boto3
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import humanize
# Main function
def main() -> None:
# get distributions
cw = boto3.client('cloudwatch', region_name='us-east-1')
cf = boto3.client('cloudfront', region_name='us-east-1')
distros = cf.list_distributions()['DistributionList']['Items']
dist_ids = [d['Id'] for d in distros if d['Status'] == 'Deployed']
print(f"Distributions discovered: {len(dist_ids)}")
# print markdown table headers
print("| DistId | Month | NoRequests | Download |")
print("| ------ | ----- | ---------- | -------- |")
for dist_id in dist_ids:
for month_offset in range(-3, 0): # Past 3 months
now = datetime.now()
first_of_current_month = now.replace(day=1)
start = (first_of_current_month + relativedelta(months=month_offset)).replace(day=1)
end = (start + relativedelta(months=1)) - relativedelta(seconds=1) # Last day
# BytesDownloaded
bytes_resp = cw.get_metric_statistics(
Namespace='AWS/CloudFront', MetricName='BytesDownloaded',
Dimensions=[{'Name': 'DistributionId', 'Value': dist_id}, {'Name': 'Region', 'Value': 'Global'}],
StartTime=start, EndTime=end, Period=2678400, Statistics=['Sum']
)
bytes_total = bytes_resp['Datapoints'][0]['Sum'] if bytes_resp['Datapoints'] else 0
# Requests
reqs_resp = cw.get_metric_statistics(
Namespace='AWS/CloudFront', MetricName='Requests',
Dimensions=[{'Name': 'DistributionId', 'Value': dist_id}, {'Name': 'Region', 'Value': 'Global'}],
StartTime=start, EndTime=end, Period=2678400, Statistics=['Sum']
)
reqs_total = reqs_resp['Datapoints'][0]['Sum'] if reqs_resp['Datapoints'] else 0
# only interested in distro with high traffic (>10G)
# if bytes_total > 10 * 1024 ** 3:
print(f"| {dist_id} | {start:%Y-%b} | {reqs_total:,.0f} | {humanize.naturalsize(bytes_total)} |")
# Call main function
if __name__ == '__main__':
main()
+9
View File
@@ -0,0 +1,9 @@
#!/bin/bash
SG=$1
aws ec2 describe-security-group-rules --filters "Name=group-id,Values=$SG" | jq -cr '.SecurityGroupRules[] | select(.IsEgress == true) | .SecurityGroupRuleId' | parallel 'aws ec2 revoke-security-group-egress --group-id $SG --security-group-rule-ids {}'
aws ec2 describe-security-group-rules --filters "Name=group-id,Values=$SG" | jq -cr '.SecurityGroupRules[] | select(.IsEgress == false) | .SecurityGroupRuleId' | parallel 'aws ec2 revoke-security-group-ingress --group-id $SG --security-group-rule-ids {}'
aws ec2 delete-security-group --group-id $SG
-6
View File
@@ -1,6 +0,0 @@
aws iam list-policies --scope Local | jq -cr '.Policies[].Arn' | while read i; do
VER=$(aws iam get-policy --policy-arn $i | jq -cr .Policy.DefaultVersionId)
POLNAME=$(echo $i | awk -F/ '{print $NF}')
aws iam get-policy-version --policy-arn $i --version-id $VER > iam-policies/$POLNAME.json
done
+40
View File
@@ -0,0 +1,40 @@
#!/usr/bin/python3
import boto3
import json
def list_custom_iam_policies():
# Create an IAM client
iam = boto3.client('iam')
# Initialize a paginator
paginator = iam.get_paginator('list_policies')
# List of custom policies
custom_policies = []
# Iterate through all pages of policies
for page in paginator.paginate(Scope='Local'):
for policy in page['Policies']:
if policy['PolicyName'] != 'AWSManaged':
policy_version = iam.get_policy_version(
PolicyArn=policy['Arn'],
VersionId=policy['DefaultVersionId']
)
custom_policies.append({
'PolicyName': policy['PolicyName'],
'PolicyArn': policy['Arn'],
'PolicyDocument': json.dumps(policy_version['PolicyVersion']['Document'], indent=2, separators=(',', ': '))
})
return custom_policies
if __name__ == "__main__":
custom_iam_policies = list_custom_iam_policies()
for policy in custom_iam_policies:
print(f"Policy Name: {policy['PolicyName']}")
print(f"Policy ARN: {policy['PolicyArn']}")
print("Policy Document:")
print(policy['PolicyDocument'])
print("=" * 40)
-33
View File
@@ -1,33 +0,0 @@
#!/bin/bash
if [ $# -lt 2 ]; then
echo "This tool requires openssl, awscli, jq and base64."
echo "One can generate a key using openssl rand -out PlaintextKeyMaterial.bin 32"
echo "Usage: key-import.sh key-file key-alias"
exit 0
fi
keyAlias=$2
aws kms create-key --origin EXTERNAL --description "Customer managed key" | jq -cr .KeyMetadata.KeyId > keyid.txt
aws kms get-parameters-for-import --key-id $(cat keyid.txt) \
--wrapping-algorithm RSAES_OAEP_SHA_256 \
--wrapping-key-spec RSA_2048 > import.json
cat import.json | jq -cr .PublicKey | base64 -d > PublicKey.bin
cat import.json | jq -cr .ImportToken | base64 -d > ImportToken.bin
openssl pkeyutl -encrypt -in $1 -inkey PublicKey.bin -keyform DER \
-pubin -out EncryptedKeyMaterial.bin -pkeyopt rsa_padding_mode:oaep -pkeyopt rsa_oaep_md:sha256
aws kms import-key-material --key-id $(cat keyid.txt) \
--encrypted-key-material fileb://EncryptedKeyMaterial.bin \
--import-token fileb://ImportToken.bin \
--expiration-model KEY_MATERIAL_DOES_NOT_EXPIRE
aws kms create-alias --alias-name "alias/$keyAlias" --target-key-id $(cat keyid.txt)
aws kms describe-key --key-id $(cat keyid.txt)
rm -f EncryptedKeyMaterial.bin ImportToken.bin PublicKey.bin import.json keyid.txt
+34
View File
@@ -0,0 +1,34 @@
#!/bin/bash
# Script that generates symetric kms key, encrypt a self-generated key material, and import it to kms
KEYID=$(aws kms create-key --origin EXTERNAL \
--description "Customer managed key with externally generated key material" \
--tags TagKey=CreatedWith,TagValue=kms-external-km.sh | jq -cr .KeyMetadata.KeyId)
aws kms get-parameters-for-import \
--key-id $KEYID \
--wrapping-algorithm RSAES_OAEP_SHA_256 \
--wrapping-key-spec RSA_4096 > import-parameters.json
jq -cr .ImportToken import-parameters.json | base64 -d > ImportToken.bin
jq -cr .PublicKey import-parameters.json | base64 -d > WrappingPublicKey.bin
# Generate key material. Replace this with material from HSM if needed
openssl rand -out PlaintextKeyMaterial.bin 32
openssl pkeyutl \
-encrypt \
-in PlaintextKeyMaterial.bin \
-out EncryptedKeyMaterial.bin \
-inkey WrappingPublicKey.bin \
-keyform DER \
-pubin \
-pkeyopt rsa_padding_mode:oaep \
-pkeyopt rsa_oaep_md:sha256 \
-pkeyopt rsa_mgf1_md:sha256
aws kms import-key-material --key-id $KEYID \
--encrypted-key-material fileb://EncryptedKeyMaterial.bin \
--import-token fileb://ImportToken.bin \
--expiration-model KEY_MATERIAL_DOES_NOT_EXPIRE
aws kms describe-key --key-id $KEYID
rm -f WrappingPublicKey.bin ImportToken.bin PlaintextKeyMaterial.bin
+246
View File
@@ -0,0 +1,246 @@
#!/usr/bin/env python3
"""
# S3 Batch Restore Script
# Restores objects from S3 Glacier Deep Archive using AWS S3 Batch Operations.
# Generate objectlist.csv with the following script:
# BUCKET=whk1-bea-icc-mbk-prd-s3-log-infra-log
# PREFIX=elb/alb-icc-mbk/AWSLogs/851239346925/elasticloadbalancing/ap-east-1/2025/08/11/
# aws s3 ls s3://$BUCKET/$PREFIX | awk "{print \"$BUCKET,$PREFIX\"\$NF}" | tee /tmp/objectlist.csv
"""
import sys
import json
import time
import boto3
import random
from botocore.exceptions import ClientError
def generate_random_id():
"""Generate a random alphanumeric ID of specified length."""
return random.randint(1000, 9999)
def create_trust_policy():
"""Create the trust policy document for the IAM role."""
return {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "batchoperations.s3.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
def create_iam_role(iam_client, role_name="S3BatchRestoreRole"):
"""Create IAM role for S3 batch operations."""
trust_policy = create_trust_policy()
try:
# Create role
iam_client.create_role(
RoleName=role_name,
Description="S3 batch restore role",
AssumeRolePolicyDocument=json.dumps(trust_policy)
)
print(f"Created IAM role: {role_name}")
except ClientError as e:
if e.response['Error']['Code'] == 'EntityAlreadyExists':
print(f"IAM role {role_name} already exists, skipping creation")
else:
raise
# Attach policy
try:
iam_client.attach_role_policy(
RoleName=role_name,
PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess'
)
print(f"Attached policy to role: {role_name}")
except ClientError as e:
if e.response['Error']['Code'] == 'EntityAlreadyExists':
print(f"Policy already attached to {role_name}")
else:
raise
return role_name
def create_manifest_bucket(s3_client, bucket_name, region):
"""Create S3 bucket for manifest file."""
s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={'LocationConstraint': region}
)
print(f"Created manifest bucket: {bucket_name}")
def upload_manifest(s3_client, bucket_name, manifest_file_path, object_key="objectlist.csv"):
"""Upload manifest file to S3 and return ETag."""
try:
with open(manifest_file_path, 'rb') as f:
response = s3_client.put_object(
Bucket=bucket_name,
Key=object_key,
Body=f
)
print(f"Uploaded manifest to s3://{bucket_name}/{object_key}")
# Get ETag (remove quotes if present)
etag = response['ETag'].strip('"')
return etag
except FileNotFoundError:
print(f"Error: Manifest file '{manifest_file_path}' not found", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error uploading manifest: {e}", file=sys.stderr)
sys.exit(1)
def create_manifest_spec(bucket_name, etag, object_key="objectlist.csv"):
"""Create manifest specification for batch operation."""
return {
"Spec": {
"Format": "S3BatchOperations_CSV_20180820",
"Fields": ["Bucket", "Key"]
},
"Location": {
"ObjectArn": f"arn:aws:s3:::{bucket_name}/{object_key}",
"ETag": etag
}
}
def create_report_spec(bucket_name, prefix="batch-reports"):
"""Create report specification for batch operation."""
return {
"Bucket": f"arn:aws:s3:::{bucket_name}",
"Prefix": prefix,
"Format": "Report_CSV_20180820",
"Enabled": True,
"ReportScope": "AllTasks"
}
def create_batch_job(s3control_client, account_id, role_arn, manifest_spec, report_spec,
expiration_days=14, glacier_job_tier="STANDARD", priority=10,
description="Restore objects from Deep Archive"):
"""Create S3 batch restore job."""
operation = {
"S3InitiateRestoreObject": {
"ExpirationInDays": expiration_days,
"GlacierJobTier": glacier_job_tier
}
}
try:
response = s3control_client.create_job(
AccountId=account_id,
Operation=operation,
Manifest=manifest_spec,
Report=report_spec,
Priority=priority,
RoleArn=role_arn,
Description=description
)
job_id = response['JobId']
print(f"Submitted S3 batch job: {job_id}")
return job_id
except ClientError as e:
print(f"Error creating batch job: {e}", file=sys.stderr)
sys.exit(1)
def approve_job(s3control_client, account_id, job_id) -> bool:
"""Approve the batch job to start execution."""
try:
s3control_client.update_job_status(
AccountId=account_id,
JobId=job_id,
RequestedJobStatus='Ready'
)
print(f"Approved job: {job_id}")
return True
except ClientError as e:
print(f"Error approving job: {e}", file=sys.stderr)
return False
def get_account_id(sts_client):
"""Get AWS account ID."""
try:
response = sts_client.get_caller_identity()
return response['Account']
except ClientError as e:
print(f"Error getting account ID: {e}", file=sys.stderr)
sys.exit(1)
def main():
if len(sys.argv) < 2:
print("Usage: python3 s3-batch-restore.py <manifest.csv>", file=sys.stderr)
print("You must first prepare the manifest, which is a csv with content <bucket>:<key>", file=sys.stderr)
sys.exit(1)
manifest_file = sys.argv[1]
# Initialize AWS clients
region = "ap-east-1"
session = boto3.Session(region_name=region)
iam_client = session.client('iam')
s3_client = session.client('s3')
s3control_client = session.client('s3control')
sts_client = session.client('sts')
# Get account ID
account_id = get_account_id(sts_client)
print(f"Using AWS account: {account_id}")
# Create IAM role
role_name = create_iam_role(iam_client)
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
# Create manifest bucket
random_id = generate_random_id()
manifest_bucket = f"deep-archive-batch-restore-{random_id}"
create_manifest_bucket(s3_client, manifest_bucket, session.region_name)
# Upload manifest and get ETag
etag = upload_manifest(s3_client, manifest_bucket, manifest_file)
# Create manifest and report specs (in memory, no temp files)
manifest_spec = create_manifest_spec(manifest_bucket, etag)
report_spec = create_report_spec(manifest_bucket)
# Create batch job
print("Submitting S3 batch job...")
job_id = create_batch_job(
s3control_client,
account_id,
role_arn,
manifest_spec,
report_spec
)
# Wait a bit before approving
time.sleep(5)
# Approve job
print(f"Approving submitted job {job_id}...")
while not approve_job(s3control_client, account_id, job_id):
time.sleep(5)
print(f"\nReview s3 batch job status. When it is completed, delete the manifest bucket:")
print(f"aws s3 rb s3://{manifest_bucket} --force")
if __name__ == "__main__":
main()
+69
View File
@@ -0,0 +1,69 @@
#!/bin/bash
if [ ! -f $1 ]; then
echo "You must first prepare the manifest, which is a csv with content <bucket>:<key>"
exit 1
fi
cat <<EOF > trust-policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "batchoperations.s3.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
# Create batch restore role
aws iam create-role \
--role-name S3BatchRestoreRole \
--description "S3 batch restore role" \
--assume-role-policy-document file://trust-policy.json
aws iam attach-role-policy \
--role-name S3BatchRestoreRole \
--policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
# Create manufest bucket and copy manifest over
MANIFEST_BUCKET="deep-archive-restore-batch-$(uuid)"
aws s3 mb s3://$MANIFEST_BUCKET
aws s3 cp $1 s3://$MANIFEST_BUCKET/
echo "Uploaded manifest to s3://$MANIFEST_BUCKET/objectlist.csv"
# Build input json parameters
etag=$(aws s3api list-objects --bucket $MANIFEST_BUCKET --prefix objectlist.csv | jq -cr .Contents[].ETag | tr -d \")
jo -p Spec="$(jo -p Format=S3BatchOperations_CSV_20180820 Fields=$(jo -a Bucket Key))" Location="$(jo -p ObjectArn=arn:aws:s3:::$MANIFEST_BUCKET/objectlist.csv ETag=$etag)" > /tmp/manifest.json
jo Bucket=arn:aws:s3:::$MANIFEST_BUCKET Prefix=batch-reports Format=Report_CSV_20180820 Enabled=true ReportScope=AllTasks > /tmp/report.json
# Submit batch restore job
echo "Submit s3 batch job..."
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
jobid=```aws s3control create-job \
--account-id $ACCOUNT_ID \
--operation '{"S3InitiateRestoreObject": {"ExpirationInDays": 14, "GlacierJobTier": "STANDARD"}}' \
--manifest file:///tmp/manifest.json \
--report file:///tmp/report.json \
--role-arn arn:aws:iam::$ACCOUNT_ID:role/S3BatchRestoreRole \
--description "Restore objects from Deep Archive" \
--priority 10 \
--region ap-east-1 | jq -cr .JobId```
sleep 5
# Approve job
echo "Approve submitted job $jobid..."
aws s3control update-job-status \
--account-id $ACCOUNT_ID \
--job-id $jobid \
--requested-job-status Ready
echo "To delete the manifest bucket:"
echo "aws s3 rb s3://$MANIFEST_BUCKET --force"
+9 -3
View File
@@ -1,6 +1,12 @@
#!/bin/bash
echo "Object key must not start with /"
BUCKET=$1
aws s3 ls s3://$BUCKET --recursive | awk '{print $NF}' | while read i; do
aws s3api restore-object --bucket $BUCKET --key $i --restore-request Days=20
done
# Restore objects
cat /tmp/objectlist.txt | parallel -v -j8 aws s3api restore-object --bucket $BUCKET --key {} --restore-request Days=20
# Check restore status. ongoing-request="true" will be returned if it's in progress
cat /tmp/objectlist.txt | parallel -j8 aws s3api head-object --bucket $BUCKET --key {} --query Restore --output text | sort | uniq -c
# After objects are restored, use aws s3 sync with --force-glacier-transfer to download objects
+123
View File
@@ -0,0 +1,123 @@
#!/usr/bin/env python3
"""
S3 Restore Status Checker
This script checks the restore status of S3 objects that have been archived to
Glacier or Deep Archive storage classes. It reads a CSV file containing bucket
and object key pairs, then queries each object's restore status using the
S3 head_object API.
The script is useful for monitoring the progress of S3 batch restore operations
or checking the status of individual object restorations from Glacier storage.
Usage:
# Normal mode (debug output with filenames):
python3 s3-restore-status.py
# Optimized mode (status only, no filenames):
python3 -O s3-restore-status.py
Input File Format:
The script reads /tmp/objectlist.csv by default. Each line should contain:
<bucket>,<object_key>
Example:
my-bucket,path/to/file1.log
my-bucket,path/to/file2.log
another-bucket,archive/data.json
Output:
The script prints the restore status for each object:
- "Not restoring" if the object has no active restore operation
- Restore status string (e.g., 'ongoing-request="true"') if restoration is in progress
- Restore status with expiry date (e.g., 'ongoing-request="false", expiry-date="..."')
if restoration is complete
Example Output:
Not-being-restored
ongoing-request="true"
ongoing-request="false", expiry-date="Mon, 25 Nov 2025 00:00:00 GMT"
"""
from collections import Counter
import boto3
import csv
import sys
def read_objectlist(path: str = "/tmp/objectlist.csv") -> tuple[list[str], str]:
"""Read object list CSV file and extract bucket and object keys.
Parses a CSV file containing bucket and object key pairs. Each line should
have the format: "<bucket>,<object_key>". The function collects all object
keys and returns the last bucket name encountered (assuming all objects are
in the same bucket).
Uses Python's csv module for proper CSV parsing, which handles edge cases
like quoted fields, escaped characters, and commas within fields.
Args:
path (str): Path to the CSV file containing bucket and object key pairs.
Defaults to "/tmp/objectlist.csv".
Returns:
tuple: A tuple containing:
- list: List of object keys (strings)
- str: The bucket name (last bucket encountered in the file)
Raises:
SystemExit: If the file is not found, the script exits with status code 1.
"""
bucket_to_keys: list[str] = []
bucket: str = ""
try:
with open(path, "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f)
for row in reader:
bucket = row[0].strip()
key = row[1].strip()
bucket_to_keys.append(key)
except FileNotFoundError:
print(f"Error: object list file not found at {path}", file=sys.stderr)
sys.exit(1)
return bucket_to_keys, bucket
def main():
"""Main function to check restore status of S3 objects.
Reads the object list from CSV file, connects to AWS S3 in ap-east-1 region,
and queries the restore status for each object. Prints the restore status
for each object to stdout.
The restore status indicates:
- "Not-being-restored": Object is not currently being restored (or restore completed and expired)
- Restore status string: Contains restore progress and expiry information
"""
session = boto3.Session(region_name="ap-east-1")
s3_client = session.client('s3')
keys, bucket = read_objectlist(path="/tmp/objectlist.csv")
status_counts: Counter[str] = Counter()
for object in keys:
response = s3_client.head_object(
Bucket=bucket,
Key=object
)
# Split object path by / and get only the last element (filename)
restore_status = response.get('Restore') or "Not-being-restored"
if __debug__:
filename = object.split('/')[-1]
print(f"{filename}: {restore_status}")
# print(restore_status)
status_counts[restore_status] += 1
print(status_counts)
if __name__ == "__main__":
main()
+7
View File
@@ -0,0 +1,7 @@
#!/bin/bash
SG="sg-0a1594ff1259f216b"
aws ec2 describe-security-group-rules --filters Name=group-id,Values=$SG | jq -r '.SecurityGroupRules[] | select(.IsEgress==false) | .SecurityGroupRuleId' | while read r; do
aws ec2 revoke-security-group-ingress --group-id $SG --security-group-rule-ids $r
done
+1
View File
@@ -0,0 +1 @@
aws ssm describe-instance-patch-states --instance-ids $(aws ec2 describe-instances --query Reservations[].Instances[].InstanceId --output text) | jq -cr '["InstanceId", "PatchGroup", "BaselineId", "CriticalNonCompliantCount", "SecurityNonCompliantCount", "OtherNonCompliantCount"],(.InstancePatchStates[] | [.InstanceId, .PatchGroup, .BaselineId, .CriticalNonCompliantCount, .SecurityNonCompliantCount, .OtherNonCompliantCount]) | @csv'
+1 -1
View File
@@ -7,7 +7,7 @@ import java.util.ArrayList;
import java.util.List;
class PasswordGenerator {
private static final int DEFAULT_PASSWORD_LENGTH = 3;
private static final int DEFAULT_PASSWORD_LENGTH = 4;
public static void main(String[] args) {
try {
+5
View File
@@ -0,0 +1,5 @@
# Generate native image
```bash
native-image -Djava.io.tmpdir=~/tmp --parallelism=4 PasswordGenerator
```
+8 -7
View File
@@ -4,13 +4,14 @@ import json
# reference: https://aws.amazon.com/premiumsupport/knowledge-center/start-stop-lambda-eventbridge/
ec2 = boto3.client('ec2', region_name=os.environ['region_name'])
def lambda_handler(event, context):
if (event['action'] == 'start'):
resp = ec2.start_instances(InstanceIds=json.loads(os.environ['instances']))
elif (event['action'] == 'stop'):
resp = ec2.stop_instances(InstanceIds=json.loads(os.environ['instances']))
ec2 = boto3.client('ec2', region_name=os.environ['region_name'])
instances = json.loads(os.environ['instances'])
if event['action'] == 'start':
resp = ec2.start_instances(InstanceIds=instances)
elif event['action'] == 'stop':
resp = ec2.stop_instances(InstanceIds=instances)
else:
resp = "Event action not provided"
raise ValueError("Invalid event action")
return resp
+71
View File
@@ -0,0 +1,71 @@
#!/usr/bin/python3
r"""
Documentation
License: This program is released under the MIT License
"""
# Imports
import boto3
class AwsPrefixList:
def __init__(self):
ec2 = boto3.client('ec2')
response = ec2.describe_managed_prefix_lists(
Filters=[{'Name': 'prefix-list-name', 'Values': ['com.amazonaws.global.cloudfront.origin-facing']}]
)
prefix_lists = response.get('PrefixLists', [])
prefix_list_id = prefix_lists[0]['PrefixListId']
entries = []
paginator = ec2.get_paginator('get_managed_prefix_list_entries')
for page in paginator.paginate(PrefixListId=prefix_list_id):
entries.extend(page.get('Entries', []))
self.pl = [entry['Cidr'] for entry in entries]
self.pl.sort()
def getHash(self):
return hash(tuple(self.pl))
def getTuple(self):
return tuple(self.pl)
def getLength(self):
return len(self.pl)
class WafIpSet:
def __init__(self, name: str, id: str):
waf_client = boto3.client('wafv2')
temp = waf_client.get_ip_set(
Name=name,
Scope='REGIONAL',
Id=id)
self.ip_set = temp["IPSet"]["Addresses"]
self.ip_set.sort()
self.lock_token = temp['LockToken'] # need this to update ipset
def getHash(self):
return hash(tuple(self.ip_set))
def getTuple(self):
return tuple(self.ip_set)
def getLength(self):
return len(self.ip_set)
# Main function
def main() -> None:
pl = AwsPrefixList()
ipset = WafIpSet(name="cloudfront_ip_ipset", id="951120be-31d7-415f-9aa3-5ad9e56b6195")
print(f"PrefixList length: {pl.getLength()}")
print(f"IpSet length: {ipset.getLength()}")
# missing = set(pl.getTuple()) - set(ipset.getTuple())
# notInPl = set(ipset.getTuple()) - set(pl.getTuple())
# print(f"Missing in WAF ipset: {len(missing)}")
# print(f"Not in PL: {len(notInPl)}")
# Call main function
if __name__ == '__main__':
main()
@@ -0,0 +1,118 @@
"""
# The MIT License
Copyright (c) 2025 Nobody
Permission is hereby granted, free of charge, to any person obtaining a copy of this software
and associated documentation files (the “Software”), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial
portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
This function is designed to send a reminder through SNS
when secretsmanager automatic rotation event arrives. It
does not rotate any secret.
Secretsmanager initiate rotation in 4 steps [1], and some of
these steps must be implemented even if the purpose of this
function is not to rotate any secret.
This lambda function requires the following permission:
{
"Effect": "Allow",
"Action": [
"SNS:Publish",
"secretsmanager:DescribeSecret",
"secretsmanager:ListSecretVersionIds",
"secretsmanager:UpdateSecretVersionStage",
"secretsmanager:GetSecretValue",
"secretsmanager:PutSecretValue"
],
"Resource": "*"
}
[1] https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotate-secrets_lambda-functions.html
"""
import boto3
import os
# Read sns topic from lambda environment variable
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
# lambda handler
def lambda_handler(event, context):
secret_id = event['SecretId']
token = event['ClientRequestToken']
step = event['Step']
# Secretsmanager sends 4 rotation events, but we will only use the createSecret event
# and the finishSecret event
if step == "createSecret":
# send notification and create a new secret from existing secret
send_notification(secret_id, token)
elif step == "finishSecret":
# set new secret with version AWSCURRENT
swap_current_version(secret_id, token)
else:
print(f"Steps other than createSecret and finishSecret will be ignored: {step}")
return True
def send_notification(secret_id: str, token: str) -> None:
print(f"Clone secret and send notification for {secret_id}")
sm_client = boto3.client('secretsmanager')
"""
A new secret version is required by rotation workflow
We will simply copy existing version to a new version
label it AWSPENDING
"""
orig_secret = sm_client.get_secret_value(
SecretId=secret_id,
VersionStage='AWSCURRENT'
)['SecretString']
sm_client.put_secret_value(
SecretId=secret_id,
ClientRequestToken=token,
SecretString=orig_secret,
VersionStages=['AWSPENDING']
)
# Send out reminder about secret rotation
sns_client = boto3.client('sns')
sns_client.publish(
TopicArn=SNS_TOPIC_ARN,
Message=f'Your secret {secret_id} is due for update. Please change it on secretsmanager and on your applications.',
Subject='Secret rotation reminder for ' + secret_id.split(":")[6]
)
def swap_current_version(secret_id: str, token: str) -> None:
print("Point AWSCURRENT to new secret version")
sm_client = boto3.client('secretsmanager')
metadata = sm_client.describe_secret(SecretId=secret_id)
current_version = None
for version in metadata["VersionIdsToStages"]:
if "AWSCURRENT" in metadata["VersionIdsToStages"][version]:
current_version = version
break
sm_client.update_secret_version_stage(
SecretId=secret_id,
VersionStage="AWSCURRENT",
MoveToVersionId=token,
RemoveFromVersionId=current_version
)
print("Remove AWSPENDING staging label")
sm_client.update_secret_version_stage(
SecretId=secret_id,
VersionStage='AWSPENDING',
RemoveFromVersionId=token
)
+28
View File
@@ -0,0 +1,28 @@
const https = require('https');
const options = {
hostname: 'blog.headdesk.me',
port: 443,
path: '/',
method: 'GET'
};
const req = https.request(options, (res) => {
console.log('Status Code:', res.statusCode);
console.log('Response Headers:', res.headers);
res.on('data', (chunk) => {
// Consume response data if needed
});
res.on('end', () => {
console.log('Response ended.');
});
});
req.on('error', (e) => {
console.error(`Problem with request: ${e.message}`);
});
req.end();
+39
View File
@@ -0,0 +1,39 @@
# Packer file which query for the latest RHEL9 AMI and print it
packer {
required_plugins {
amazon = {
source = "github.com/hashicorp/amazon"
version = "~> 1"
}
}
}
variable "aws_region" {
type = string
default = "${env("AWS_REGION")}"
}
data "amazon-ami" "rhel" {
filters = {
name = "RHEL-9.*HVM_GA*x86_64*GP3"
root-device-type = "ebs"
virtualization-type = "hvm"
}
most_recent = true
owners = ["amazon"]
region = "${var.aws_region}"
}
source "null" "local" {
communicator = "none"
}
build {
name = "query"
sources = ["null.local"]
provisioner "shell-local" {
inline = ["echo ${data.amazon-ami.rhel.id} ${data.amazon-ami.rhel.name}"]
}
}
+4
View File
@@ -0,0 +1,4 @@
Add-WindowsCapability -Online -Name OpenSSH.Server~~~~0.0.1.0
Start-Service sshd
Set-Service -Name sshd -StartupType 'Automatic'
New-NetFirewallRule -Name "SSH" -DisplayName "OpenSSH Server" -Enabled True -Direction Inbound -Protocol TCP -LocalPort 22 -Action Allow
+50
View File
@@ -0,0 +1,50 @@
Write-Output "Installing PSWindowsUpdate"
[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12
Install-PackageProvider -Name NuGet -MinimumVersion 2.8.5.201 -Force
Install-Module PSWindowsUpdate -Force -Confirm:$False
Write-Output "Running Get-WindowsUpdate"
Get-WindowsUpdate -MicrosoftUpdate -Category 'Critical Updates', 'Security Updates' -Verbose
Write-Output "Running Invoke-WUJob"
Invoke-WUJob -Script {ipmo PSWindowsUpdate;Get-WindowsUpdate -Install -MicrosoftUpdate -Category 'Critical Updates', 'Security Updates' -AcceptAll -IgnoreReboot -Verbose | Out-File C:\WindowsUpdate.log} -RunNow -Confirm:$false
Write-Output "Get-ScheduledTask -TaskName PSWindowsUpdate"
Get-ScheduledTask -TaskName PSWindowsUpdate
Write-Output "(Get-ScheduledTask -TaskName PSWindowsUpdate).Actions"
(Get-ScheduledTask -TaskName PSWindowsUpdate).Actions
Write-Output "Get-ScheduledTaskInfo -TaskName PSWindowsUpdate"
Get-ScheduledTaskInfo -TaskName PSWindowsUpdate
Write-Output "Get-Content C:\WindowsUpdate.log"
Get-Content "C:\WindowsUpdate.log"
Write-Output "Waiting for PSWindowsUpdate Task to Complete"
$stopWatch = [System.Diagnostics.Stopwatch]::StartNew()
while($true) {
$taskStatus = (Get-ScheduledTask -TaskName PSWindowsUpdate).State
Write-Output "PSWindowsUpdate Task Status - $taskStatus"
if ($taskStatus -ne "Ready" ) {
$elapsed = "{0:D2}:{1:D2}:{2:D2}" -f $stopWatch.Elapsed.Hours, $stopWatch.Elapsed.Minutes, $stopWatch.Elapsed.Seconds
Write-Output "Waiting for completion of task PSWindowsUpdate - $elapsed"
Write-Output "Get-Content C:\WindowsUpdate.log"
Get-Content "C:\WindowsUpdate.log"
Start-Sleep -s 10
}
else {
Write-Output "PSWindowsUpdate Task Completed"
break
}
}
Write-Output "Get-Content C:\WindowsUpdate.log"
Get-Content "C:\WindowsUpdate.log"
$taskStatus = (Get-ScheduledTask -TaskName PSWindowsUpdate).State
Write-Output "PSWindowsUpdate Task Status - $taskStatus"
Write-Output "Get-WUHistory"
Get-WUHistory
+75
View File
@@ -0,0 +1,75 @@
#!/usr/bin/pytho3
from pprint import pprint
import boto3
from datetime import datetime, timezone, timedelta
class Ami:
def __init__(self, name: str, age: int, delete: bool):
self.name = name
self.age = age
self.delete = delete
def __repr__(self):
return f"Ami(name='{self.name}', age={self.age}, delete={self.delete})"
def mark_for_deletion(self):
self.delete = True
ec2 = boto3.client("ec2")
images = ec2.describe_images(
Owners=["self"],
Filters=[
{"Name": "name", "Values": ["var*", "online-ordering-website*"]},
],
)
ami_in_use = []
launch_templates = ec2.describe_launch_templates()
for t in launch_templates.get("LaunchTemplates"):
latest = ec2.describe_launch_template_versions(
LaunchTemplateId=t.get("LaunchTemplateId"),
Versions=["$Latest"],
)
for lt in latest.get("LaunchTemplateVersions"):
ami_in_use.append(lt.get("LaunchTemplateData").get("ImageId"))
all_images = images.get("Images", [])
inactive_images = [i for i in all_images if i.get("ImageId") not in ami_in_use]
my_amis = {}
for i in all_images:
ami_id = i.get("ImageId")
ami_date = datetime.strptime(
i.get("CreationDate"), "%Y-%m-%dT%H:%M:%S.%fZ"
).replace(tzinfo=timezone.utc)
ami_age = datetime.now(timezone.utc) - ami_date
in_use = i.get("ImageId") not in ami_in_use
my_amis[i.get("ImageId")] = Ami(i.get("Name"), ami_age.days, in_use)
bno_images = {
k: v for k, v in my_amis.items() if v.name.startswith("var-backend-non-ordering")
}
bo_images = {
k: v for k, v in my_amis.items() if v.name.startswith("var-backend-ordering")
}
oo_images = {k: v for k, v in my_amis.items() if v.name.startswith("online-ordering")}
# sort and do not delete the newest 2
bno_sorted = sorted(bno_images.items(), key=lambda item: item[1].age)
for key, ami_obj in bno_sorted[:2]:
ami_obj.delete = False
bo_sorted = sorted(bo_images.items(), key=lambda item: item[1].age)
for key, ami_obj in bo_sorted[:2]:
ami_obj.delete = False
oo_sorted = sorted(oo_images.items(), key=lambda item: item[1].age)
for key, ami_obj in oo_sorted[:2]:
ami_obj.delete = False
pprint(bno_sorted)
pprint(bo_sorted)
pprint(oo_sorted)
+20
View File
@@ -0,0 +1,20 @@
"""
This program list all secrets and show their next rotation date if < 30d
"""
import boto3
from datetime import datetime, timezone
print("Secret DaysToNextRotation NextRotationDate")
sm_client = boto3.client('secretsmanager')
paginator = sm_client.get_paginator('list_secrets')
iterator = paginator.paginate()
for page in iterator:
for i in page.get('SecretList'):
if i.get("NextRotationDate") is not None:
NextRotationDate = i.get("NextRotationDate").replace(tzinfo=timezone.utc)
Today = datetime.now(timezone.utc)
Difference = (NextRotationDate - Today).days
# if Difference < 20:
print(i.get("Name"), Difference, NextRotationDate.date())
Regular → Executable
+20 -12
View File
@@ -1,16 +1,24 @@
from typing import NoReturn
import json
#!/usr/bin/env python3.13
from botocore.exceptions import ClientError
import boto3
import base64
import sys
def lambda_handler(event, context) -> NoReturn:
def main() -> None:
# TODO implement
sts_client = boto3.client('sts')
assumed_role_object=sts_client.assume_role(
RoleArn="arn:aws:iam::111122223333:role/SomeRole",
RoleSessionName="lambda-assumeRoleMs"
)
print("export AWS_ACCESS_KEY_ID=" + assumed_role_object['Credentials']['AccessKeyId'])
print("export AWS_SECRET_ACCESS_KEY=" + assumed_role_object['Credentials']['SecretAccessKey'])
print("export AWS_SESSION_TOKEN=" + assumed_role_object['Credentials']['SessionToken'])
print("export AWS_DEFAULT_REGION=ap-east-1")
try:
assumed_role_object = sts_client.assume_role(
RoleArn="arn:aws:iam::" + sys.argv[1] + ":role/" + sys.argv[2],
RoleSessionName=sys.argv[2]
)
print(f'''
export AWS_ACCESS_KEY_ID={assumed_role_object['Credentials']['AccessKeyId']}
export AWS_SECRET_ACCESS_KEY{assumed_role_object['Credentials']['SecretAccessKey']}
export AWS_SESSION_TOKEN={assumed_role_object['Credentials']['SessionToken']}
export AWS_DEFAULT_REGION=ap-east-1")
''')
except ClientError as e:
print(e)
if __name__ == "__main__":
main()
-13
View File
@@ -1,13 +0,0 @@
#!/usr/bin/env python3
# Python data types
# list = [value1, value2, value3,...valueN]
# set = {value1, value2, value3,...valueN}
# dict = { key1:value1, key2:value2,...keyN:valueN }
# Sample use of list of dict
datagroup = [{'name': '203.60.15.113/32', 'data': ''}, {'name': '222.186.30.174/32', 'data': ''},{'name': '120.136.32.106/32', 'data': ''}]
newrecord = {'name': '1.2.3.4/32', 'data': ''}
datagroup.append(newrecord)
print(datagroup)
+89
View File
@@ -0,0 +1,89 @@
#!/usr/bin/env python3
"""
When elasticache valkey is deployed without encryption in transit, IAM auth cannot be used.
To enable TLS after deployment, it takes 20min to set it to prefer TLS, and another 5 min to set it to require TLS.
When TLS is enabled, configuration endpoint address is changed
IAM auth is available only after TLS is required. One needs to manually associate the cluster with the elasticache usergroup
It took me a day to figure out how to connect to elasticache/valkey using iam auth, even with help from perplexity. It should
not be this difficult.
The following code is based on valkey's example, which for some reason did not enable TLS.
Other considerations
* Elasticache userid must be the same as username
* The signed request must have validity of 15min
References
* https://github.com/valkey-io/valkey-py/blob/main/docs/examples/connection_examples.ipynb
"""
from typing import Tuple, Union
from urllib.parse import ParseResult, urlencode, urlunparse
import botocore.session
import valkey
from botocore.model import ServiceId
from botocore.signers import RequestSigner
from cachetools import TTLCache, cached
import ssl
class ElastiCacheIAMProvider(valkey.CredentialProvider):
def __init__(self, user, cluster_name, region="ap-east-1"):
self.user = user
self.cluster_name = cluster_name
self.region = region
session = botocore.session.get_session()
self.request_signer = RequestSigner(
ServiceId("elasticache"),
self.region,
"elasticache",
"v4",
session.get_credentials(),
session.get_component("event_emitter"),
)
# Generated IAM tokens are valid for 15 minutes
@cached(cache=TTLCache(maxsize=128, ttl=900))
def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
query_params = {"Action": "connect", "User": self.user}
url = urlunparse(
ParseResult(
scheme="https",
netloc=self.cluster_name,
path="/",
query=urlencode(query_params),
params="",
fragment="",
)
)
signed_url = self.request_signer.generate_presigned_url(
{"method": "GET", "url": url, "body": {}, "headers": {}, "context": {}},
operation_name="connect",
expires_in=900,
region_name=self.region,
)
# RequestSigner only seems to work if the URL has a protocol, but
# Elasticache only accepts the URL without a protocol
# So strip it off the signed URL before returning
return (self.user, signed_url.removeprefix("https://"))
username = "cacheuser2"
cluster_name = "cache002"
endpoint = "clustercfg.cache002.rw4ynm.ape1.cache.amazonaws.com"
creds_provider = ElastiCacheIAMProvider(user=username, cluster_name=cluster_name)
user_connection = valkey.Valkey(
host=endpoint,
port=6379,
credential_provider=creds_provider,
ssl=True,
ssl_cert_reqs=ssl.CERT_NONE)
pong = user_connection.ping()
print(f"Redis ping response: {pong}")
user_connection.set('foo', 'helloworld')
value = user_connection.get('foo')
print(f"Value for 'foo': {value.decode()}")
+1
View File
@@ -0,0 +1 @@
downloaded_files
@@ -0,0 +1,70 @@
#!/usr/bin/env python3
# Imports
import threading
import multiprocessing
import concurrent.futures
import time
def task(name):
"""
Dummy function which pretends to do some work
"""
print(f"Thread {name}: Starting...")
time.sleep(1)
print(f"Thread {name}: Finishing.")
def threading_example():
"""
threading: not truly concurrent as GIL (Global Interpreter Lock) limits 1 process for each bytecode execution.
it does allow the process to do more work while other threads are not busy.
threading is relatively light-weight
"""
threads = []
for i in range(3):
threads.append(threading.Thread(target=task, args=(i,)))
for i in threads:
i.start()
for i in threads:
i.join()
print("threading_example: All threads completed.")
def multiprocessing_example():
"""
multiprocessing: True parallel execution on multiple CPU cores. Tasks are ran on independent processes.
More resource expensive compared to threading
"""
mp = []
for i in range(3):
mp.append(multiprocessing.Process(target=task, args=(i,)))
for i in mp:
i.start()
for i in mp:
i.join()
print("multiprocessing_example: Done with all calculations!")
def concurrent_futures_example():
"""
high-level implementation of threading. facilitate result consolidation
for high-level implemetnation of multiprocessing, use ProcessPoolExecutor
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for i in range(3):
executor.submit(task, i)
print("concurrent_futures_example: All threads completed.")
# Main function
def main() -> None:
threading_example()
multiprocessing_example()
concurrent_futures_example()
# Call main function
if __name__ == '__main__':
main()
+13
View File
@@ -0,0 +1,13 @@
#!/usr/bin/env python3
"""
Python data types
list = [value1, value2, value3,...valueN]
set = {value1, value2, value3,...valueN}
dict = { key1:value1, key2:value2,...keyN:valueN }
"""
list_of_dict = [{'name': '203.60.15.113/32', 'data': ''}, {'name': '222.186.30.174/32', 'data': ''},{'name': '120.136.32.106/32', 'data': ''}]
new_record = {'name': '1.2.3.4/32', 'data': ''}
list_of_dict.append(new_record)
print(list_of_dict)
+4
View File
@@ -0,0 +1,4 @@
name,age
tom, 22
sam, 32
mary, 19
1 name age
2 tom 22
3 sam 32
4 mary 19
+22
View File
@@ -0,0 +1,22 @@
#!/usr/bin/env python3
import duckdb
# Create an in-memory DuckDB connection
con = duckdb.connect(':memory:')
# first query which selects a number
r1 = con.sql("SELECT 42 AS i")
con.sql("SELECT i * 2 AS k FROM r1").show()
# create a table. insert a row and query the table
con.sql("CREATE TABLE test (i INTEGER)")
con.sql("INSERT INTO test VALUES (42)")
con.table("test").show()
# read a csv into duckdb
csvt = con.read_csv("duck.csv")
con.sql("SELECT * FROM csvt WHERE name = 'tom'").show()
# explicitly close the connection
con.close()
+42
View File
@@ -0,0 +1,42 @@
#!/usr/bin/env python3
r"""
Documentation
License: This program is released under the MIT License
"""
# Imports
from enum import Enum
class Color(Enum):
RED = '#FF0000'
GREEN = '#00FF00'
BLUE = '#0000FF'
# function which uses the Enum
def paint_wall(color: Color) -> None:
match color:
case Color.RED:
print("Red wall, are you serious?")
case Color.GREEN:
print("Green wall, very foresty.")
case Color.BLUE:
print("Blue wall, I like it.")
case _:
print("Other colors are not preferred.")
# Main function
def main() -> None:
# Check Enum name and value
print(f"Enum: {Color.RED} Name: {Color.RED.name} Value: {Color.RED.value}")
# function that uses Enum
paint_wall(Color.RED)
# print all Enum values
print([x.value for x in Color])
# check if value is a member of the Enum, kind of pointless as IDE will report the problem ahead
# print(Color.BLACK in Color)
# Call main function
if __name__ == '__main__':
main()
+27
View File
@@ -0,0 +1,27 @@
#!/usr/bin/env python3
# Main function
def main() -> None:
"""
Number formatting in f-string using format specifiers
The following prints will output
314%
3.14
12,345
00012345
12345
len(str(var1)) = 8
"""
var1: float = 3.141516
var2: float = 12345
print(f"{var1: .0%}")
print(f"{var1: .2f}")
print(f"{var2: ,}")
print(f"{var2: 09}")
print(f"{var2: >10}")
print(f"{len(str(var1)) = }")
# Call main function
if __name__ == '__main__':
main()
+39
View File
@@ -0,0 +1,39 @@
#!/usr/bin/env python3
def square(x: int) -> int:
return x ** 2
# Main function
def main() -> None:
"""
lambda: a one-liner anonymous function. in its simplest form, it just a function.
for example, the following can be rewritten with a add_1 function which returns x + 1
"""
add_1 = lambda x: x + 1
result = add_1(1)
print(result)
"""
map function: apply a function to every element in an iterable
returns a map object which can then be casted to a list
"""
numbers = range(1,10)
# square is the function and numbers is the iterable where elements will be sent from
results = list(map(square, numbers))
print(results)
"""
implement the same map function with lambda
"""
results2 = list(map(lambda x: x ** 2, numbers))
print(results2)
"""
filter function: apply function to every element. if true, keep the element. if false, reject it
"""
even_numbers = list(filter(lambda x: x % 2 == 0, numbers))
print(even_numbers)
# Call main function
if __name__ == '__main__':
main()
@@ -0,0 +1,33 @@
#!/usr/bin/env python3
import logging
logging.basicConfig(level=logging.INFO, format="%(funcName)s %(levelname)s: %(message)s")
def while_loop() -> None:
"""
while loop which requires a counter
"""
counter: int = 1
while counter <= 5:
logging.info(counter)
counter += 1
def for_loop() -> None:
"""
for loop which puts the counter inline
"""
for i in range(5):
logging.info(i)
# Main function
def main() -> None:
"""
Both functions will log a message 5 times, but the for loop is so much simpler
"""
while_loop()
for_loop()
# Call main function
if __name__ == '__main__':
main()
Binary file not shown.
+18
View File
@@ -0,0 +1,18 @@
#!/usr/bin/env python3
import numpy as np
celsius_temps = np.array([70, 75, 80])
fahrenheit_temps = (celsius_temps * 9/5) + 32
print(fahrenheit_temps) # [158. 167. 176.]
revenues = np.array([1000, 1500, 800, 2000, 1200])
costs = np.array([600, 900, 500, 1100, 700])
tax_rates = np.array([0.15, 0.18, 0.12, 0.20, 0.16])
gross_profits = revenues - costs
net_profits = gross_profits * (1 - tax_rates)
print(net_profits) # [340. 492. 264. 720. 420.]
+26
View File
@@ -0,0 +1,26 @@
#!/usr/bin/env python3
"""
Demonstrate how to use asyncio
"""
# Imports
import asyncio
async def doit():
print('Start doing...')
await asyncio.sleep(2)
print('Done!')
# Main function
async def main() -> None:
print('Main starts...')
job_queue = []
for i in range(3):
job_queue.append(doit())
await asyncio.gather(*job_queue)
print('Main ends...')
# Call main function
if __name__ == '__main__':
asyncio.run(main())
+43
View File
@@ -0,0 +1,43 @@
#!/usr/bin/env python3
from seleniumbase import SB
from seleniumbase.common.exceptions import WebDriverException
import re
def main():
try:
with SB(uc=True, headless=True) as sb:
"""
UC mode is designed to make browser automation appear human and thus evade detection by anti-bot systems.
CDP mode uses the Chrome DevTools Protocol to allow more direct and lower-level control over the browser.
don't need this # sb.activate_cdp_mode(url)
"""
sb.open("https://www.openssh.org/goals.html")
# page source
print(sb.get_page_source())
print("-" * 80)
# enumerate all elements... causes TimeoutError
# all_elements = sb.find_elements("xpath", "/html/body/*")
# print(f"Total elements found: {len(all_elements)}")
# for elem in all_elements:
# print(elem.tag_name)
# print("-" * 80)
# specific html element and element id
print(sb.get_text("/html/body/h2[@id='OpenBSD']"))
print("-" * 80)
# similar to the above, but line breaks are differently presented
print(sb.get_element("xpath", "//h2[@id='OpenBSD']").get_attribute('innerText'))
print("-" * 80)
except WebDriverException as e:
print(f"Failed to get page: {e}")
except Exception as e:
print(e)
if __name__ == '__main__':
main()
+18
View File
@@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""
Use terminaltables to draw a nice table
"""
from terminaltables import SingleTable
def main():
table_data = [
["Heading1", "Heading2"],
["row1 column1", "row1 column2"],
["row2 column1", "row2 column2"],
["row3 column1", "row3 column2"],
]
t1 = SingleTable(table_data)
print(t1.table)
if __name__ == '__main__':
main()
+36
View File
@@ -0,0 +1,36 @@
#!/usr/bin/env python3
r"""
Documentation
License: This program is released under the MIT License
"""
# Imports
import sys
from terminaltables import SingleTable
# Main function
def main() -> None:
difference = float(sys.argv[2]) - float(sys.argv[1]);
if float(sys.argv[2]) > float(sys.argv[1]):
label1, label2 = "LO", "HI"
else:
label1, label2 = "HI", "LO"
table_data = [
['Position', 'Value'],
[label1, f"{int(sys.argv[1]):,}"],
['0.382', f"{int(difference * 0.382 + float(sys.argv[1])):,}"],
['0.5', f"{int(difference * 0.5 + float(sys.argv[1])):,}"],
['0.618', f"{int(difference * 0.618 + float(sys.argv[1])):,}"],
['0.764', f"{int(difference * 0.764 + float(sys.argv[1])):,}"],
[label2, f"{int(sys.argv[2]):,}"]
]
t1 = SingleTable(table_data)
print(t1.table)
# Call main function
if __name__ == '__main__':
main()
-11
View File
@@ -1,11 +0,0 @@
#!/usr/bin/python3
import logging
logging.basicConfig(format='%(levelname)s: %(message)s',level=logging.DEBUG)
logging.debug('Debug message')
logging.info('Info message')
logging.warning('Warning message')
logging.error('Error message')
logging.critical('Critical message')
+40
View File
@@ -0,0 +1,40 @@
import json
import pymysql
import pymysql.cursors
import boto3
import traceback
def lambda_handler(event, context):
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name='ap-east-1'
)
secret = client.get_secret_value(
SecretId='your-mysql-proxy-secret'
)
#print(secret.get("SecretString"))
secret_dict = json.loads(secret.get("SecretString"))
print("* Retrieved secret from sm. DB username is", secret_dict.get('username'))
print("* Trying to connect to rds proxy...")
try:
# Connect to the database
connection = pymysql.connect(host="xxxx.ap-east-1.rds.amazonaws.com",
user=secret_dict.get('username'),
password=secret_dict.get('password'),
database="mysql",
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=10,
ssl_ca="global-bundle.pem",
ssl_verify_identity=False)
print("* Connected to rds proxy. Running query...")
cur = db.cursor(pymysql.cursors.DictCursor)
sql = "SELECT User from mysql.user"
cur.execute(sql)
for row in cur:
print(row)
except pymysql.err.OperationalError as e1:
print("* Connection failed - ", e1)
Executable
+50
View File
@@ -0,0 +1,50 @@
#!/usr/bin/env python3
import sys
import petname
import secrets
from itertools import chain
def generate_password(names: int) -> str:
"""
Generate a password from pet names and a random number
:param names: Number of names to generate
:return: A simple yet secure password
"""
results = []
while len(results) < names:
candidate = petname.Name().capitalize()
results.append(candidate) if len(candidate) > 3 else None
results.append(str(secrets.randbelow(8999)+1000))
return ".".join(results)
def generate_insane_password(length: int) -> str:
"""
Generate an insane password using extended ascii characters
:param length:
:return:
"""
ascii_pool: str = ''.join(chr(i) for i in chain(range(40, 127), range(180, 256)))
return ''.join(secrets.choice(ascii_pool) for _ in range(length))
def password_qc(pw: str) -> bool:
specials = set("!@#$%^&*()-_=+[]{};:'\"|,.<>/?`~\\")
has_lower = any(c.islower() for c in pw)
has_upper = any(c.isupper() for c in pw)
has_digit = any(c.isdigit() for c in pw)
has_special = any(c in specials for c in pw)
return has_lower and has_upper and has_digit and has_special
if __name__ == '__main__':
if len(sys.argv) <= 1:
print(generate_password(3))
elif sys.argv[1] == '-insane' or sys.argv[1] == '-i':
pw = generate_insane_password(20)
while not password_qc(pw):
pw = generate_insane_password(20)
else:
print(pw)
-7
View File
@@ -1,7 +0,0 @@
#!/usr/bin/python3
msg = ""
msg += "Line 1"
msg += "Line 2"
print(msg)
-11
View File
@@ -1,11 +0,0 @@
from prettytable import PrettyTable
x = PrettyTable()
x.field_names = ["City name", "Area", "Population", "Annual Rainfall"]
x.add_row(["Adelaide", 1295, 1158259, 600.5])
x.add_row(["Brisbane", 5905, 1857594, 1146.4])
x.add_row(["Darwin", 112, 120900, 1714.7])
x.add_row(["Hobart", 1357, 205556, 619.5])
x.add_row(["Sydney", 2058, 4336374, 1214.8])
x.add_row(["Melbourne", 1566, 3806092, 646.9])
x.add_row(["Perth", 5386, 1554769, 869.4])
print(x);
-10
View File
@@ -1,10 +0,0 @@
from terminaltables import SingleTable
table_data = [
['Heading1', 'Heading2'],
['row1 column1', 'row1 column2'],
['row2 column1', 'row2 column2'],
['row3 column1', 'row3 column2']
]
t1 = SingleTable(table_data)
print (t1.table)
-8
View File
@@ -1,8 +0,0 @@
#!/usr/bin/env python
import os, time
fileage = os.path.getmtime('/var/log/ufw.log');
if time.time() - fileage > 7200:
print ('File is > 2 hours old')
+6 -4
View File
@@ -1,10 +1,12 @@
#!/usr/bin/python3
import urllib3
from urllib3 import Timeout
from pprint import pprint
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
http = urllib3.PoolManager(cert_reqs='CERT_NONE')
url = "https://stats.oecd.org/"
r = http.request('GET', url, timeout=Timeout(10))
http = urllib3.PoolManager(cert_reqs="CERT_NONE")
url = "https://blog.headdesk.me/"
r = http.request("GET", url, timeout=Timeout(10))
print("statusCode: %s" % r.status)
print(f"ResponseCode: {r.status}")
pprint(dict(r.headers))
+8
View File
@@ -0,0 +1,8 @@
#!/bin/bash
# Replace key and iv with your random string. The following are examples of how to generate the strings.
key=$(openssl rand -hex 32)
iv=$(openssl rand -hex 16)
openssl enc -chacha20 -base64 -salt -md sha512 -in $1 -out $1.asc -K $key -iv $iv
+19
View File
@@ -0,0 +1,19 @@
#!/bin/bash
echo "========================================================================="
echo "This script will check if master branch contains commit from this branch."
echo "Then suggest if the branch can be deleted"
echo "========================================================================="
echo ""
CURRBRANCH=$(git branch --show-current)
COMMITID=$(git rev-parse HEAD)
git switch -q master
git log | grep $COMMITID
if [ $? -eq 1 ]; then
echo "commit not found in master. You should keep $CURRBRANCH"
else
echo "commit found in master. You should delete $CURRBRANCH"
git log -n1 $COMMITID
fi
git switch -q $CURRBRANCH
Executable
+31
View File
@@ -0,0 +1,31 @@
#!/bin/bash
if ls -d /tmp; then
echo "Command succeeded: file exists."
fi
if [ -f /etc/hosts ]; then
echo "/etc/hosts exists"
fi
num=100
if [ "$num" -gt 10 ]; then
echo "The number is greater than 10."
fi
floatnum=20.34
result=$(echo "$floatnum > 10.5" | bc)
if [ "$result" -eq 1 ]; then
echo "The number is greater than 10.5"
fi
word="hello"
if [ "$word" = "hello" ]; then
echo "You typed hello."
fi
sentense="I want to say hello to the world"
regex="say.*hello"
if [[ "$sentense" =~ $regex ]]; then
echo "Substring found."
fi
+20
View File
@@ -0,0 +1,20 @@
#!/bin/bash
function doit() {
echo $1 | ts
sleep 1
}
export -f doit
echo "* Regular loop execution"
for i in $(seq 1 5); do
doit $i
done
echo "* Execution with sem"
for i in $(seq 1 5); do
sem -j4 doit $i
done
sem --wait