feat: added function caching and assertion

This commit is contained in:
xpk
2025-11-24 11:10:13 +08:00
parent e75abcf971
commit 3e5bb0547a
+36 -40
View File
@@ -19,7 +19,7 @@ from mdutils.mdutils import MdUtils
import os
import json
import concurrent.futures
import functools
def printTitle(level: int, title: str):
if level <= 2:
@@ -28,17 +28,19 @@ def printTitle(level: int, title: str):
mdFile.new_paragraph(title)
return
def getAllRegions(myclient):
return jmespath.search("Regions[*].RegionName", myclient.describe_regions(AllRegions=False))
@functools.cache
def getAllRegions() -> list:
tmpSession = botoSession(os.environ['AWS_DEFAULT_REGION'])
tmpClient = tmpSession.client('ec2')
regions = jmespath.search("Regions[*].RegionName", tmpClient.describe_regions(AllRegions=False))
assert type(regions) == list
return regions
def getAgeFromDate(inputDate):
today = date.today()
delta = today - inputDate.date()
return delta.days
def printResult(content: list, header: str):
if len(content) <= 0:
mdFile.new_paragraph("✅ No issue found.")
@@ -53,7 +55,7 @@ def printResult(content: list, header: str):
mdFile.new_table(columns=tableCol, rows=len(content)+1, text=table, text_align='left')
return
@functools.cache
def botoSession(region:str) -> boto3.Session:
session = boto3.Session(region_name=region)
return session
@@ -76,7 +78,6 @@ sts = globalSession.client("sts")
ec2Client = localSession.client("ec2")
aid = sts.get_caller_identity().get("Account")
regions = getAllRegions(ec2Client)
"""Check instances stopped for long time"""
printTitle(1, "Ec2 service review")
@@ -84,7 +85,7 @@ printTitle(2, "[Cost Optimization] Instances stopped for over 14 days")
printTitle(3, "Consider backing up and terminate instances "
"or use AutoScalingGroup to spin up and down instances as needed.")
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_instances()
@@ -100,7 +101,7 @@ printTitle(3, "Consider requiring IDMSv2. For more information, "
"see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_instances()
@@ -119,13 +120,13 @@ printResult(outTable, "Region, AccountID, InstanceId, IDMSv2")
printTitle(2,"[Sustainability] Use of early generation instance type")
printTitle(3, "Consider using current generation instances")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_instances()
if len(response.get("Reservations")) > 0:
for i in jmespath.search("Reservations[*].Instances[*]", response):
if re.search("^(t1|t2|m3|m1|m2|m4|c1|c2|c3|c4|r3|r4|i2)", i[0].get("InstanceType")) is not None:
if re.search("^(t1|t2|m3|m1|m2|m4|m5|c1|c2|c3|c4|r3|r4|i2)", i[0].get("InstanceType")) is not None:
outTable.append([
r,
aid,
@@ -139,7 +140,7 @@ printResult(outTable, "Region, AccountID, InstanceId, InstanceName, InstanceType
printTitle(2, "[Cost Optimization] Unattached EBS volumes")
printTitle(3, "Consider backing up the volumes and delete them")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_volumes(
@@ -158,7 +159,7 @@ printResult(outTable, "Region, AccountID, VolumeId, Size, VolumeType")
printTitle(2, "[Cost Optimization] EBS snapshots more than 365 days old")
printTitle(3,"Consider removing snapshots if no longer needed")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_snapshots(
@@ -179,7 +180,7 @@ printTitle(3, "Consider replacing volume with encrypted ones. "
"detach the original volume and attach the encrypted volume. Remember to clean up the volumes"
"and snapshots afterwards.")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_volumes(
@@ -203,7 +204,7 @@ printTitle(2, "[Cost Optimization] Unused Elastic IP")
printTitle(3, "Consider deleting unused EIP")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_addresses()
@@ -218,7 +219,7 @@ printTitle(2, "[Security] Security group rules allowing ingress from 0.0.0.0/0")
printTitle(3, "Consider setting more restrictive rules allowing access from specific sources.")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_security_group_rules()
@@ -239,7 +240,7 @@ printTitle(2, "[Security] Unencrypted RDS instances")
printTitle(3, "Consider encrypting RDS instances. For more detail, see "
"https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/encrypt-an-existing-amazon-rds-for-postgresql-db-instance.html")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("rds")
response = client.describe_db_instances()
@@ -256,7 +257,7 @@ printResult(outTable, "Region, AccountID, DBIdentifier, Engine")
printTitle(2, "[Reliability] RDS instance running in single availability zone")
printTitle(3, "Consider enabling multi-az for production use.")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("rds")
response = client.describe_db_instances()
@@ -275,7 +276,7 @@ printTitle(2, "[Security] Outdated Lambda runtime")
printTitle(3, "Consider changing to currently supported Lambda runtime versions, "
"listed on https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("lambda")
response = client.list_functions()
@@ -425,7 +426,7 @@ printTitle(2, "[Cost Optimization] Cloudwatch LogGroups without retention period
printTitle(3, "Consider setting retention")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("logs")
response = client.describe_log_groups()
@@ -439,7 +440,7 @@ printTitle(2, "[Security] Cloudwatch LogGroups unencrypted")
printTitle(3, "Consider encrypting LogGroups")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("logs")
response = client.describe_log_groups()
@@ -453,7 +454,7 @@ printTitle(1, "Backup service review")
printTitle(2, "[Reliability] Ec2/Rds instances found but AWSBackup plan missing")
printTitle(3, "Consider setting up AWSBackup plans to backup AWS resources.")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("backup")
response = client.list_backup_plans()
@@ -508,7 +509,7 @@ printTitle(2, "[Sustainability] ElastiCache instances on x64 platform")
printTitle(3, "Consider Graviton instances such as t4g/r7g to optimize your infrastructure investment.")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("elasticache")
response = client.describe_cache_clusters()
@@ -523,7 +524,7 @@ printTitle(2, "[Cost Optimization] LB Target group without targets")
printTitle(3, "Consider removing empty target groups")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("elbv2")
response = client.describe_target_groups()
@@ -541,7 +542,7 @@ printTitle(3, "Consider enabling auto key rotation. When a key is rotated, previ
"are still kept within AWS to allow data retrival.")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("kms")
response = client.list_keys()
@@ -567,7 +568,7 @@ printTitle(3, "Consider restricting access to private API with a "
"https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-resource-policies-examples.html")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("apigateway")
response = client.get_rest_apis()
@@ -596,7 +597,7 @@ printTitle(3, "Consider enabling Multi-Region for at least 1 cloudtrail")
outTable = []
multiRegionTrailCount = 0
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("cloudtrail")
response = client.describe_trails()
@@ -614,7 +615,7 @@ printTitle(1, "Vpc service review")
printTitle(2, "[Security] VPC flow log not enabled")
printTitle(3, "Consider enabling VPC flowlog for audit purpose or delete the default VPCs if not in use")
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_vpcs()
@@ -639,7 +640,7 @@ printTitle(2, "[Security] Default VPCs should be removed")
printTitle(3, "Consider deleting the default VPCs if not in use")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_vpcs(
@@ -667,7 +668,7 @@ outTable = []
with concurrent.futures.ThreadPoolExecutor() as executor:
# Map returns results in the same order as regions
results = executor.map(check_vpc_endpoint, regions)
results = executor.map(check_vpc_endpoint, getAllRegions())
for result in results:
if result:
outTable.append(result)
@@ -681,7 +682,7 @@ printTitle(3, "Consider having 2 tunnels for each site VPN connection. "
"of service interruption.")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("ec2")
response = client.describe_vpn_connections()
@@ -730,7 +731,7 @@ printTitle(3, "Consider using AmazonLinux2023. "
"which offers better performance and security.")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("eks")
response = client.list_clusters()
@@ -753,7 +754,7 @@ printTitle(3, "Consider using upgrading Eks cluster. "
"for upgrade instructions.")
outTable = []
for r in regions:
for r in getAllRegions():
newSession = botoSession(region=r)
client = newSession.client("eks")
response = client.list_clusters()
@@ -764,9 +765,4 @@ for r in regions:
printResult(outTable, "Region, AccountID, Cluster, Version")
mdFile.create_md_file()
print("Report written to AwsReviewReport.md")
# TODO
"""
- config enabled for all regions
"""
print("Report written to AwsReviewReport.md")