Files
code-dumps/aws/s3-batch-restore.py

247 lines
7.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""
# S3 Batch Restore Script
# Restores objects from S3 Glacier Deep Archive using AWS S3 Batch Operations.
# Generate objectlist.csv with the following script:
# BUCKET=whk1-bea-icc-mbk-prd-s3-log-infra-log
# PREFIX=elb/alb-icc-mbk/AWSLogs/851239346925/elasticloadbalancing/ap-east-1/2025/08/11/
# aws s3 ls s3://$BUCKET/$PREFIX | awk "{print \"$BUCKET,$PREFIX\"\$NF}" | tee /tmp/objectlist.csv
"""
import sys
import json
import time
import boto3
import random
from botocore.exceptions import ClientError
def generate_random_id() -> int:
    """Return a random four-digit integer.

    The value is drawn from 1000..9999 (both ends inclusive) with the
    non-cryptographic ``random`` module. It is numeric, not alphanumeric,
    and its length is fixed; with only 9000 possible values it is a
    convenience suffix, not a uniqueness guarantee.

    Returns:
        An ``int`` in the range 1000-9999.
    """
    return random.randint(1000, 9999)
def create_trust_policy():
    """Build the trust policy document for the batch-restore IAM role.

    Returns:
        A dict in IAM policy-document form containing one ``Allow``
        statement that grants ``sts:AssumeRole`` to the
        ``batchoperations.s3.amazonaws.com`` service principal.
    """
    assume_role_statement = dict(
        Effect="Allow",
        Principal=dict(Service="batchoperations.s3.amazonaws.com"),
        Action="sts:AssumeRole",
    )
    return dict(Version="2012-10-17", Statement=[assume_role_statement])
def create_iam_role(iam_client, role_name="S3BatchRestoreRole"):
    """Make sure the batch-restore IAM role exists and has the S3 policy attached.

    The role is created with the trust policy from ``create_trust_policy()``
    and then the AWS-managed ``AmazonS3FullAccess`` policy is attached to it.
    An ``EntityAlreadyExists`` error from either call is reported and
    tolerated; any other ``ClientError`` is re-raised to the caller.

    Args:
        iam_client: IAM client exposing ``create_role`` and
            ``attach_role_policy``.
        role_name: Name of the role to create or reuse.

    Returns:
        ``role_name``, unchanged.

    Raises:
        ClientError: For any IAM error other than ``EntityAlreadyExists``.
    """
    assume_role_document = json.dumps(create_trust_policy())

    # Step 1: create the role, tolerating a role left over from a previous run.
    try:
        iam_client.create_role(
            RoleName=role_name,
            Description="S3 batch restore role",
            AssumeRolePolicyDocument=assume_role_document,
        )
    except ClientError as err:
        if err.response['Error']['Code'] != 'EntityAlreadyExists':
            raise
        print(f"IAM role {role_name} already exists, skipping creation")
    else:
        print(f"Created IAM role: {role_name}")

    # Step 2: attach the managed S3 policy to the role.
    try:
        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess',
        )
    except ClientError as err:
        if err.response['Error']['Code'] != 'EntityAlreadyExists':
            raise
        print(f"Policy already attached to {role_name}")
    else:
        print(f"Attached policy to role: {role_name}")

    return role_name
def create_manifest_bucket(s3_client, bucket_name, region):
    """Create the S3 bucket that will hold the manifest file.

    Args:
        s3_client: S3 client exposing ``create_bucket``.
        bucket_name: Name of the bucket to create.
        region: Region to create the bucket in. For ``"us-east-1"`` or a
            falsy value (``None``/``""``) no ``CreateBucketConfiguration``
            is sent, leaving the location to the service default.

    Raises:
        Whatever ``s3_client.create_bucket`` raises; errors are not handled
        here.
    """
    request = {"Bucket": bucket_name}
    # us-east-1 is S3's default location and is rejected when passed as an
    # explicit LocationConstraint, so only send the constraint for other regions.
    if region and region != "us-east-1":
        request["CreateBucketConfiguration"] = {"LocationConstraint": region}
    s3_client.create_bucket(**request)
    print(f"Created manifest bucket: {bucket_name}")
def upload_manifest(s3_client, bucket_name, manifest_file_path, object_key="objectlist.csv"):
    """Upload the local manifest file to S3 and return the stored object's ETag.

    Args:
        s3_client: S3 client exposing ``put_object``.
        bucket_name: Destination bucket.
        manifest_file_path: Path of the local manifest file, opened in
            binary mode.
        object_key: Key to store the manifest under.

    Returns:
        The ``ETag`` from the ``put_object`` response with surrounding
        double quotes stripped.

    Exits:
        Terminates the process with status 1 (after printing to stderr) if
        the file does not exist or if any other exception occurs during
        the upload.
    """
    try:
        with open(manifest_file_path, 'rb') as manifest_stream:
            put_result = s3_client.put_object(
                Body=manifest_stream,
                Key=object_key,
                Bucket=bucket_name,
            )
        print(f"Uploaded manifest to s3://{bucket_name}/{object_key}")
        # The response wraps the ETag in double quotes; return the bare value.
        return put_result['ETag'].strip('"')
    except FileNotFoundError:
        failure = f"Error: Manifest file '{manifest_file_path}' not found"
    except Exception as exc:
        failure = f"Error uploading manifest: {exc}"

    # Only reached when one of the handlers above ran.
    print(failure, file=sys.stderr)
    sys.exit(1)
def create_manifest_spec(bucket_name, etag, object_key="objectlist.csv"):
    """Build the manifest description passed to the batch job.

    Args:
        bucket_name: Bucket holding the manifest object.
        etag: ETag of the manifest object.
        object_key: Key of the manifest object inside ``bucket_name``.

    Returns:
        A dict with a ``Spec`` entry (CSV format ``S3BatchOperations_CSV_20180820``
        with ``Bucket`` and ``Key`` fields) and a ``Location`` entry
        (the manifest's object ARN and ETag).
    """
    manifest_arn = f"arn:aws:s3:::{bucket_name}/{object_key}"
    csv_layout = {
        "Format": "S3BatchOperations_CSV_20180820",
        "Fields": ["Bucket", "Key"],
    }
    manifest_location = {"ObjectArn": manifest_arn, "ETag": etag}
    return {"Spec": csv_layout, "Location": manifest_location}
def create_report_spec(bucket_name, prefix="batch-reports"):
    """Build the completion-report settings passed to the batch job.

    Args:
        bucket_name: Bucket the report is written to (converted to an ARN).
        prefix: Key prefix for the report objects.

    Returns:
        A dict enabling a ``Report_CSV_20180820`` report with scope
        ``AllTasks`` under ``prefix`` in the given bucket.
    """
    return dict(
        Bucket=f"arn:aws:s3:::{bucket_name}",
        Prefix=prefix,
        Format="Report_CSV_20180820",
        Enabled=True,
        ReportScope="AllTasks",
    )
def create_batch_job(s3control_client, account_id, role_arn, manifest_spec, report_spec,
                     expiration_days=14, glacier_job_tier="STANDARD", priority=10,
                     description="Restore objects from Deep Archive"):
    """Submit an S3 Batch Operations job that initiates object restores.

    Args:
        s3control_client: S3 Control client exposing ``create_job``.
        account_id: AWS account ID that owns the job.
        role_arn: ARN of the IAM role the job runs as.
        manifest_spec: Manifest description (see ``create_manifest_spec``).
        report_spec: Report settings (see ``create_report_spec``).
        expiration_days: Passed as ``ExpirationInDays`` of the restore
            operation.
        glacier_job_tier: Passed as ``GlacierJobTier`` of the restore
            operation.
        priority: Job priority.
        description: Free-text job description.

    Returns:
        The ``JobId`` from the ``create_job`` response.

    Exits:
        Terminates the process with status 1 (after printing to stderr) if
        ``create_job`` raises ``ClientError``.
    """
    restore_settings = {
        "ExpirationInDays": expiration_days,
        "GlacierJobTier": glacier_job_tier,
    }
    job_request = dict(
        AccountId=account_id,
        Operation={"S3InitiateRestoreObject": restore_settings},
        Manifest=manifest_spec,
        Report=report_spec,
        Priority=priority,
        RoleArn=role_arn,
        Description=description,
    )

    try:
        submitted = s3control_client.create_job(**job_request)
    except ClientError as err:
        print(f"Error creating batch job: {err}", file=sys.stderr)
        sys.exit(1)
    else:
        new_job_id = submitted['JobId']
        print(f"Submitted S3 batch job: {new_job_id}")
        return new_job_id
def approve_job(s3control_client, account_id, job_id) -> bool:
    """Ask S3 Batch Operations to move a job to the ``Ready`` status.

    Args:
        s3control_client: S3 Control client exposing ``update_job_status``.
        account_id: AWS account ID that owns the job.
        job_id: ID of the job to approve.

    Returns:
        ``True`` if the status update call succeeded; ``False`` if it
        raised ``ClientError`` (the error is printed to stderr).
    """
    try:
        s3control_client.update_job_status(
            RequestedJobStatus='Ready',
            JobId=job_id,
            AccountId=account_id,
        )
    except ClientError as err:
        print(f"Error approving job: {err}", file=sys.stderr)
        return False
    else:
        print(f"Approved job: {job_id}")
        return True
def get_account_id(sts_client):
    """Look up the AWS account ID of the current credentials.

    Args:
        sts_client: STS client exposing ``get_caller_identity``.

    Returns:
        The ``Account`` value from the ``get_caller_identity`` response.

    Exits:
        Terminates the process with status 1 (after printing to stderr) if
        the call raises ``ClientError``.
    """
    try:
        identity = sts_client.get_caller_identity()
    except ClientError as err:
        print(f"Error getting account ID: {err}", file=sys.stderr)
        sys.exit(1)
    return identity['Account']
def main():
    """Run the restore workflow for the manifest file named on the command line.

    Steps, in order:
      1. Read the manifest path from ``sys.argv[1]`` (usage error otherwise).
      2. Create AWS clients for the hard-coded region ``ap-east-1``.
      3. Look up the account ID and ensure the batch IAM role exists.
      4. Create a manifest bucket named with a random numeric suffix and
         upload the manifest to it.
      5. Submit the batch restore job, reporting into the same bucket.
      6. Request approval of the job, retrying a bounded number of times.
      7. Print the command for deleting the manifest bucket afterwards.

    Exits:
        Status 1 on a usage error, or when the job could not be approved
        within the retry budget. Helper functions may also exit with
        status 1 on their own errors.
    """
    if len(sys.argv) < 2:
        print("Usage: python3 s3-batch-restore.py <manifest.csv>", file=sys.stderr)
        # The manifest is CSV with Bucket and Key fields, so rows are comma-separated.
        print("You must first prepare the manifest, which is a csv with content <bucket>,<key>", file=sys.stderr)
        sys.exit(1)
    manifest_file = sys.argv[1]

    # Initialize AWS clients
    region = "ap-east-1"
    session = boto3.Session(region_name=region)
    iam_client = session.client('iam')
    s3_client = session.client('s3')
    s3control_client = session.client('s3control')
    sts_client = session.client('sts')

    # Get account ID
    account_id = get_account_id(sts_client)
    print(f"Using AWS account: {account_id}")

    # Create IAM role
    role_name = create_iam_role(iam_client)
    role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"

    # Create manifest bucket
    random_id = generate_random_id()
    manifest_bucket = f"deep-archive-batch-restore-{random_id}"
    create_manifest_bucket(s3_client, manifest_bucket, session.region_name)

    # Upload manifest and get ETag
    etag = upload_manifest(s3_client, manifest_bucket, manifest_file)

    # Create manifest and report specs (in memory, no temp files)
    manifest_spec = create_manifest_spec(manifest_bucket, etag)
    report_spec = create_report_spec(manifest_bucket)

    # Create batch job
    print("Submitting S3 batch job...")
    job_id = create_batch_job(
        s3control_client,
        account_id,
        role_arn,
        manifest_spec,
        report_spec
    )

    # Wait a bit before approving
    time.sleep(5)

    # Approve job. Retries are bounded so that a permanent failure (for
    # example missing permissions) cannot make the script loop forever.
    print(f"Approving submitted job {job_id}...")
    max_approval_attempts = 120
    approval_retry_seconds = 5
    for _ in range(max_approval_attempts):
        if approve_job(s3control_client, account_id, job_id):
            break
        time.sleep(approval_retry_seconds)
    else:
        print(
            f"Error: job {job_id} could not be approved after "
            f"{max_approval_attempts} attempts. Approve it manually with:",
            file=sys.stderr,
        )
        print(
            f"aws s3control update-job-status --region {region} "
            f"--account-id {account_id} --job-id {job_id} "
            f"--requested-job-status Ready",
            file=sys.stderr,
        )
        print(
            f"The manifest bucket {manifest_bucket} has been left in place.",
            file=sys.stderr,
        )
        sys.exit(1)

    print("\nReview s3 batch job status. When it is completed, delete the manifest bucket:")
    print(f"aws s3 rb s3://{manifest_bucket} --force")


if __name__ == "__main__":
    main()