Files
code-dumps/aws/AwsEnvReview.py
T

768 lines
30 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Review AWS environment based on 6 WAR pillars, namely:
1. Operational Excellence
2. Security
3. Reliability
4. Performance Efficiency
5. Cost Optimization
6. Sustainability
"""
import boto3
import botocore
from botocore.exceptions import ClientError
import jmespath
import re
from pprint import pprint
from datetime import date
from mdutils.mdutils import MdUtils
import os
import json
import concurrent.futures
import functools
def printTitle(level: int, title: str):
    """Write *title* into the markdown report.

    Levels 1 and 2 are written as markdown headers of that level; any deeper
    level is written as an ordinary paragraph.
    """
    if level > 2:
        mdFile.new_paragraph(title)
        return
    mdFile.new_header(level=level, title=title)
@functools.cache
def getAllRegions() -> list:
    """Return the region names reported by EC2 ``describe_regions``.

    The lookup is made through a session for the region named in the
    ``AWS_DEFAULT_REGION`` environment variable. The result is cached by
    ``functools.cache``, so every caller receives the same list object and
    must not modify it.

    Raises:
        KeyError: if ``AWS_DEFAULT_REGION`` is not set.
        TypeError: if the response does not contain a list of region names.
    """
    tmpSession = botoSession(os.environ['AWS_DEFAULT_REGION'])
    tmpClient = tmpSession.client('ec2')
    regions = jmespath.search("Regions[*].RegionName", tmpClient.describe_regions(AllRegions=False))
    # Explicit check instead of ``assert``: asserts are removed when Python
    # runs with -O, which would let a None result reach the callers' loops.
    if not isinstance(regions, list):
        raise TypeError(f"describe_regions returned no region list: {regions!r}")
    return regions
def getAgeFromDate(inputDate):
    """Return the number of whole calendar days between *inputDate* and today.

    "Today" is evaluated in the timezone of *inputDate*, so a timezone-aware
    timestamp is compared against the current date in its own timezone rather
    than the machine's local date (which can differ by one day around
    midnight). A naive datetime is compared against the local date.

    Args:
        inputDate: a ``datetime`` (naive or timezone-aware).

    Returns:
        The day difference as an int; negative if *inputDate* is in the future.
    """
    # Local import: the file's import block only brings in ``date``.
    from datetime import datetime

    today = datetime.now(tz=inputDate.tzinfo).date()
    return (today - inputDate.date()).days
def printResult(content: list, header: str):
    """Write the findings of one check into the markdown report.

    An empty *content* produces a "no issue" paragraph. Otherwise a table is
    written with a leading "Item" column holding a 1-based row number.

    Args:
        content: list of rows; each row is a list of cell values and must
            have as many cells as *header* has comma-separated columns.
        header: comma-separated column titles (without the "Item" column).

    The rows in *content* are not modified.
    """
    if not content:
        mdFile.new_paragraph("✅ No issue found.")
        return
    header = "Item," + header
    table = header.split(",")
    tableCol = len(table)
    for count, row in enumerate(content, start=1):
        # Build the numbered row separately instead of inserting into the
        # caller's list, so rows that get printed again do not gain a second
        # "Item" cell and break the column count.
        table.extend([count, *row])
    mdFile.new_line()
    mdFile.new_table(columns=tableCol, rows=len(content) + 1, text=table, text_align='left')
@functools.cache
def botoSession(region:str) -> boto3.Session:
    """Return a boto3 session bound to *region*.

    ``functools.cache`` hands back the same Session object for repeated calls
    with the same region.
    """
    return boto3.Session(region_name=region)
# Create the markdown report object that every check below writes into.
mdFile = MdUtils(file_name='AwsReviewReport.md', title=f'Aws Review {date.today()}')
mdFile.write("-----")
outTable = []
print("Script started. It may take 7+ minutes to run. Report will be saved to AwsReviewReport.md. Please be patient...")
primaryRegion = os.environ['AWS_DEFAULT_REGION']
printTitle(3, f"Primary region: {primaryRegion}\n")

# Sessions: us-east-1 for the clients created from globalSession, and the
# primary region for the clients created from localSession.
globalSession = botoSession("us-east-1")
localSession = botoSession(primaryRegion)

# Clients, and the account id that is printed in the report rows.
sts = globalSession.client("sts")
ec2Client = localSession.client("ec2")
callerIdentity = sts.get_caller_identity()
aid = callerIdentity.get("Account")
"""Check instances stopped for long time"""
# Report EC2 instances that have been in the "stopped" state for a long time.
printTitle(1, "Ec2 service review")
printTitle(2, "[Cost Optimization] Instances stopped for over 14 days")
printTitle(3, "Consider backing up and terminate instances "
              "or use AutoScalingGroup to spin up and down instances as needed.")
stoppedDaysThreshold = 14  # matches the "over 14 days" wording of the title
outTable = []
for r in getAllRegions():
    client = botoSession(region=r).client("ec2")
    pages = client.get_paginator("describe_instances").paginate(
        Filters=[{"Name": "instance-state-name", "Values": ["stopped"]}]
    )
    for page in pages:
        # "[]" flattens across reservations, so every instance of a
        # reservation is inspected rather than only the first one.
        for instance in jmespath.search("Reservations[].Instances[]", page) or []:
            # NOTE(review): UsageOperationUpdateTime is kept from the original
            # as the stop-time approximation -- confirm it tracks the stop event.
            changedAt = instance.get("UsageOperationUpdateTime")
            if changedAt is None:
                # Keep the instance visible instead of crashing on a missing timestamp.
                outTable.append([r, aid, instance.get("InstanceId"), "unknown"])
                continue
            daysStopped = getAgeFromDate(changedAt)
            if daysStopped > stoppedDaysThreshold:
                outTable.append([r, aid, instance.get("InstanceId"), daysStopped])
printResult(outTable, "Region, AccountID, InstanceId, DaysStopped")
"""Check IMDS version"""
# Report EC2 instances whose metadata service still accepts IMDSv1 requests
# (MetadataOptions.HttpTokens == "optional").
printTitle(2, "[Security] Insecure IMDSv1 allowed")
printTitle(3, "Consider requiring IMDSv2. For more information, "
              "see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html")
outTable = []
for r in getAllRegions():
    client = botoSession(region=r).client("ec2")
    for page in client.get_paginator("describe_instances").paginate():
        # "[]" flattens across reservations, so every instance of a
        # reservation is inspected rather than only the first one.
        for instance in jmespath.search("Reservations[].Instances[]", page) or []:
            # Tolerate a missing MetadataOptions block instead of raising.
            httpTokens = (instance.get("MetadataOptions") or {}).get("HttpTokens")
            if httpTokens == "optional":
                outTable.append([r, aid, instance.get("InstanceId"), httpTokens])
printResult(outTable, "Region, AccountID, InstanceId, IMDSv2")
"""Check EC2 instance generation"""
# Report EC2 instances whose instance type starts with one of the listed
# earlier-generation family prefixes.
printTitle(2,"[Sustainability] Use of early generation instance type")
printTitle(3, "Consider using current generation instances")
outTable = []
# Compiled once; the pattern itself is unchanged.
oldGenerationType = re.compile(r"^(t1|t2|m3|m1|m2|m4|m5|c1|c2|c3|c4|r3|r4|i2)")
for r in getAllRegions():
    client = botoSession(region=r).client("ec2")
    for page in client.get_paginator("describe_instances").paginate():
        # "[]" flattens across reservations, so every instance of a
        # reservation is inspected rather than only the first one.
        for instance in jmespath.search("Reservations[].Instances[]", page) or []:
            instanceType = instance.get("InstanceType") or ""
            if oldGenerationType.search(instanceType) is None:
                continue
            # Instances without tags or without a Name tag get an empty name
            # instead of raising TypeError/IndexError.
            nameTags = jmespath.search("Tags[?Key =='Name'].Value", instance) or []
            instanceName = nameTags[0] if nameTags else ""
            outTable.append([r, aid, instance.get("InstanceId"), instanceName, instanceType])
printResult(outTable, "Region, AccountID, InstanceId, InstanceName, InstanceType")
"""Check unattached EBS volumes"""
# List EBS volumes whose status is "available" in every region.
printTitle(2, "[Cost Optimization] Unattached EBS volumes")
printTitle(3, "Consider backing up the volumes and delete them")
outTable = []
availableOnly = [{'Name': 'status', 'Values': ['available']}]
for r in getAllRegions():
    ec2 = botoSession(region=r).client("ec2")
    volumes = ec2.describe_volumes(Filters=availableOnly).get("Volumes")
    outTable.extend(
        [r, aid, volume.get("VolumeId"), volume.get("Size"), volume.get("VolumeType")]
        for volume in volumes
    )
printResult(outTable, "Region, AccountID, VolumeId, Size, VolumeType")
"""Check stale EBS snapshots"""
# List snapshots owned by this account that are older than 365 days,
# skipping those whose description marks them as created by AWS Backup.
printTitle(2, "[Cost Optimization] EBS snapshots more than 365 days old")
printTitle(3,"Consider removing snapshots if no longer needed")
outTable = []
awsBackupDescription = "This snapshot is created by the AWS Backup service."
for r in getAllRegions():
    ec2 = botoSession(region=r).client("ec2")
    snapshots = ec2.describe_snapshots(OwnerIds=[aid]).get("Snapshots")
    for snapshot in snapshots:
        if getAgeFromDate(snapshot.get("StartTime")) <= 365:
            continue
        if snapshot.get("Description") == awsBackupDescription:
            continue
        outTable.append([
            r,
            aid,
            snapshot.get("SnapshotId"),
            snapshot.get("Description")[:70],  # keep the table cell short
            getAgeFromDate(snapshot.get("StartTime")),
        ])
printResult(outTable, "Region, AccountID, SnapshotId, Description, SnapshotAge")
"""Check EBS encryption"""
# List in-use EBS volumes that are not encrypted, in every region.
printTitle(2, "[Security] Unencrypted EBS volumes")
# Each fragment ends with a space so the implicit string concatenation does
# not glue words together ("snapshot,detach", "volumesand").
printTitle(3, "Consider replacing volume with encrypted ones. "
              "One can do so by stopping the Ec2 instance, creating snapshot for the unencrypted volume, "
              "copy the snapshot to a new encrypted snapshot, create a volume from the encrypted snapshot, "
              "detach the original volume and attach the encrypted volume. Remember to clean up the volumes "
              "and snapshots afterwards.")
outTable = []
unencryptedInUse = [
    {'Name': 'encrypted', 'Values': ['false']},
    {'Name': 'status', 'Values': ['in-use']},
]
for r in getAllRegions():
    client = botoSession(region=r).client("ec2")
    response = client.describe_volumes(Filters=unencryptedInUse)
    for volume in response.get("Volumes", []):
        outTable.append([r, aid, volume.get("VolumeId"), volume.get("Size"), volume.get("VolumeType")])
printResult(outTable, "Region, AccountID, VolumeId, Size, VolumeType")
"""Check unused EIP"""
# List Elastic IP addresses that have no AssociationId.
printTitle(2, "[Cost Optimization] Unused Elastic IP")
printTitle(3, "Consider deleting unused EIP")
outTable = []
for r in getAllRegions():
    addresses = botoSession(region=r).client("ec2").describe_addresses().get("Addresses")
    for address in addresses:
        if address.get("AssociationId") is not None:
            continue
        outTable.append([r, aid, address.get("PublicIp")])
printResult(outTable, "Region, AccountID, PublicIp")
"""Check unsafe security groups"""
# Report ingress rules open to 0.0.0.0/0, except rules that open exactly
# port 80 or exactly port 443.
printTitle(1, "Security group review")
printTitle(2, "[Security] Security group rules allowing ingress from 0.0.0.0/0")
printTitle(3, "Consider setting more restrictive rules allowing access from specific sources.")
outTable = []
webPorts = {80, 443}
for r in getAllRegions():
    client = botoSession(region=r).client("ec2")
    response = client.describe_security_group_rules()
    # The JMESPath filter already drops egress rules.
    for sgr in jmespath.search("SecurityGroupRules[?IsEgress==`false`]", response) or []:
        # NOTE(review): only IPv4 is examined; rules open to ::/0 are not reported.
        if sgr.get("CidrIpv4") != "0.0.0.0/0":
            continue
        fromPort = sgr.get("FromPort")
        toPort = sgr.get("ToPort")
        # Exempt only single-port web rules. A range that merely starts or
        # ends on 80/443 (e.g. 22-80 or 0-443) exposes other ports and is
        # reported.
        if fromPort == toPort and fromPort in webPorts:
            continue
        outTable.append(
            [r, aid, sgr.get("GroupId"), sgr.get("SecurityGroupRuleId"), fromPort, toPort])
printResult(outTable, "Region, AccountID, SecurityGroup, Rule, FromPort, ToPort")
"""Check RDS encryption setting"""
# Report RDS instances and clusters whose storage is not encrypted.
printTitle(1, "Rds service review")
printTitle(2, "[Security] Unencrypted RDS instances")
printTitle(3, "Consider encrypting RDS instances. For more detail, see "
              "https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/encrypt-an-existing-amazon-rds-for-postgresql-db-instance.html")
outTable = []
for r in getAllRegions():
    client = botoSession(region=r).client("rds")
    for page in client.get_paginator("describe_db_instances").paginate():
        for instance in page.get("DBInstances", []):
            # StorageEncrypted is a boolean; comparing it with the string
            # "False" never matched, so nothing was ever reported.
            if not instance.get("StorageEncrypted"):
                outTable.append([r, aid, instance.get("DBInstanceIdentifier"), instance.get("Engine")])
    for page in client.get_paginator("describe_db_clusters").paginate():
        for cluster in page.get("DBClusters", []):
            if not cluster.get("StorageEncrypted"):
                outTable.append([r, aid, cluster.get("DBClusterIdentifier"), cluster.get("Engine")])
printResult(outTable, "Region, AccountID, DBIdentifier, Engine")
"""Check RDS instance in single AZ"""
# List RDS instances and clusters whose MultiAZ flag is not set.
printTitle(2, "[Reliability] RDS instance running in single availability zone")
printTitle(3, "Consider enabling multi-az for production use.")
outTable = []
for r in getAllRegions():
    rds = botoSession(region=r).client("rds")
    for instance in rds.describe_db_instances().get("DBInstances"):
        if instance.get("MultiAZ"):
            continue
        outTable.append([r, aid, instance.get("DBInstanceIdentifier"), instance.get("Engine")])
    for cluster in rds.describe_db_clusters().get("DBClusters"):
        if cluster.get("MultiAZ"):
            continue
        outTable.append([r, aid, cluster.get("DBClusterIdentifier"), cluster.get("Engine")])
printResult(outTable, "Region, AccountID, DBIdentifier, Engine")
"""Check outdated lambda runtime"""
# Report Lambda functions whose runtime matches the list of outdated runtimes.
printTitle(1, "Lambda service review")
printTitle(2, "[Security] Outdated Lambda runtime")
printTitle(3, "Consider changing to currently supported Lambda runtime versions, "
              "listed on https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html")
outTable = []
# Compiled once; the dot in "python3.x" is escaped so it only matches a dot.
outdatedRuntime = re.compile(r"python2|python3\.[678]|java8|nodejs[468]|nodejs1[024]|dotnet6")
for r in getAllRegions():
    client = botoSession(region=r).client("lambda")
    # list_functions returns its results in pages; walk all of them.
    for page in client.get_paginator("list_functions").paginate():
        for function in page.get("Functions", []):
            runtime = function.get("Runtime")
            if runtime is None:
                # A function without a Runtime value is skipped, as before.
                continue
            if outdatedRuntime.search(runtime) is not None:
                outTable.append([r, aid, function.get("FunctionName"), runtime])
printResult(outTable, "Region, AccountID, FunctionName, Runtime")
"""Check IAM access key rotation"""
# Report IAM user access keys that were created more than 180 days ago.
printTitle(1, "Iam service review")
printTitle(2, "[Security] Iam user access key not rotated for 180 days")
printTitle(3, "Consider rotating access key")
outTable = []
client = globalSession.client("iam")
# Both listings are paginated, so accounts with many users/keys are fully covered.
for userPage in client.get_paginator("list_users").paginate():
    for u in jmespath.search("Users[*].UserName", userPage) or []:
        for keyPage in client.get_paginator("list_access_keys").paginate(UserName=u):
            for key in keyPage.get("AccessKeyMetadata", []):
                keyAge = getAgeFromDate(key.get("CreateDate"))
                if keyAge > 180:
                    outTable.append([aid, u, key.get("AccessKeyId"), keyAge])
printResult(outTable, "AccountID, UserName, AccessKeyId, AccessKeyAge")
"""Check IAM entity with admin access"""
# List the groups, users and roles the AdministratorAccess managed policy is
# attached to.
printTitle(2, "[Security] Iam AdministratorAccess policy attached")
printTitle(3, "Consider granting minimum privileges "
              "to users/groups/roles. AWS managed policies for job functions are recommended. See "
              "https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions.html")
outTable = []
client = globalSession.client("iam")
entityResp = client.list_entities_for_policy(
    PolicyArn='arn:aws:iam::aws:policy/AdministratorAccess'
)
# (label shown in the report, JMESPath query extracting the entity names)
entityQueries = (
    ("Group", "PolicyGroups[*].GroupName"),
    ("User", "PolicyUsers[*].UserName"),
    ("Role", "PolicyRoles[*].RoleName"),
)
for entityType, query in entityQueries:
    for entityName in jmespath.search(query, entityResp):
        outTable.append([aid, entityType, entityName])
printResult(outTable, "AccountID, Type, Name")
"""Check Lambda role and IAM instance profile permissions"""
# Collect the role names used by Lambda functions (primary region) and by
# IAM instance profiles, then keep only the roles that IAM can still return.
printTitle(2, "[Security] Check permissions of Lamda roles and Ec2 instance roles")
printTitle(3, "Typically these roles should not have admin or iam permissions.")
outTable = []
roles = set()

# Execution roles of the Lambda functions.
client = localSession.client("lambda")
for page in client.get_paginator('list_functions').paginate():
    for function in page['Functions']:
        role_arn = function.get('Role')
        if role_arn:
            # The role name is the last path segment of the ARN.
            roles.add(role_arn.split('/')[-1])

# Roles attached to instance profiles. The IAM client comes from the shared
# session, like the other IAM calls in this file.
iam_client = globalSession.client("iam")
for page in iam_client.get_paginator('list_instance_profiles').paginate():
    for profile in page['InstanceProfiles']:
        for role in profile.get('Roles', []):
            roles.add(role['RoleName'])

# Need to remove non-existent roles
confirmed_roles = set()
for role in sorted(roles):
    try:
        iam_client.get_role(RoleName=role)
    except ClientError as e:
        errorCode = e.response["Error"]["Code"]
        if errorCode == "NoSuchEntity":
            outTable.append([aid, role, "na", "Role does not exist anymore"])
        else:
            # e.g. AccessDenied or throttling: do not claim the role is gone.
            outTable.append([aid, role, "na", f"Role could not be inspected: {errorCode}"])
    else:
        confirmed_roles.add(role)
roles = confirmed_roles
printTitle(3, f"Roles to be examined: {len(confirmed_roles)}")
# predefined actions which should not be granted
high_risk_actions = {
    "*",
    "iam:*",
    "iam:CreateUser",
    "iam:AttachUserPolicy",
    "iam:CreateRole",
    "iam:AttachRolePolicy",
    "iam:CreateInstanceProfile",
    "iam:CreateLoginProfile",
    "iam:CreateOpenIDConnectProvider",
    "iam:CreateSAMLProvider",
    "iam:CreateServiceLinkedRole",
}


def _allowedActions(policyDocument: dict) -> set:
    """Return the actions named by the Allow statements of a policy document.

    Handles both forms of ``Statement`` (a single statement object or a list
    of statements) and both forms of ``Action`` (a string or a list).
    Statements without an ``Action`` key (e.g. ones using ``NotAction``)
    contribute nothing.
    """
    statements = policyDocument.get("Statement", [])
    if isinstance(statements, dict):
        statements = [statements]
    actions = set()
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        action = statement.get("Action", [])
        actions.update([action] if isinstance(action, str) else action)
    return actions


# The previous JMESPath filter `Effect=="Allow"` used a quoted identifier
# (a field reference) instead of the string literal 'Allow', so it never
# matched and no policy was ever reported; the statements are now examined
# in Python.
# NOTE(review): matching is exact and case-sensitive; wildcard patterns such
# as "iam:Create*" are not expanded.

# Check inline policies for each role
client = globalSession.client("iam")
for role in roles:
    for page in client.get_paginator("list_role_policies").paginate(RoleName=role):
        for policy_name in page["PolicyNames"]:
            response = client.get_role_policy(RoleName=role, PolicyName=policy_name)
            common = high_risk_actions & _allowedActions(response['PolicyDocument'])
            if common:
                outTable.append([aid, role, policy_name,
                                 f"Inline policy contains {', '.join(sorted(common))}, please review it"])

# Check managed policies for each role
managedPolicyActions = {}  # policy ARN -> allowed actions; a policy shared by several roles is fetched once
for role in roles:
    for page in client.get_paginator("list_attached_role_policies").paginate(RoleName=role):
        for policy in page['AttachedPolicies']:
            policy_arn = policy['PolicyArn']
            policy_name = policy['PolicyName']
            if policy_arn not in managedPolicyActions:
                # Get the policy default version
                policy_info = client.get_policy(PolicyArn=policy_arn)['Policy']
                default_version_id = policy_info['DefaultVersionId']
                # Get the policy document of the default version
                version = client.get_policy_version(PolicyArn=policy_arn, VersionId=default_version_id)
                managedPolicyActions[policy_arn] = _allowedActions(version['PolicyVersion']['Document'])
            common = high_risk_actions & managedPolicyActions[policy_arn]
            if common:
                outTable.append([aid, role, policy_name,
                                 f"Managed policy contains {', '.join(sorted(common))}, please review it"])
printResult(outTable, "AccountID, RoleName, PolicyName, Issue")
"""Check cloudwatch log group retention"""
# Report CloudWatch log groups that have no retention period configured.
printTitle(1, "Cloudwatch service review")
printTitle(2, "[Cost Optimization] Cloudwatch LogGroups without retention period")
printTitle(3, "Consider setting retention")
outTable = []
bytesPerMiB = 1024 * 1024
for r in getAllRegions():
    client = botoSession(region=r).client("logs")
    # describe_log_groups returns its results in pages; walk all of them.
    for page in client.get_paginator("describe_log_groups").paginate():
        for logGroup in page.get("logGroups", []):
            if logGroup.get("retentionInDays") is not None:
                continue
            # A missing storedBytes value is shown as 0 instead of raising.
            sizeMiB = int(round(logGroup.get("storedBytes", 0) / bytesPerMiB, 0))
            outTable.append([r, aid, logGroup.get("logGroupName"), sizeMiB])
printResult(outTable, "Region, AccountID, LogGroup, SizeMiB")
"""Check unencrypted cloudwatch log groups"""
# Report CloudWatch log groups that have no KMS key associated.
printTitle(2, "[Security] Cloudwatch LogGroups unencrypted")
printTitle(3, "Consider encrypting LogGroups")
outTable = []
for r in getAllRegions():
    client = botoSession(region=r).client("logs")
    # describe_log_groups returns its results in pages; walk all of them.
    for page in client.get_paginator("describe_log_groups").paginate():
        for logGroup in page.get("logGroups", []):
            if logGroup.get("kmsKeyId") is None:
                outTable.append([r, aid, logGroup.get("logGroupName")])
printResult(outTable, "Region, AccountID, LogGroup")
"""Check AWS Backup plan"""
# Report regions that contain EC2 or RDS instances but no AWS Backup plan.
printTitle(1, "Backup service review")
printTitle(2, "[Reliability] Ec2/Rds instances found but AWSBackup plan missing")
printTitle(3, "Consider setting up AWSBackup plans to backup AWS resources.")
outTable = []
for r in getAllRegions():
    newSession = botoSession(region=r)
    client = newSession.client("backup")
    response = client.list_backup_plans()
    if response.get("BackupPlansList"):
        continue  # the region has at least one backup plan
    # Clients come from the cached per-region session, like the other checks.
    ec2resp = newSession.client("ec2").describe_instances()
    # "[]" flattens across reservations so instances are counted, not reservations.
    ec2instances = jmespath.search("Reservations[].Instances[]", ec2resp) or []
    rdsresp = newSession.client("rds").describe_db_instances()
    rdsinstances = rdsresp.get("DBInstances", [])
    instanceCount = len(ec2instances) + len(rdsinstances)
    if instanceCount >= 1:
        outTable.append([r, aid, "AWSBackup plan missing", instanceCount])
printResult(outTable, "Region, AccountID, BackupPlan, Ec2RdsInstances")
"""Check S3 bucket policy"""
# Report S3 buckets that have no bucket policy.
printTitle(1, "S3 service review")
printTitle(2, "[Security] S3 bucket policy missing")
printTitle(3, "Consider creating bucket policy and restrict access to bucket")
outTable = []
client = globalSession.client("s3")
response = client.list_buckets()
for bucketName in jmespath.search("Buckets[*].Name", response) or []:
    try:
        client.get_bucket_policy(Bucket=bucketName)
    except ClientError as e:
        errorCode = e.response["Error"]["Code"]
        if errorCode == "NoSuchBucketPolicy":
            outTable.append([aid, bucketName])
        else:
            # e.g. AccessDenied: the policy may exist, so do not report it as
            # missing; note the problem on the console and carry on.
            print(f"Could not read bucket policy of {bucketName}: {errorCode}")
printResult(outTable, "AccountID, BucketName")
"""Check s3 public access block"""
# Report whether the account-level S3 public access block is fully enabled.
printTitle(2, "[Security] S3 public access block")
printTitle(3, "Blocking public access prevents accidental data leak due to misconfigurations in bucket policy or acl")
# get account id
sts = globalSession.client("sts")
account_id = sts.get_caller_identity()["Account"]
s3control = boto3.client("s3control")
try:
    response = s3control.get_public_access_block(AccountId=account_id)
except ClientError as e:
    errorCode = e.response["Error"]["Code"]
    if errorCode == "NoSuchPublicAccessBlockConfiguration":
        printTitle(3, "Account-level Public Access not blocked.")
    else:
        # Previously any other error left this section without a result.
        printTitle(3, f"Account-level Public Access could not be checked: {errorCode}")
else:
    config = response["PublicAccessBlockConfiguration"]
    is_blocked = all(config.values())
    if is_blocked:
        printTitle(3, "Account-level Public Access blocked.")
    else:
        # A configuration exists but at least one of its settings is off.
        disabledSettings = ", ".join(sorted(name for name, enabled in config.items() if not enabled))
        printTitle(3, f"Account-level Public Access not fully blocked. Disabled settings: {disabledSettings}")
"""Check elasticache platform"""
# List ElastiCache clusters whose node type does not match the
# "<digit>g<char>" pattern used here to recognise Graviton node types.
printTitle(1, "ElastiCache review")
printTitle(2, "[Sustainability] ElastiCache instances on x64 platform")
printTitle(3, "Consider Graviton instances such as t4g/r7g to optimize your infrastructure investment.")
outTable = []
gravitonPattern = "[0-9]g."
for r in getAllRegions():
    elasticache = botoSession(region=r).client("elasticache")
    for cluster in elasticache.describe_cache_clusters().get("CacheClusters"):
        nodeType = cluster.get("CacheNodeType")
        if re.search(gravitonPattern, nodeType) is not None:
            continue
        outTable.append([r, aid, cluster.get("CacheClusterId"), nodeType])
printResult(outTable, "Region, AccountID, CacheClusterId, CacheNodeType")
"""Check target group with no target"""
# List load balancer target groups that have no registered targets.
printTitle(1, "LoadBalancer service review")
printTitle(2, "[Cost Optimization] LB Target group without targets")
printTitle(3, "Consider removing empty target groups")
outTable = []
for r in getAllRegions():
    elbv2 = botoSession(region=r).client("elbv2")
    for targetGroup in elbv2.describe_target_groups().get("TargetGroups"):
        health = elbv2.describe_target_health(TargetGroupArn=targetGroup.get("TargetGroupArn"))
        targets = jmespath.search("TargetHealthDescriptions[*].Target", health)
        if len(targets) != 0:
            continue
        outTable.append([r, aid, targetGroup.get("TargetGroupName")])
printResult(outTable, "Region, AccountID, TargetGroup")
"""Check KMS key rotation"""
# Report enabled customer-managed KMS keys that do not have automatic
# rotation turned on.
printTitle(1, "KMS service review")
printTitle(2, "[Security] Customer Managed Keys do not have auto rotation enabled")
printTitle(3, "Consider enabling auto key rotation. When a key is rotated, previous ones "
              "are still kept within AWS to allow data retrival.")
outTable = []
for r in getAllRegions():
    client = botoSession(region=r).client("kms")
    for page in client.get_paginator("list_keys").paginate():
        for keyId in jmespath.search("Keys[*].KeyId", page) or []:
            try:
                keyMetadata = client.describe_key(KeyId=keyId).get("KeyMetadata", {})
                # Enabled and KeyRotationEnabled are booleans; comparing them
                # with "True"/"False" strings meant this check never ran.
                if not keyMetadata.get("Enabled") or keyMetadata.get("KeyManager") != "CUSTOMER":
                    continue
                rotation = client.get_key_rotation_status(KeyId=keyId)
                if not rotation.get("KeyRotationEnabled"):
                    outTable.append([r, aid, keyId])
            except ClientError:
                # Best effort, as before: keys that cannot be inspected are
                # skipped. NOTE(review): presumably this includes key types
                # for which rotation status is not available -- confirm.
                continue
printResult(outTable, "Region, AccountID, KeyId")
"""Check API gateway resource policy"""
# Report private REST APIs that have no resource policy.
printTitle(1, "ApiGateway service review")
printTitle(2, "[Security] ApiGateway resource policy missing")
printTitle(3, "Consider restricting access to private API with a "
              "policy. Private Api should be accessed through Vpc endpoint and a policy ensures the Api cannot "
              "be accessed otherwise. For more detail, see "
              "https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-resource-policies-examples.html")
outTable = []
for r in getAllRegions():
    client = botoSession(region=r).client("apigateway")
    # get_rest_apis returns its results in pages; walk all of them.
    for page in client.get_paginator("get_rest_apis").paginate():
        for api in page.get("items", []):
            endpointTypes = (api.get("endpointConfiguration") or {}).get("types") or []
            # An API without a policy has no "policy" value at all, so test
            # truthiness: len(None) raised TypeError in exactly the case this
            # check is meant to report.
            if "PRIVATE" in endpointTypes and not api.get("policy"):
                outTable.append([r, aid, api.get("name")])
printResult(outTable, "Region, AccountID, PrivateApiName")
"""Check cloudtrail for encryption"""
# List the trails returned for the primary region that have no KMS key.
printTitle(1, "Cloudtrail service review")
printTitle(2, "[Security] Cloudtrail not encrypted")
printTitle(3, "Consider enabling encryption for cloudtrail")
# Only the primary-region client is queried here (no per-region loop).
client = localSession.client('cloudtrail')
response = client.describe_trails()
outTable = [
    [aid, trail.get("TrailARN")]
    for trail in response.get("trailList")
    if trail.get("KmsKeyId") is None
]
printResult(outTable, "AccountID, Trail")
"""Check cloudtrail for multi-region logging"""
# Report every region whose trail list contains no multi-region trail.
printTitle(2, "[Security] Multi-Region cloudtrail not enabled")
printTitle(3, "Consider enabling Multi-Region for at least 1 cloudtrail")
outTable = []
for r in getAllRegions():
    client = botoSession(region=r).client("cloudtrail")
    trails = client.describe_trails().get("trailList", [])
    # Evaluated per region: a running counter shared by all regions made the
    # result for a region depend on which regions were visited before it.
    if not any(trail.get("IsMultiRegionTrail") for trail in trails):
        outTable.append([r, aid, "Missing multi region trail"])
printResult(outTable, "Region, AccountID, Status")
printTitle(1, "Vpc service review")
# Report VPCs that have no flow log attached.
printTitle(2, "[Security] VPC flow log not enabled")
printTitle(3, "Consider enabling VPC flowlog for audit purpose or delete the default VPCs if not in use")
# Start from an empty table: without this reset the rows of the previous
# check were carried over into this report.
outTable = []
for r in getAllRegions():
    client = botoSession(region=r).client("ec2")
    response = client.describe_vpcs()
    for vpc in response['Vpcs']:
        vpc_id = vpc['VpcId']
        flow_logs_response = client.describe_flow_logs(
            Filters=[{'Name': 'resource-id', 'Values': [vpc_id]}]
        )
        if not flow_logs_response.get('FlowLogs', []):
            outTable.append([r, aid, vpc_id])
printResult(outTable, "Region, AccountID, VpcId")
"""Check default VPCs"""
# List the default VPC of every region.
printTitle(2, "[Security] Default VPCs should be removed")
printTitle(3, "Consider deleting the default VPCs if not in use")
outTable = []
defaultVpcFilter = [{'Name': 'isDefault', 'Values': ['true']}]
for r in getAllRegions():
    ec2 = botoSession(region=r).client("ec2")
    defaultVpcs = ec2.describe_vpcs(Filters=defaultVpcFilter)['Vpcs']
    outTable.extend([r, aid, vpc['VpcId']] for vpc in defaultVpcs)
printResult(outTable, "Region, AccountID, DefaultVpcId")
"""Check VPC endpoints"""
def check_vpc_endpoint(region: str):
    """Return a report row if *region* has no VPC endpoint, otherwise None.

    The row has the form ``[region, account id, "Vpc endpoint not found"]``.
    """
    ec2 = botoSession(region=region).client("ec2")
    endpoints = ec2.describe_vpc_endpoints()['VpcEndpoints']
    if len(endpoints) > 0:
        return None
    return [region, aid, "Vpc endpoint not found"]
# Run check_vpc_endpoint for all regions on a thread pool and report the
# regions for which it returned a row.
printTitle(2, "[Security] Use of VPC endpoints")
printTitle(3, "Consider deploying VPC endpoints and connect to AWS api endpoints privately")
with concurrent.futures.ThreadPoolExecutor() as pool:
    # map() yields the results in the same order as the regions.
    outTable = [row for row in pool.map(check_vpc_endpoint, getAllRegions()) if row]
printResult(outTable, "Region, AccountID, VpcEndpointCount")
"""Check VPN tunnels"""
# List VPN connections that expose fewer than two tunnel outside addresses.
printTitle(2, "[Reliability] Insufficient VPN tunnels")
printTitle(3, "Consider having 2 tunnels for each site VPN connection. "
              "AWS performs VPN tunnel endpoint maintenance rather frequently. Having 2 tunnel reduces the risk "
              "of service interruption.")
outTable = []
tunnelAddressQuery = "Options.TunnelOptions[*].OutsideIpAddress"
for r in getAllRegions():
    ec2 = botoSession(region=r).client("ec2")
    for vpn in ec2.describe_vpn_connections().get("VpnConnections"):
        tunnelCount = len(jmespath.search(tunnelAddressQuery, vpn))
        if tunnelCount >= 2:
            continue
        outTable.append([r, aid, vpn.get("VpnConnectionId"), tunnelCount])
printResult(outTable, "Region, AccountID, VpnConnection, TunnelCount")
"""Check CF-ALB which allows public access"""
# Report CloudFront origins that point at a load balancer of the primary
# region whose security group allows ingress from 0.0.0.0/0.
printTitle(2, "[Security] Cloudfront origins allow public access")
printTitle(3, "Your ALB is exposed to public, which bypass edge and WAF protection. Consider restricting access from Cloudfront only")
outTable = []

# Map each load balancer DNS name (lower-cased) to its security groups.
client = localSession.client("elbv2")
alb_sgs = {}
for page in client.get_paginator("describe_load_balancers").paginate():
    for alb in page['LoadBalancers']:
        dns_name = alb.get('DNSName')
        if dns_name:
            alb_sgs[dns_name.lower()] = alb.get('SecurityGroups', [])

# One EC2 client for all lookups instead of a new one per security group.
ec2client = localSession.client('ec2')
client = boto3.client('cloudfront')
for page in client.get_paginator('list_distributions').paginate():
    for dist in page.get('DistributionList', {}).get('Items', []):
        for origin in dist['Origins']['Items']:
            # Origins that are not a known load balancer of the primary region
            # yield no security groups and are skipped; indexing the dict
            # directly raised KeyError for them.
            for sg in alb_sgs.get(origin['DomainName'].lower(), []):
                s = ec2client.describe_security_groups(GroupIds=[sg])['SecurityGroups'][0]
                cidrs = jmespath.search('IpPermissions[].IpRanges[].CidrIp', s) or []
                # One row per security group, however many rules are open.
                if '0.0.0.0/0' in cidrs:
                    outTable.append([aid, dist['Id'], origin['DomainName'], sg])
printResult(outTable, "AccountID, CFDistribution, Origin, SecurityGroup")
"""EKS node OS version"""
# List EKS node groups whose AMI type starts with "AL2_".
printTitle(1, "Eks service review")
printTitle(2, "[Sustainability] Eks node running on AmazonLinux2 (AL2)")
printTitle(3, "Consider using AmazonLinux2023. "
              "AL2's end of life date is 2025-06-30. AmazonLinux2023 runs on newer kernel and libraries, "
              "which offers better performance and security.")
outTable = []
for r in getAllRegions():
    eks = botoSession(region=r).client("eks")
    for cluster in eks.list_clusters().get("clusters"):
        nodegroupNames = eks.list_nodegroups(clusterName=cluster).get("nodegroups")
        for ng in nodegroupNames:
            ngResp = eks.describe_nodegroup(clusterName=cluster, nodegroupName=ng)
            if not re.search("^AL2_", ngResp.get("nodegroup").get("amiType")):
                continue
            outTable.append([r, aid, cluster, ng, ngResp.get("nodegroup").get("amiType")])
printResult(outTable, "Region, AccountID, Cluster, NodeGroup, AmiType")
"""Check outdated EKS control plane"""
# Report EKS clusters whose control plane version is below 1.28.
printTitle(2, "[Sustainability] Eks control plane version outdated")
printTitle(3, "Consider using upgrading Eks cluster. "
              "Reference https://docs.aws.amazon.com/eks/latest/userguide/kubernetes-versions.html for a list "
              "of current versions. Reference https://docs.aws.amazon.com/eks/latest/userguide/update-cluster.html "
              "for upgrade instructions.")
outTable = []
minimumEksVersion = (1, 28)
for r in getAllRegions():
    client = botoSession(region=r).client("eks")
    for page in client.get_paginator("list_clusters").paginate():
        for cluster in page.get("clusters", []):
            clusterResp = client.describe_cluster(name=cluster)
            version = jmespath.search("cluster.version", clusterResp)
            # Compare component by component: as floats "1.9" would rank
            # above "1.28" and an old cluster would go unreported.
            versionParts = tuple(int(part) for part in version.split("."))
            if versionParts < minimumEksVersion:
                outTable.append([r, aid, cluster, version])
printResult(outTable, "Region, AccountID, Cluster, Version")
# Write the accumulated report to disk and tell the user where it is.
reportFileName = "AwsReviewReport.md"
mdFile.create_md_file()
print(f"Report written to {reportFileName}")