#!/usr/bin/python3 r""" Documentation License: This program is released under the MIT License """ # Imports import boto3 class AwsPrefixList: def __init__(self): ec2 = boto3.client('ec2') response = ec2.describe_managed_prefix_lists( Filters=[{'Name': 'prefix-list-name', 'Values': ['com.amazonaws.global.cloudfront.origin-facing']}] ) prefix_lists = response.get('PrefixLists', []) prefix_list_id = prefix_lists[0]['PrefixListId'] entries = [] paginator = ec2.get_paginator('get_managed_prefix_list_entries') for page in paginator.paginate(PrefixListId=prefix_list_id): entries.extend(page.get('Entries', [])) self.pl = [entry['Cidr'] for entry in entries] self.pl.sort() def getHash(self): return hash(tuple(self.pl)) def getTuple(self): return tuple(self.pl) def getLength(self): return len(self.pl) class WafIpSet: def __init__(self, name: str, id: str): waf_client = boto3.client('wafv2') temp = waf_client.get_ip_set( Name=name, Scope='REGIONAL', Id=id) self.ip_set = temp["IPSet"]["Addresses"] self.ip_set.sort() self.lock_token = temp['LockToken'] # need this to update ipset def getHash(self): return hash(tuple(self.ip_set)) def getTuple(self): return tuple(self.ip_set) def getLength(self): return len(self.ip_set) # Main function def main() -> None: pl = AwsPrefixList() ipset = WafIpSet(name="cloudfront_ip_ipset", id="951120be-31d7-415f-9aa3-5ad9e56b6195") print(f"PrefixList length: {pl.getLength()}") print(f"IpSet length: {ipset.getLength()}") # missing = set(pl.getTuple()) - set(ipset.getTuple()) # notInPl = set(ipset.getTuple()) - set(pl.getTuple()) # print(f"Missing in WAF ipset: {len(missing)}") # print(f"Not in PL: {len(notInPl)}") # Call main function if __name__ == '__main__': main()