1
0

initial commit

This commit is contained in:
xpk
2026-02-13 15:44:24 +08:00
parent 66be8224f4
commit 09ce4c881a
570 changed files with 61807 additions and 0 deletions
+17
View File
@@ -0,0 +1,17 @@
*.tfstate.backup
*.backup
*.tfstate
*.tfstate.lock
**/*.tfstate
**/*.backup
.terraform/
.DS_Store
*.iml
.idea
.terraform.lock.hcl
*.log
examples/
experimental/
headdesk-aws/
vsphere-yige/
anz-sandbox/
+16
View File
@@ -0,0 +1,16 @@
def lambda_handler(event, context):
    """Echo the request's query-string parameters back to the caller.

    Args:
        event: Proxy event (dict). ``queryStringParameters`` may be missing
            or explicitly ``None`` when the request carries no query string.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and a ``body`` rendering the parameters.
    """
    # `.get(key, {})` only covers a missing key; an explicit None would pass
    # through and be rendered as "None", so normalise both cases to {}.
    params = event.get('queryStringParameters') or {}

    # Print all query parameters
    print("Received query parameters:", params)

    # Print one specific parameter when it is present
    if 'inputValue' in params:
        print("Value of 'inputValue':", params['inputValue'])

    # Return the input parameters as the response
    return {
        'statusCode': 200,
        'body': f"Received parameters: {params}"
    }
+74
View File
@@ -0,0 +1,74 @@
<!-- This readme file is generated with terraform-docs -->
# ApigwAuthSample
A working example which deploys HTTP api, Lambda functions, and necessary permissions.
## Testing the API
To test this in postman, put in the following settings:
URL: https://<api-id>.execute-api.ap-east-1.amazonaws.com/?inputValue=TestMessage123
Authorization: api key, key = Authorization, value = sha256 hash of the generated password, add to = Header
## Requirements
| Name | Version |
|------|---------|
| terraform | ~> 1.13.0 |
| aws | ~> 5.0 |
## Providers
| Name | Version |
|------|---------|
| archive | 2.7.1 |
| aws | 5.100.0 |
| random | 3.7.2 |
## Modules
No modules.
## Resources
| Name | Type |
|------|------|
| [aws_apigatewayv2_api.SampleHttpApi](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/apigatewayv2_api) | resource |
| [aws_apigatewayv2_deployment.deployment](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/apigatewayv2_deployment) | resource |
| [aws_apigatewayv2_stage.stage1](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/apigatewayv2_stage) | resource |
| [aws_cloudwatch_log_group.api_logging](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
| [aws_cloudwatch_log_group.loggroups](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
| [aws_iam_role.role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role) | resource |
| [aws_iam_role_policy_attachment.role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
| [aws_lambda_function.EchoFunction](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
| [aws_lambda_function.SampleAuthorizer](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
| [aws_lambda_permission.EchoFunction](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_permission) | resource |
| [aws_lambda_permission.SampleAuthorizer](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_permission) | resource |
| [random_password.pw](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/password) | resource |
| [archive_file.EchoFunction](https://registry.terraform.io/providers/hashicorp/archive/latest/docs/data-sources/file) | data source |
| [archive_file.SampleAuthorizer](https://registry.terraform.io/providers/hashicorp/archive/latest/docs/data-sources/file) | data source |
| [aws_caller_identity.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source |
| [aws_iam_policy_document.lambda_role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
## Inputs
| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| DynamicAddressGroup | n/a | `any` | n/a | yes |
| application | n/a | `any` | n/a | yes |
| aws-region | n/a | `any` | n/a | yes |
| costcenter | n/a | `any` | n/a | yes |
| customer-name | n/a | `any` | n/a | yes |
| environment | n/a | `any` | n/a | yes |
| owner | n/a | `any` | n/a | yes |
| project | n/a | `any` | n/a | yes |
## Outputs
| Name | Description |
|------|-------------|
| api\_deployment\_id | n/a |
| api\_endpoint | n/a |
| last-updated | n/a |
---
## Authorship
This module was developed by Rackspace.
+55
View File
@@ -0,0 +1,55 @@
import hashlib
import os
#region = os.environ['region']
#account_id = os.environ['account_id']
#api_id = os.environ['api_id']
pw_hash = os.environ['pw_hash']
#resource_arn = f"arn:aws:execute-api:{region}:{account_id}:{api_id}:/*/*/" # based on observed routeArn in event
def lambda_handler(event, context):
    """Authorize a request by comparing its authorization header to ``pw_hash``.

    Args:
        event: Authorizer event (dict); the token is read from
            ``event['headers']['authorization']``.
        context: Lambda context object (unused).

    Returns:
        dict ``{"isAuthorized": bool}`` (simple-response format).
    """
    import hmac  # local import: the module header only imports hashlib and os

    # A request without headers (or without the header) is treated as
    # unauthorized instead of raising KeyError/AttributeError.
    headers = event.get('headers') or {}
    token = headers.get('authorization') or ''

    # Constant-time comparison so response timing does not reveal how much of
    # the token matched. Bytes are compared because compare_digest rejects
    # non-ASCII str. An empty token is never accepted.
    is_authorized = bool(token) and hmac.compare_digest(
        token.encode('utf-8'), pw_hash.encode('utf-8')
    )

    # Log for debugging (the token itself is masked)
    print(f"Authorization status: {is_authorized}. Authorization token: {'*' * len(token)}")

    # Simple response
    return {
        "isAuthorized" : is_authorized
    }
# IAM policy response, which is overkill here and adds no benefit
# to use IAM policy response, your api needs to have "enableSimpleResponses" : false
# if is_authorized:
# return {
# "principalId" : "demo",
# "policyDocument": {
# "Version": "2012-10-17",
# "Statement": [{
# "Action": "execute-api:Invoke",
# "Effect": "Allow",
# "Resource": event["routeArn"]
# }]
# }
# }
# else:
# return {
# "principalId" : "demo",
# "policyDocument": {
# "Version": "2012-10-17",
# "Statement": [{
# "Action": "*",
# "Effect": "Deny",
# "Resource": "*"
# }]
# }
# }
+43
View File
@@ -0,0 +1,43 @@
{
"openapi" : "3.0.1",
"paths" : {
"/" : {
"get" : {
"responses" : {
"default" : {
"description" : "Default response for GET /"
}
},
"security" : [ {
"SampleAuthorizer" : [ ]
} ],
"x-amazon-apigateway-integration" : {
"payloadFormatVersion" : "2.0",
"type" : "aws_proxy",
"httpMethod" : "POST",
"uri" : "arn:aws:apigateway:ap-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:ap-east-1:040216112220:function:EchoFunction/invocations",
"connectionType" : "INTERNET"
}
}
}
},
"components" : {
"securitySchemes" : {
"SampleAuthorizer" : {
"type" : "apiKey",
"name" : "Authorization",
"in" : "header",
"x-amazon-apigateway-authorizer" : {
"identitySource" : "$request.header.Authorization",
"authorizerUri" : "arn:aws:apigateway:ap-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:ap-east-1:040216112220:function:SampleAuthorizer/invocations",
"authorizerPayloadFormatVersion" : "2.0",
"authorizerResultTtlInSeconds" : 0,
"type" : "request",
"enableSimpleResponses" : true
}
}
}
},
"x-amazon-apigateway-importexport-version" : "1.0"
}
+170
View File
@@ -0,0 +1,170 @@
/**
* # ApigwAuthSample
* A working example which deploys HTTP api, Lambda functions, and necessary permissions.
*
*
* ## Testing the API
* To test this in postman, put in the following settings:
*
* URL: https://<api-id>.execute-api.ap-east-1.amazonaws.com/?inputValue=TestMessage123
* Authorization: api key, key = Authorization, value = sha256 hash of the generated password, add to = Header
*
*/
# IAM role for Lambda execution

# Trust policy: lets the Lambda service assume the role
data "aws_iam_policy_document" "lambda_role" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

# Execution role referenced by both Lambda functions in this file
resource "aws_iam_role" "role" {
  name               = "ApiFunctionRole"
  assume_role_policy = data.aws_iam_policy_document.lambda_role.json
}

# Attach the AWS-managed basic execution policy to the role
resource "aws_iam_role_policy_attachment" "role" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
  role       = aws_iam_role.role.name
}
# Zip EchoFunction.py next to this module so it can be uploaded as the function package
data "archive_file" "EchoFunction" {
  type        = "zip"
  source_file = "${path.module}/EchoFunction.py"
  output_path = "${path.module}/EchoFunction.zip"
}

# Lambda function that backs the API route
resource "aws_lambda_function" "EchoFunction" {
  function_name = "EchoFunction"
  description   = "Function that echo query parameter inputValue"
  role          = aws_iam_role.role.arn

  runtime       = "python3.13"
  architectures = ["arm64"]
  handler       = "EchoFunction.lambda_handler"

  # source_code_hash makes Terraform re-upload the package when the zip content changes
  filename         = data.archive_file.EchoFunction.output_path
  source_code_hash = data.archive_file.EchoFunction.output_base64sha256
}

# Let API Gateway invoke the function; source_arn restricts the grant to SampleHttpApi
resource "aws_lambda_permission" "EchoFunction" {
  statement_id  = "AllowExecutionFromApi"
  principal     = "apigateway.amazonaws.com"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.EchoFunction.function_name
  source_arn    = "arn:aws:execute-api:${var.aws-region}:${data.aws_caller_identity.this.account_id}:${aws_apigatewayv2_api.SampleHttpApi.id}/*/*"
}
# Zip SampleAuthorizer.py next to this module so it can be uploaded as the function package
data "archive_file" "SampleAuthorizer" {
  type        = "zip"
  output_path = "${path.module}/SampleAuthorizer.zip"
  source_file = "${path.module}/SampleAuthorizer.py"
}
/* Test function with this input
{
"routeArn": "arn:aws:execute-api:ap-east-1:040216112220:wxzvfmiyd2/$default/GET/",
"headers": {
"authorization": "value of pw_hash"
}
}
*/
# Random secret; only its SHA-256 digest is passed on to the authorizer function
resource "random_password" "pw" {
  length      = 20
  min_lower   = 2
  min_upper   = 2
  min_numeric = 2
  min_special = 2
}

# Lambda authorizer for the API
resource "aws_lambda_function" "SampleAuthorizer" {
  function_name = "SampleAuthorizer"
  description   = "API authorizer"
  role          = aws_iam_role.role.arn

  runtime       = "python3.13"
  architectures = ["arm64"]
  handler       = "SampleAuthorizer.lambda_handler"

  filename         = data.archive_file.SampleAuthorizer.output_path
  source_code_hash = data.archive_file.SampleAuthorizer.output_base64sha256

  environment {
    variables = {
      # Value the incoming authorization header is compared against
      pw_hash = sha256(random_password.pw.result)

      # NOTE(review): the handler's reads of these three appear to be commented out — confirm they are still needed
      region     = var.aws-region
      account_id = data.aws_caller_identity.this.account_id
      api_id     = aws_apigatewayv2_api.SampleHttpApi.id
    }
  }
}

# Let API Gateway invoke the authorizer; source_arn restricts the grant to SampleHttpApi
resource "aws_lambda_permission" "SampleAuthorizer" {
  statement_id  = "AllowExecutionFromApi"
  principal     = "apigateway.amazonaws.com"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.SampleAuthorizer.function_name
  source_arn    = "arn:aws:execute-api:${var.aws-region}:${data.aws_caller_identity.this.account_id}:${aws_apigatewayv2_api.SampleHttpApi.id}/*/*"
}
# One log group per Lambda function name, kept for a single day
resource "aws_cloudwatch_log_group" "loggroups" {
  for_each = toset(["SampleAuthorizer", "EchoFunction"])

  retention_in_days = 1
  name              = "/aws/lambda/${each.value}"
}
# api
# HTTP API whose routes, integration and authorizer are imported from the OpenAPI document
resource "aws_apigatewayv2_api" "SampleHttpApi" {
  name            = "SampleHttpApi"
  protocol_type   = "HTTP"
  description     = "Sample http api which uses Lambda integration"
  ip_address_type = "ipv4"

  # Anchor the document to this module's directory. A bare relative path is
  # resolved against the working directory, which breaks when Terraform is
  # run from elsewhere or this directory is consumed as a child module.
  body = file("${path.module}/api_body.json")
}
# Destination for the stage's access logs, kept for a single day
resource "aws_cloudwatch_log_group" "api_logging" {
  retention_in_days = 1
  name              = "/aws/api/SampleHttpApi"
}

# $default stage, pinned to the deployment resource defined in this file
resource "aws_apigatewayv2_stage" "stage1" {
  name          = "$default"
  description   = "Default environment"
  api_id        = aws_apigatewayv2_api.SampleHttpApi.id
  deployment_id = aws_apigatewayv2_deployment.deployment.id

  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.api_logging.arn

    # Access log line layout: a JSON object built from $context variables
    format = jsonencode({
      "requestId"       = "$context.requestId"
      "ip"              = "$context.identity.sourceIp"
      "requestTime"     = "$context.requestTime"
      "httpMethod"      = "$context.httpMethod"
      "routeKey"        = "$context.routeKey"
      "status"          = "$context.status"
      "protocol"        = "$context.protocol"
      "responseLength"  = "$context.responseLength"
      "AuthorizerError" = "$context.authorizer.error"
    })
  }
}
# Deployment of SampleHttpApi that the $default stage points at
resource "aws_apigatewayv2_deployment" "deployment" {
  api_id      = aws_apigatewayv2_api.SampleHttpApi.id
  description = "Triggered by terraform"

  triggers = {
    # Redeploy only when the imported API definition changes. The previous
    # timestamp() value differed on every run, so every apply replaced the
    # deployment even when nothing had changed.
    redeployment = sha1(aws_apigatewayv2_api.SampleHttpApi.body)
  }

  lifecycle {
    # Create the replacement first so the stage never references a deleted deployment
    create_before_destroy = true
  }
}
+7
View File
@@ -0,0 +1,7 @@
# api_endpoint attribute of SampleHttpApi as reported by the provider
output "api_endpoint" {
value = aws_apigatewayv2_api.SampleHttpApi.api_endpoint
}
# Id of the deployment resource that stage1 references
output "api_deployment_id" {
value = aws_apigatewayv2_deployment.deployment.id
}
+31
View File
@@ -0,0 +1,31 @@
# Terraform and provider version constraints
terraform {
  required_version = "~> 1.13.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

# AWS provider; default_tags are applied through the provider rather than per resource
provider "aws" {
  region = var.aws-region

  default_tags {
    tags = {
      ServiceProvider     = "RackspaceTechnology"
      TerraformMode       = "managed"
      Environment         = var.environment
      Project             = var.project
      Application         = var.application
      Owner               = var.owner
      CostCenter          = var.costcenter
      DynamicAddressGroup = var.DynamicAddressGroup
      # Last two path components of the working directory, "<parent>/<dir>"
      TerraformDir = "${reverse(split("/", path.cwd))[1]}/${reverse(split("/", path.cwd))[0]}"
    }
  }
}

# Time of the current run; timestamp() yields a new value on every run
output "last-updated" {
  value = timestamp()
}
+8
View File
@@ -0,0 +1,8 @@
aws-region = "ap-east-1"
customer-name = "ken2026"
environment = "lab"
project = "iac"
application = "api"
costcenter = "undefined"
DynamicAddressGroup = "undefined"
owner = "ken2026"
+10
View File
@@ -0,0 +1,10 @@
# Input variables: declared without type or default, so each one must be supplied (e.g. via terraform.tfvars)
variable "aws-region" {}
# NOTE(review): customer-name is not referenced by the configuration visible here — confirm it is still needed
variable "customer-name" {}
variable "environment" {}
variable "project" {}
variable "application" {}
variable "owner" {}
variable "costcenter" {}
variable "DynamicAddressGroup" {}
# Identity of the calling credentials; its account_id is used when building ARNs
data "aws_caller_identity" "this" {}
+8
View File
@@ -0,0 +1,8 @@
# LambdaPyZip
This layer uses the ```python_aws_lambda``` data source, which creates zip archives with the following inputs
- source/function.py
- source/requirements.txt
Function.py contains the lambda handler, while requirements.txt states the dependencies. This datasource will run
pip install and generate zip archives in the output directory.
+27
View File
@@ -0,0 +1,27 @@
# Pin the third-party python provider that supplies the python_aws_lambda data source
terraform {
  required_providers {
    python = {
      version = "0.9.2"
      source  = "ATenderholt/python"
    }
  }
}

provider "python" {
  pip_command = "pip3"
}

# Build two archives from ./source: the handler code and its dependencies
data "python_aws_lambda" "example" {
  source_dir        = "source"
  extra_args        = "--only-binary=:all:"
  archive_path      = "output/handler.zip"
  dependencies_path = "output/dependencies.zip"
}

# base64 SHA-256 of the dependencies archive
output "lib_sum" {
  value = data.python_aws_lambda.example.dependencies_base64sha256
}

# base64 SHA-256 of the handler archive
output "function_sum" {
  value = data.python_aws_lambda.example.archive_base64sha256
}
+8
View File
@@ -0,0 +1,8 @@
# reference: https://aws.amazon.com/premiumsupport/knowledge-center/start-stop-lambda-eventbridge/
import requests
def lambda_handler(event, context):
    """Fetch https://ipinfo.io/ and report the HTTP status code.

    Args:
        event: Lambda event (unused).
        context: Lambda context object (unused).

    Returns:
        dict ``{"HttpResponseCode": <int status code>}``.

    Raises:
        requests.RequestException: if the request fails or times out.
    """
    # requests applies no timeout by default; bound the wait so a stalled
    # connection fails fast instead of hanging the invocation.
    r = requests.get('https://ipinfo.io/', timeout=10)
    return {
        "HttpResponseCode": r.status_code
    }
+2
View File
@@ -0,0 +1,2 @@
dnspython==2.7.0
requests
+1
View File
@@ -1,2 +1,3 @@
# terraform.examples
Terraform code examples
+7
View File
@@ -0,0 +1,7 @@
# bea-adc
Module to deploy network resources and ad connector for use with AWS SSO
## Input variables
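The variable `adc-service-account-password` must be supplied via the `TF_VAR_adc-service-account-password` environment variable,
which keeps the password out of the source code. Note that Terraform may still record the value in tfstate once it is passed to a resource, so protect the state file accordingly.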
+15
View File
@@ -0,0 +1,15 @@
# Identity of the calling credentials; its ARN is recorded in the CreatedBy tag
data "aws_caller_identity" "this" {}
locals {
# Tags passed to the modules in this configuration
default-tags = merge({
ServiceProvider = "None"
Environment = var.environment
Project = var.project
Application = var.application
TerraformMode = "managed"
# NOTE(review): the prefix is a hard-coded workstation path; from any other checkout location the full path.cwd is kept
TerraformDir = trimprefix(path.cwd, "/my/work/xpk-git/")
CreatedBy = data.aws_caller_identity.this.arn
# NOTE(review): timestamp() is re-evaluated on every run, so this tag changes whenever the date does
BuildDate = formatdate("YYYYMMDD", timestamp())
})
# Name prefix: <environment>-<region short code>-<customer>-<project>
resource-prefix = "${var.environment}-${var.aws-region-short}-${var.customer-name}-${var.project}"
}
}
+48
View File
@@ -0,0 +1,48 @@
# VPC and subnets for the AD Connector
module "vpc-subnets" {
  source = "../../modules/networking/vpc_subnets"

  # Naming / tagging
  customer-name = var.customer-name
  environment   = var.environment
  project       = var.project
  application   = var.application
  default-tags  = local.default-tags

  # Network layout
  aws-region                       = var.aws-region
  vpc-cidr                         = var.vpc-cidr
  number-of-public-subnets-per-az  = var.number-of-public-subnets-per-az
  number-of-private-subnets-per-az = var.number-of-private-subnets-per-az
  create-nat-gateway               = false
  create-free-vpc-endpoints        = false

  # Flow log settings handled by the module
  enable-flow-log                 = true
  vpcflowlog-retain-days          = 90
  vpcflowlog-cwl-loggroup-key-arn = ""
}
# The vpc_subnets module does not support an S3 destination, so the S3 flow log is declared here
resource "aws_flow_log" "vpc-log-s3" {
  vpc_id               = module.vpc-subnets.vpc_id
  traffic_type         = "ALL"
  log_destination_type = "s3"
  log_destination      = var.vpc-flowlog-bucket-arn
}
/*
After adc is deployed by terraform, the following tasks need to be performed manually.
They cannot be managed by terraform
1. Edit security group created for adconnector. SG name is d-???_controllers
2. Enable client LDAPS communication
3. Setup maintenance notification through SNS
4. Enable SSO application. Setting enable_sso in member account results in error. alias is deliberately not set
*/
# AD Connector placed in the private subnets of the VPC created by module.vpc-subnets
module "adconnector" {
  source = "../../modules/security_identity_compliance/ds-adconnector"

  # Directory being connected to
  adc-domainname = var.adc-domainname
  adc-dns-ips    = var.adc-dns-ips
  adc-size       = var.adc-size

  # Service account credentials
  adc-service-account-username = var.adc-service-account-username
  adc-service-account-password = var.adc-service-account-password

  # Placement
  adc-vpc-id     = module.vpc-subnets.vpc_id
  adc-subnet-ids = module.vpc-subnets.private-subnet-ids

  default-tags = local.default-tags
}
+11
View File
@@ -0,0 +1,11 @@
# Pass-through of the adconnector module's outputs
output "directory-id" {
value = module.adconnector.directory-id
}
output "security-group-id" {
value = module.adconnector.security-group-id
}
output "customer-dns-ip" {
value = module.adconnector.customer-dns-ip
}
+13
View File
@@ -0,0 +1,13 @@
# Minimum Terraform and AWS provider versions
terraform {
  required_version = ">= 1.0"

  required_providers {
    aws = {
      version = ">= 3.25"
      source  = "hashicorp/aws"
    }
  }
}

# Region comes from the aws-region input variable
provider "aws" {
  region = var.aws-region
}
+15
View File
@@ -0,0 +1,15 @@
aws-region = "ap-east-1"
aws-region-short = "ape1"
customer-name = "acme"
environment = "preview"
project = "sso"
application = "sso"
vpc-cidr = "10.37.54.0/24"
number-of-public-subnets-per-az = 0
number-of-private-subnets-per-az = 1
vpc-flowlog-bucket-arn = "arn:aws:s3:::prd-vpc-flow-logs-894849410890"
adc-domainname = "acme.com"
adc-size = "Large"
adc-dns-ips = ["10.135.72.66", "10.135.72.67"]
adc-service-account-username = "AWSSSOPRD"
adc-enable-sso = true
+22
View File
@@ -0,0 +1,22 @@
# Naming / tagging inputs
variable "aws-region" {}
variable "aws-region-short" {}
variable "customer-name" {}
variable "environment" {}
variable "project" {}
variable "application" {}

# Network inputs
variable "vpc-cidr" {}
variable "number-of-private-subnets-per-az" {}
variable "number-of-public-subnets-per-az" {}
variable "vpc-flowlog-bucket-arn" {}

# AD Connector inputs
variable "adc-domainname" {}
variable "adc-size" {}
variable "adc-dns-ips" {}
variable "adc-service-account-username" {}
# AD Connector service account password, supplied from the environment so it stays out of the source tree.
# Terraform reads input variables from TF_VAR_<name>. The name contains hyphens, which shells do not accept
# in `export`, so the variable has to be set with `env` (or an equivalent mechanism).
variable "adc-service-account-password" {
  type        = string
  sensitive   = true
  description = "AD service account password. Supply it with the TF_VAR_adc-service-account-password environment variable (e.g. env 'TF_VAR_adc-service-account-password=xxx' terraform apply)"
  # NOTE(review): with an empty default, a missing environment variable is not caught at plan time
  default = ""
}
# NOTE(review): not referenced by the adconnector module call visible in this configuration — confirm it is still needed
variable "adc-enable-sso" {}
+15
View File
@@ -0,0 +1,15 @@
# Identity of the calling credentials; its ARN is recorded in the CreatedBy tag
data "aws_caller_identity" "this" {}
locals {
# Tags passed to the modules in this configuration
default-tags = merge({
ServiceProvider = "None"
Environment = var.environment
Project = var.project
Application = var.application
TerraformMode = "managed"
# NOTE(review): the prefix is a hard-coded workstation path; from any other checkout location the full path.cwd is kept
TerraformDir = trimprefix(path.cwd, "/my/work/xpk-git/")
CreatedBy = data.aws_caller_identity.this.arn
# NOTE(review): timestamp() is re-evaluated on every run, so this tag changes whenever the date does
BuildDate = formatdate("YYYYMMDD", timestamp())
})
# Name prefix: <environment>-<region short code>-<customer>-<project>
resource-prefix = "${var.environment}-${var.aws-region-short}-${var.customer-name}-${var.project}"
}
}
+28
View File
@@ -0,0 +1,28 @@
# One permission-set module instance per row of local.items, keyed by the row's name
module "sso" {
  source   = "../../modules/security_identity_compliance/sso-permissionsets"
  for_each = { for row in local.items : row.name => row }

  pset-name               = each.value.name
  pset-desc               = each.value.desc
  pset-managed-policy-arn = each.value.mpolicy
  pset-session-duration   = each.value.session

  default-tags = local.default-tags
}
locals {
  # Permission-set catalogue, one row per set: name, description, managed policy ARN, session duration
  csv_data = <<-CSV
  name,desc,mpolicy,session
  ViewOnly,View only access,arn:aws:iam::aws:policy/job-function/ViewOnlyAccess,PT4H
  ReadOnly,Read only access,arn:aws:iam::aws:policy/ReadOnlyAccess,PT4H
  FullAccess,Full admin access,arn:aws:iam::aws:policy/AdministratorAccess,PT4H
  NetworkAdmin,Network admin access,arn:aws:iam::aws:policy/job-function/NetworkAdministrator,PT4H
  DatabaseAdmin,Database admin access,arn:aws:iam::aws:policy/job-function/DatabaseAdministrator,PT4H
  BillingAdmin,Billing admin access,arn:aws:iam::aws:policy/job-function/Billing,PT4H
  SecurityAudit,Security admin access,arn:aws:iam::aws:policy/SecurityAudit,PT4H
  PowerUser,Full access excluding IAM,arn:aws:iam::aws:policy/PowerUserAccess,PT4H
  CSV

  # Decoded rows, consumed by module.sso's for_each
  items = csvdecode(local.csv_data)
}
+13
View File
@@ -0,0 +1,13 @@
# Region comes from the aws-region input variable
provider "aws" {
region = var.aws-region
}
# Minimum Terraform and AWS provider versions
terraform {
required_version = ">= 1.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = ">= 3.25"
}
}
}
+64
View File
@@ -0,0 +1,64 @@
# SSO instance lookup; supplies the identity store id and instance ARN used below
data "aws_ssoadmin_instances" "sso1" {}

locals {
  # Users to create, one row each
  csv_data2 = <<-CSV
  username,email,lastName,firstName
  user1,user1@acme.local,Doe,John
  user2,user2@acme.local,Smith,Jane
  CSV

  users = csvdecode(local.csv_data2)
}
# One identity store user per row of local.users, keyed by username
resource "aws_identitystore_user" "sso-user" {
  for_each = { for u in local.users : u.username => u }

  identity_store_id = tolist(data.aws_ssoadmin_instances.sso1.identity_store_ids)[0]

  user_name    = each.value.username
  nickname     = each.value.username
  display_name = "${each.value.firstName} ${each.value.lastName}"

  name {
    given_name  = each.value.firstName
    family_name = each.value.lastName
  }

  emails {
    value   = each.value.email
    primary = true
  }
}
# Single group for view-only users
resource "aws_identitystore_group" "sso-group" {
  display_name      = "Viewers"
  description       = "Users with view permission"
  identity_store_id = tolist(data.aws_ssoadmin_instances.sso1.identity_store_ids)[0]
}

# Every user created above becomes a member of the group
resource "aws_identitystore_group_membership" "sso-group-membership" {
  for_each = aws_identitystore_user.sso-user

  identity_store_id = tolist(data.aws_ssoadmin_instances.sso1.identity_store_ids)[0]
  member_id         = each.value.user_id
  group_id          = aws_identitystore_group.sso-group.group_id
}
locals {
  # Account assignments, one row each; seq is the unique for_each key
  csv_data3 = <<-CSV
  seq,groupName,permission,accountId
  1,Viewers,ViewOnly,865184416664
  2,Viewers,ViewOnly,572802010687
  CSV

  accounts = csvdecode(local.csv_data3)
}

# Assign a permission set to the group in each listed account.
# NOTE(review): the groupName column is not read; the principal is always aws_identitystore_group.sso-group.
resource "aws_ssoadmin_account_assignment" "pset-assignment" {
  for_each = { for row in local.accounts : row.seq => row }

  instance_arn       = tolist(data.aws_ssoadmin_instances.sso1.arns)[0]
  permission_set_arn = module.sso[each.value.permission].pset-arn

  principal_type = "GROUP"
  principal_id   = aws_identitystore_group.sso-group.group_id

  target_type = "AWS_ACCOUNT"
  target_id   = each.value.accountId
}
+7
View File
@@ -0,0 +1,7 @@
aws-region = "ap-east-1"
aws-region-short = "ape1"
customer-name = "acme"
environment = "preview"
project = "security"
application = "sso"
+6
View File
@@ -0,0 +1,6 @@
# Input variables: declared without type or default, so each one must be supplied (e.g. via terraform.tfvars)
variable "aws-region" {}
variable "aws-region-short" {}
variable "customer-name" {}
variable "environment" {}
variable "project" {}
variable "application" {}
+40
View File
@@ -0,0 +1,40 @@
# AWS Backup: daily and monthly schedules plus the per-service opt-in switches
module "aws-backup" {
  source = "../../modules/storage/aws-backup"

  # Schedules and retention
  daily-backup-cron        = var.daily-backup-cron
  daily-backup-retention   = var.daily-backup-retention
  monthly-backup-cron      = var.monthly-backup-cron
  monthly-backup-retention = var.monthly-backup-retention

  # Per-service enablement flags passed to the module
  service-opt-in = {
    "Aurora"         = { enabled = false }
    "DynamoDB"       = { enabled = true }
    "EBS"            = { enabled = false }
    "EC2"            = { enabled = true }
    "EFS"            = { enabled = true }
    "FSx"            = { enabled = false }
    "Redshift"       = { enabled = true }
    "RDS"            = { enabled = true }
    "VirtualMachine" = { enabled = false }
    "S3"             = { enabled = false }
  }
}
+23
View File
@@ -0,0 +1,23 @@
# Terraform and AWS provider version constraints
terraform {
  required_version = ">= 1.3.9"

  required_providers {
    aws = {
      version = "~> 5.0"
      source  = "hashicorp/aws"
    }
  }
}

# AWS provider; default_tags are applied through the provider rather than per resource
provider "aws" {
  region = var.aws-region

  default_tags {
    tags = {
      ServiceProvider = "RackspaceTechnology"
      Environment     = var.environment
      Project         = var.project
      Application     = var.application
      Owner           = var.owner
      # Last two path components of the working directory, "<parent>/<dir>"
      TerraformDir = "${reverse(split("/", path.cwd))[1]}/${reverse(split("/", path.cwd))[0]}"
    }
  }
}
+11
View File
@@ -0,0 +1,11 @@
aws-region = "ap-east-1"
customer-name = "ken2026"
environment = "dev"
project = "iac"
application = "backup"
owner = "ken2026"
daily-backup-retention = 31
daily-backup-cron = "cron(0 20 * * ? *)"
monthly-backup-retention = 365
monthly-backup-cron = "cron(0 20 1 * ? *)"
# cron(Minutes Hours Day-of-month Month Day-of-week Year)
+11
View File
@@ -0,0 +1,11 @@
# Input variables: declared without type or default, so each one must be supplied (e.g. via terraform.tfvars)
variable "aws-region" {}
variable "customer-name" {}
variable "environment" {}
variable "project" {}
variable "application" {}
variable "owner" {}
# Backup schedule (cron expressions) and retention values forwarded to the aws-backup module
variable "daily-backup-retention" {}
variable "daily-backup-cron" {}
variable "monthly-backup-retention" {}
variable "monthly-backup-cron" {}
+12
View File
@@ -0,0 +1,12 @@
# Root module for creating baseline resources including:
- iam password policy
- delete default VPCs in all regions
- create cloudtrail
- enable aws config in all regions
- enable guardduty
- enable securityhub
- disable s3 public access
- require EBS encryption
## If AWS organisation is in use
If you are using AWS organisation, setup delegated admin for guardduty and securityhub. This allows centralised management.
+51
View File
@@ -0,0 +1,51 @@
# Account baseline, part 1: IAM, CloudTrail, default VPC clean-up and AWS Config
module "iam-baseline" {
# iam password policy, baseline roles, access analyzer, cloudhealth role
source = "../../modules/security_identity_compliance/roles_iam_resources"
customer-name = var.customer-name
default-tags = local.default-tags
# CloudHealth resources are switched off for this account
create-cloudhealth-resources = false
}
module "cloudtrail" {
# Create cloudtrail
source = "../../modules/security_identity_compliance/cloudtrail_cwlogs"
# resource-prefix is passed to the module for naming
resource-prefix = local.resource-prefix
default-tags = local.default-tags
}
module "delete-default-vpcs" {
# delete default VPCs in all regions
source = "../../modules/networking/delete-default-vpcs"
}
module "enable-aws-config" {
# enable aws config in all regions and setup aggregation
source = "../../modules/security_identity_compliance/aws_config"
resource-prefix = local.resource-prefix
default-tags = local.default-tags
}
module "enable-guardduty" {
/* enable guardduty
If you are using AWS organisation, GD delegated admin should be configured
on the landing zone security account. This allows centralised management.
See https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_settingup.html
*/
source = "../../modules/security_identity_compliance/guardduty"
default-tags = local.default-tags
}
module "enable-securityhub" {
/* enable security hub
If you are using AWS organisation, SH delegated admin should be configured
on the landing zone security account. This allows centralised management.
See https://docs.aws.amazon.com/securityhub/latest/userguide/designate-orgs-admin-account.html
*/
source = "../../modules/security_identity_compliance/security_hub"
}
module "default-account-settings" {
# other default account settings
# NOTE(review): presumably the S3 public access block and EBS encryption defaults listed in the README — confirm against the module
source = "../../modules/security_identity_compliance/other-default-settings"
}
+13
View File
@@ -0,0 +1,13 @@
# Region comes from the aws-region input variable
provider "aws" {
region = var.aws-region
}
# Pessimistic (~>) pins: only newer patch releases of Terraform 1.2 and AWS provider 3.75 are accepted
terraform {
required_version = "~> 1.2.5"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 3.75.2"
}
}
}
+5
View File
@@ -0,0 +1,5 @@
aws-region = "ap-southeast-1"
customer-name = "ken2026"
environment = "lab"
project = "terraform-dev"
application = "infra"
+19
View File
@@ -0,0 +1,19 @@
# Input variables: declared without type or default, so each one must be supplied (e.g. via terraform.tfvars)
variable "aws-region" {}
variable "customer-name" {}
variable "environment" {}
variable "project" {}
variable "application" {}
locals {
  # Tags passed to the baseline modules
  default-tags = {
    ServiceProvider = "RackspaceTechnology"
    Environment     = var.environment
    Project         = var.project
    Application     = var.application
    TerraformMode   = "managed"
    # NOTE(review): the prefix is a hard-coded workstation path; from any other checkout location the full path.cwd is kept
    TerraformDir = trimprefix(path.cwd, "/my/work/xpk-git/")
    # NOTE(review): timestamp() is re-evaluated on every run, so this tag changes whenever the date does
    BuildDate = formatdate("YYYYMMDD", timestamp())
  }

  # Name prefix: <environment>-<first two characters of the region>-<customer>-<project>.
  # The substr() call must sit inside ${...}; written as plain text it was emitted
  # literally, producing e.g. "lab-substr(ap-southeast-1,0,2)-...".
  resource-prefix = "${var.environment}-${substr(var.aws-region, 0, 2)}-${var.customer-name}-${var.project}"
}
+39
View File
@@ -0,0 +1,39 @@
# Test EC2 instance built from the shared compute/ec2 module, using the AMI found by data.aws_ami.ami-lookup
module "deployer-ec2" {
source = "../../modules/compute/ec2"
# NOTE(review): another caller of this module in the repository passes `additional-tags` (hyphen) — confirm which spelling the module declares
additional_tags = { "Backup" : "None" }
# ami-id = "ami-072e4595d41025d94"
ami-id = data.aws_ami.ami-lookup.id
default-tags = local.default-tags
ebs-encrypted = true
asso-eip = false
instance-name = "rackspace-deployer-ec2-test"
instance-type = "t3.micro"
# The key pair, security group, subnet and instance profile below are hard-coded identifiers
key-name = "whk1-ec2-key-555344966285"
asso-public-ip = false
root-volume-size = 15
security-groups = ["sg-03282995027b7a9fc"]
subnet-id = "subnet-07e4392828a70b1f9"
instance-profile = "TerraformRole"
}
# Newest matching AMI from the owner account below (labelled CIS in the original comment)
data "aws_ami" "ami-lookup" {
  owners      = ["211372476111"] # CIS
  most_recent = true

  filter {
    name   = "name"
    values = ["CIS Amazon Linux 2 Kernel 5.10*"]
  }

  filter {
    name   = "architecture"
    values = ["x86_64"]
  }

  filter {
    name   = "virtualization-type"
    values = ["hvm"]
  }
}
+8
View File
@@ -0,0 +1,8 @@
aws-region = "ap-southeast-1"
customer-name = "bea"
environment = "dev"
project = "iac"
application = "terraform"
# Variable names are case-sensitive: the declarations are `costcenter` and `owner`,
# so the previous `CostCenter` / `Owner` keys never set them.
costcenter = "none"
DynamicAddressGroup = ""
owner = "Rackspace"
+25
View File
@@ -0,0 +1,25 @@
# Input variables: declared without type or default, so each one must be supplied (e.g. via terraform.tfvars)
variable "aws-region" {}
variable "customer-name" {}
variable "environment" {}
variable "project" {}
variable "application" {}
# Tag values used in local.default-tags
variable "owner" {}
variable "costcenter" {}
variable "DynamicAddressGroup" {}
locals {
  # Tags passed to the modules in this configuration
  default-tags = {
    ServiceProvider = "RackspaceTechnology"
    Environment     = var.environment
    Project         = var.project
    Application     = var.application
    TerraformMode   = "managed"
    # NOTE(review): timestamp() is re-evaluated on every run, so this tag changes whenever the date does
    BuildDate           = formatdate("YYYYMMDD", timestamp())
    Owner               = var.owner
    CostCenter          = var.costcenter
    DynamicAddressGroup = var.DynamicAddressGroup
  }

  # Name prefix: <environment>-<first two characters of the region>-<customer>-<project>.
  # The substr() call must sit inside ${...}; written as plain text it was emitted
  # literally, producing e.g. "dev-substr(ap-southeast-1,0,2)-...".
  resource-prefix = "${var.environment}-${substr(var.aws-region, 0, 2)}-${var.customer-name}-${var.project}"
}
+64
View File
@@ -0,0 +1,64 @@
# Post-install steps
## Create lbc service account
kubectl apply -f 1-lbc.yaml
## Install AWS Load Balancer Controller in EKS
helm repo add eks https://aws.github.io/eks-charts
helm repo update
helm install aws-load-balancer-controller eks/aws-load-balancer-controller \
-n kube-system \
--set clusterName=xpk-eks01-sunbird \
--set serviceAccount.create=false \
--set serviceAccount.name=aws-load-balancer-controller-sa
kubectl -n kube-system get deployment aws-load-balancer-controller
kubectl logs -n kube-system deployment/aws-load-balancer-controller -f
## Allow web traffic to nodes
Port 80 needs to be allowed on eks node's SGs. Then ALB can successfully register targets. This is now done in main.tf.
## Testing
ALB correctly sending traffic to nginx pods!
```bash
curl k8s-default-nginxing-a42064aa7e-1786392641.ap-east-1.elb.amazonaws.com
<h1>Web Server nginx-web-f5988bf66-9lghc - Unique ID: </h1><p>Deployed on EKS Wed Feb 11 09:46:41 UTC 2026</p>
curl k8s-default-nginxing-a42064aa7e-1786392641.ap-east-1.elb.amazonaws.com
<h1>Web Server nginx-web-f5988bf66-6ptff - Unique ID: </h1><p>Deployed on EKS Wed Feb 11 09:46:41 UTC 2026</p>
curl k8s-default-nginxing-a42064aa7e-1786392641.ap-east-1.elb.amazonaws.com
<h1>Web Server nginx-web-f5988bf66-tw6rr - Unique ID: </h1><p>Deployed on EKS Wed Feb 11 09:46:45 UTC 2026</p>
```
## Notes on IPv6
EKS could not be deployed on ipv6-only private subnets. It appears AWS requires at least 2 free IPv4 addresses in the subnet.
I tried and the following error was returned.
```
Error: creating EKS Cluster (xpk-eks01-akita): operation error EKS: CreateCluster, https response error StatusCode: 400,
RequestID: b25794cc-3220-4393-a435-c92e2f8aafdd, InvalidParameterException: Atleast one subnet in each AZ should have 2 free IPs.
Invalid AZs: { [ap-east-1c, ap-east-1b] }, provided subnets: { subnet-02aaf75a3e4700f74, subnet-02071b29e2883d5b1 }
```
## Notes on KMS key
I tried using aws-managed key for EKS, but it failed to deploy with an error.
```hcl
encryption_config = {
provider_key_arn = "arn:aws:kms:${data.aws_region.this.id}:${data.aws_caller_identity.current.account_id}:alias/aws/secretsmanager"
resources = ["secrets"]
}
```
```
Error: creating EKS Cluster (xpk-eks01-vervet): operation error EKS: CreateCluster, https response error StatusCode: 400, RequestID:
0b866e07-352a-439c-9196-f7a671bdd0ee, api error InvalidRequestException: User not authorized to perform kms:CreateGrant operation
```
When I used ```create_kms_key = true```, EKS was created successfully. I can see that the EKS cluster role is explicitly allowed
in the key policy.
+146
View File
@@ -0,0 +1,146 @@
# IAM role and instance profile for the EKS bastion, with an inline policy for EKS Describe*/List* calls
module "BastionRole" {
  source = "../../modules/security_identity_compliance/iam-role-v2"

  role-name               = "BastionInstanceProfile"
  description             = "EKS bastion instance profile"
  trusted-entity          = "ec2.amazonaws.com"
  create-instance-profile = true

  policies = {
    EksAdmin = {
      description = "Eks read permissions required for kubectl"
      policy = jsonencode({
        Version = "2012-10-17"
        Statement = [
          {
            Sid      = "EksRead"
            Effect   = "Allow"
            Action   = ["eks:Describe*", "eks:List*"]
            Resource = "*"
          }
        ]
      })
    }
  }
}
# Attach the AWS-managed AmazonSSMManagedInstanceCore policy to the bastion role
resource "aws_iam_role_policy_attachment" "BastionProfilePermissions" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
  role       = module.BastionRole.name
}
# Bastion EC2 instance (t4g.micro, arm64, public subnet) used to administer the
# EKS cluster. User data installs git, kubectl, helm and eksctl, then writes a
# kubeconfig for the cluster created by module.eks.
#
# Changes from the previous revision:
# - kubectl is downloaded from dl.k8s.io (the documented location) instead of
#   the legacy storage.googleapis.com/kubernetes-release bucket.
# - the eksctl tarball is extracted by the same $PLATFORM name it was downloaded as.
# - the cluster name comes from module.eks.cluster_name instead of being rebuilt
#   from var.eks_cluster_name and random_pet, so the two cannot drift apart.
module "eks-bastion" {
  depends_on       = [module.eks] # essential for initializing kubectl in userdata
  source           = "../../modules/compute/ec2"
  additional-tags  = {}
  ami-id           = data.aws_ami.this.id
  asso-eip         = false
  asso-public-ip   = true
  use-ipv6         = true
  data-volumes     = {}
  ebs-encrypted    = true
  instance-name    = "${var.environment}-eks-bastion-${random_pet.pet.id}"
  instance-type    = "t4g.micro"
  key-name         = aws_key_pair.kp.key_name
  kms-key-id       = ""
  root-volume-size = "8"
  # security-groups = [module.bastion-sg.id, module.eks.cluster_primary_security_group_id]
  security-groups  = [module.bastion-sg.id]
  subnet-id        = module.vpc.public_subnets[0]
  instance-profile = module.BastionRole.profile-name[0]
  spot-max-price   = 0.0116 # t4g.micro
  # Shell variables below are written as $NAME (no braces) so Terraform does not
  # treat them as template interpolations.
  user-data = <<EOF
#!/bin/bash
# eks bastion setup
## Install git
dnf -y install git
## Install kubectl
KUBECTL_VERSION=$(curl -fsSL https://dl.k8s.io/release/stable.txt)
curl -fsSLO "https://dl.k8s.io/release/$KUBECTL_VERSION/bin/linux/arm64/kubectl"
chmod +x kubectl
mv kubectl /usr/local/bin/
## Install helm
cd /tmp
wget -O/tmp/helm.tgz https://get.helm.sh/helm-v4.1.1-linux-arm64.tar.gz
tar zxf /tmp/helm.tgz
mv /tmp/linux-arm64/helm /usr/local/bin/helm
chmod +x /usr/local/bin/helm
## Install eksctl
cd /tmp
ARCH=arm64
PLATFORM=$(uname -s)_$ARCH
curl -sLO "https://github.com/eksctl-io/eksctl/releases/latest/download/eksctl_$PLATFORM.tar.gz"
tar zxf "eksctl_$PLATFORM.tar.gz"
mv eksctl /usr/local/bin
chmod +x /usr/local/bin/eksctl
## Create kube config
echo Create kube config...
/usr/bin/aws eks update-kubeconfig --name ${module.eks.cluster_name}
# echo Sleep for 5 minutes and wait for fargate profile to come up
# /usr/bin/sleep 300
#
# ## Grant EKS console access to IAM role: must be executed with cluster creator's identity. cluster role as instance profile won't do it
# echo Patching configmap/aws-auth...
# ROLE=" - rolearn: arn:aws:iam::${data.aws_caller_identity.current.account_id}:role/rackLE\n username: build\n groups:\n - system:masters"
# /usr/local/bin/kubectl --kubeconfig=/root/.kube/config get -n kube-system configmap/aws-auth -o yaml | awk "/mapRoles: \|/{print;print \"$ROLE\";next}1" > /tmp/aws-auth-patch.yml
# /usr/local/bin/kubectl --kubeconfig=/root/.kube/config patch configmap/aws-auth -n kube-system --patch "$(cat /tmp/aws-auth-patch.yml)"
# /usr/local/bin/kubectl --kubeconfig=/root/.kube/config get -n kube-system configmap/aws-auth -o yaml
EOF
}
# Latest Amazon-owned Amazon Linux 2023 (kernel 6.1, arm64, HVM) AMI for the bastion.
data "aws_ami" "this" {
  most_recent = true
  name_regex  = "^al2023-ami-2023.*-kernel-6.1-arm64"
  owners      = ["amazon"]

  # name_regex is applied locally to whatever the API returns; this server-side
  # name filter narrows the result set first so the lookup stays small and fast.
  filter {
    name   = "name"
    values = ["al2023-ami-2023.*-kernel-6.1-arm64"]
  }

  filter {
    name   = "virtualization-type"
    values = ["hvm"]
  }

  filter {
    name   = "architecture"
    values = ["arm64"]
  }
}
# ED25519 key pair generated by Terraform; its public half is registered as aws_key_pair.kp.
# NOTE: the tls provider keeps the generated private key in the Terraform state, so protect the state accordingly.
resource "tls_private_key" "sshkey" {
algorithm = "ED25519"
}
# EC2 key pair for the bastion, built from the public key of tls_private_key.sshkey.
resource "aws_key_pair" "kp" {
key_name = "${var.environment}-eks-bastion-${random_pet.pet.id}-key"
public_key = tls_private_key.sshkey.public_key_openssh
}
# Security group for the EKS bastion.
# Rule strings appear to follow "protocol,from_port,to_port,cidr,description" — confirm against the security_group module.
# NOTE(review): ingress r1 opens SSH (tcp/22) to 0.0.0.0/0; consider restricting the source CIDR outside a lab.
module "bastion-sg" {
source = "../../modules/compute/security_group"
description = "${var.environment}-eks-bastion-${random_pet.pet.id}-sg"
egress = {
r1 = "-1,-1,-1,0.0.0.0/0,Allow egress"
}
ingress = {
r1 = "tcp,22,22,0.0.0.0/0,ssh"
}
name = "eks-bastion-${random_pet.pet.id}-sg"
vpc-id = module.vpc.vpc_id
}
# my security_group module does not support ipv6_cidr_blocks
# so the all-protocol IPv6 egress rule (::/0) is added to the bastion security group as a standalone rule.
resource "aws_security_group_rule" "ipv6_egress" {
security_group_id = module.bastion-sg.id
type = "egress"
from_port = -1
to_port = -1
protocol = "all"
ipv6_cidr_blocks = ["::/0"]
description = "Allow ipv6 egress"
}
@@ -0,0 +1,5 @@
# Service account for the AWS Load Balancer Controller in kube-system.
# The name and namespace match the association in module.aws_lb_controller_pod_identity.
apiVersion: v1
kind: ServiceAccount
metadata:
name: aws-load-balancer-controller-sa
namespace: kube-system
@@ -0,0 +1,58 @@
# nginx Deployment with 10 replicas. An init container writes a per-pod
# index.html into a shared emptyDir volume, which the nginx container mounts read-only.
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-web
spec:
replicas: 10
selector:
matchLabels:
app: nginx-web
template:
metadata:
labels:
app: nginx-web
annotations:
# Require dedicated ENI per pod
# NOTE(review): confirm these vpc.cni.amazonaws.com/* annotation keys are recognised by the installed VPC CNI version.
vpc.cni.amazonaws.com/network-mode: "IPV4"
vpc.cni.amazonaws.com/eniMode: "per-pod" # One ENI per pod
vpc.cni.amazonaws.com/eniPrefixMode: "GLOBAL" # Prefix mode for efficiency
spec:
initContainers:
- name: unique-index
image: busybox:1.35
# $(POD_NAME) is substituted by Kubernetes from the env var declared below;
# $(date) is not a declared variable, so it is left for the shell to execute.
command: ['sh', '-c']
args:
- |
echo "<h1>Web Server $(POD_NAME)</h1><p>Deployed at $(date)</p>" > /usr/share/nginx/html/index.html
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
volumeMounts:
- name: nginx-html
mountPath: /usr/share/nginx/html
containers:
- name: nginx
image: nginx:1.27-alpine
ports:
- containerPort: 80
volumeMounts:
- name: nginx-html
mountPath: /usr/share/nginx/html
readOnly: true
volumes:
# Shared scratch volume: written by the init container, served by nginx.
- name: nginx-html
emptyDir: {}
---
# ClusterIP Service exposing port 80 of the pods labelled app=nginx-web; it is the backend of the Ingress.
apiVersion: v1
kind: Service
metadata:
name: nginx-service
spec:
selector:
app: nginx-web
ports:
- port: 80
targetPort: 80
type: ClusterIP
@@ -0,0 +1,21 @@
# Ingress of class "alb": internet-facing, dual-stack, with IP targets,
# routing every path ("/" prefix) to nginx-service on port 80.
# NOTE(review): the name starts with "aln-", possibly a typo for "alb-"; renaming would recreate the Ingress.
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: aln-ingress-nginx-service
annotations:
alb.ingress.kubernetes.io/scheme: internet-facing
alb.ingress.kubernetes.io/ip-address-type: dualstack
alb.ingress.kubernetes.io/healthcheck-path: /
alb.ingress.kubernetes.io/target-type: ip
spec:
ingressClassName: alb
rules:
- http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: nginx-service
port:
number: 80
+297
View File
@@ -0,0 +1,297 @@
/**
* # eks-ipv6-nginxpod
*
* ## Features
* - Use terraform-aws-eks to deploy eks cluster and a nodegroup using spot instances
* - Use Ipv6 for eks cluster
* - Dependent VPC and roles are created
* - Use pod identity for the EBS CSI driver and the load balancer controller
* - Create a bastion to manage EKS cluster
*
*
*/
# Region the AWS provider is currently configured for.
data "aws_region" "this" {}
# Eks Vpc on IPv6
# One-word random suffix used to make the cluster, bastion, key pair and security group names unique.
resource "random_pet" "pet" {
length = 1
}
# VPC addressing plan. The /16 is cut into four /18 blocks (2 extra prefix bits):
# index 1 (10.18.64.0/18) is reserved for private subnets and index 2 (10.18.128.0/18) for public subnets.
locals {
vpc_cidr = "10.18.0.0/16"
# ensure there is room for future expansion
private_net_start = cidrsubnet(local.vpc_cidr, 2, 1)
public_net_start = cidrsubnet(local.vpc_cidr, 2, 2)
}
# Availability zones currently in the "available" state in the provider's region.
data "aws_availability_zones" "this" {
state = "available"
}
# Pick two of the available AZs at random; the result is passed to module.vpc as its AZ list.
resource "random_shuffle" "Select2Az" {
input = data.aws_availability_zones.this.names
result_count = 2
}
# Dual-stack (IPv4 + IPv6) VPC for the EKS lab, spread over the two AZs chosen by random_shuffle.Select2Az.
# Private subnets: two /22 blocks carved from local.private_net_start (4 extra prefix bits each).
# Public subnets: two /26 blocks carved from local.public_net_start (8 extra prefix bits each).
module "vpc" {
source = "terraform-aws-modules/vpc/aws"
version = "6.6.0"
name = "lab-vpc"
cidr = local.vpc_cidr
azs = random_shuffle.Select2Az.result
enable_ipv6 = true
public_subnet_assign_ipv6_address_on_creation = true
private_subnet_assign_ipv6_address_on_creation = true
# private_subnet_ipv6_native = true # EKS requires free IPv4 addresses. see README
private_subnets = cidrsubnets(local.private_net_start, 4, 4) # EKS requires free IPv4 addresses. see README
public_subnets = cidrsubnets(local.public_net_start, 8, 8) # 2 AZ required by eks lbc
public_subnet_ipv6_prefixes = [0, 1]
private_subnet_ipv6_prefixes = [10, 11]
# Tag public subnets so the load balancer controller can discover them for internet-facing load balancers.
public_subnet_tags = {
"kubernetes.io/role/elb" = 1
}
enable_dns_hostnames = true
enable_dns_support = true
# nat gateway and eigw (vpc module creates the dns64 /64 route to NGW)
enable_nat_gateway = true # AWS public endpoints do not support IPv6
single_nat_gateway = true
create_egress_only_igw = true
# Flow logs and default network ACL management are switched off for this lab.
enable_flow_log = false
create_flow_log_cloudwatch_log_group = false
create_flow_log_cloudwatch_iam_role = false
manage_default_network_acl = false
}
# EKS resources
# IAM role assumed by the EBS CSI driver through EKS Pod Identity.
# The trust policy lets the pods.eks.amazonaws.com service principal assume
# the role and tag the session.
module "CsiPodIdentity" {
  source      = "../../modules/security_identity_compliance/iam-role-v2"
  role-name   = "AmazonEBSCSIDriverRole"
  description = "EKSCSIDriverRole"

  # Native HCL object syntax; jsonencode renders the same JSON document
  # as the quoted-key form.
  trusted-entity = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect    = "Allow"
        Principal = { Service = "pods.eks.amazonaws.com" }
        Action    = ["sts:AssumeRole", "sts:TagSession"]
      }
    ]
  })
}
# 2 policies are required for the ebs csi to work
# One attachment is created per AWS-managed policy ARN in the set below.
resource "aws_iam_role_policy_attachment" "CsiPodIdentity" {
for_each = toset([
"arn:aws:iam::aws:policy/AmazonEC2ReadOnlyAccess",
"arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy"
])
role = module.CsiPodIdentity.name
policy_arn = each.value
}
# MIME multipart user data for the worker launch template. Its single part is a
# NodeConfig document (node.eks.aws/v1alpha1) whose cluster endpoint, CA data,
# service CIDR and name come from module.eks outputs, with maxPods set to 110.
# clusterDNS: replace() swaps the "/<prefix-length>" suffix of the service CIDR
# for "a", so a CIDR ending in "::/108" becomes an address ending in "::a".
# NOTE(review): MIME needs an empty line between a part's headers and its body —
# confirm the blank lines are present in the real file.
locals {
userdata = <<EOT
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="//"
--//
Content-Type: application/node.eks.aws
---
apiVersion: node.eks.aws/v1alpha1
kind: NodeConfig
spec:
cluster:
apiServerEndpoint: ${module.eks.cluster_endpoint}
certificateAuthority: ${module.eks.cluster_certificate_authority_data}
cidr: ${module.eks.cluster_service_cidr}
name: ${module.eks.cluster_name}
kubelet:
config:
maxPods: 110
clusterDNS:
- ${replace(module.eks.cluster_service_cidr, "/\\/.*/", "a")}
--//--
EOT
}
# Launch template for the EKS managed node group workers.
# It exists mainly to raise the IMDS hop limit to 2 and to supply the NodeConfig
# user data. No image_id is set (see the "must not specify" note below).
resource "aws_launch_template" "node_lt" {
  name                   = "eks135-node-template"
  description            = "Launch template for eks 1.35"
  vpc_security_group_ids = [module.eks.node_security_group_id]
  update_default_version = true

  # Critical: Set hop limit to 2 for pod IMDS access, required for aws lbc
  metadata_options {
    http_endpoint               = "enabled"
    http_tokens                 = "required" # IMDSv2 required
    http_put_response_hop_limit = 2          # Allows pods to reach IMDS
    instance_metadata_tags      = "enabled"
  }

  block_device_mappings {
    device_name = "/dev/xvda"
    ebs {
      volume_size = 20
      volume_type = "gp3"
      # Encrypt the node root volume explicitly rather than relying on the
      # account-level default; the bastion in this layer also sets ebs-encrypted.
      encrypted = true
    }
  }

  # must not specify this # image_id = data.aws_ami.eks_worker.id
  user_data = base64encode(local.userdata)

  tag_specifications {
    resource_type = "instance"
    tags = {
      Name = "${module.eks.cluster_name}-worker"
    }
  }

  tag_specifications {
    resource_type = "volume"
    tags = {
      Name = "${module.eks.cluster_name}-worker"
    }
  }
}
# eks optimized ami
# data "aws_ami" "eks_worker" {
# name_regex = "amazon-eks-node-al2023-x86_64-standard-1\\.35.*"
# owners = ["800184023465"]
# most_recent = true
# }
# IPv6 EKS cluster with a private-only API endpoint, a spot managed node group
# built from aws_launch_template.node_lt, and access entries for the admin role
# and the bastion role.
#
# Changes from the previous revision:
# - the module version is pinned; the inputs used here (name, kubernetes_version,
#   addons, endpoint_*_access) are the v21 names, so an unpinned source would
#   break on the next major release.
# - the admin principal comes from var.eks_master_user_arn (declared in
#   variables.tf) instead of a hard-coded account ID and role name.
#   NOTE(review): confirm this layer's tfvars sets it to the same role ARN.
module "eks" {
  source  = "terraform-aws-modules/eks/aws"
  version = "~> 21.0"

  create_iam_role    = true
  name               = "${var.eks_cluster_name}-${random_pet.pet.id}"
  kubernetes_version = "1.35"
  # enabled_log_types = ["api", "audit", "authenticator", "controllerManager", "scheduler"]

  create_security_group = true
  security_group_additional_rules = {
    bastion_access = {
      description              = "Allow access from bastion"
      protocol                 = "tcp"
      from_port                = 443
      to_port                  = 443
      type                     = "ingress"
      source_security_group_id = module.bastion-sg.id
    }
  }

  vpc_id                      = module.vpc.vpc_id
  subnet_ids                  = module.vpc.private_subnets
  ip_family                   = "ipv6"
  create_cni_ipv6_iam_policy  = true
  create_kms_key              = true
  endpoint_private_access     = true
  endpoint_public_access      = false
  enable_irsa                 = false
  create_cloudwatch_log_group = false
  create_node_security_group  = true
  # authentication_mode = "API_AND_CONFIG_MAP" # use access entries and leave this to default

  upgrade_policy = {
    support_type = "STANDARD"
  }

  addons = {
    coredns = {}
    eks-pod-identity-agent = {
      before_compute = true
    }
    kube-proxy = {}
    aws-ebs-csi-driver = {
      pod_identity_association = [{
        role_arn        = module.CsiPodIdentity.role-arn
        service_account = "ebs-csi-controller-sa"
      }]
    }
    vpc-cni = {
      before_compute = true
      configuration_values = jsonencode({
        env = {
          ENABLE_POD_ENI                    = "true",
          POD_SECURITY_GROUP_ENFORCING_MODE = "strict",
          # in prefix mode, ipv6 will have /80 and ipv4 will have /28
          ENABLE_PREFIX_DELEGATION = "true"
        },
        init = {
          env = {
            DISABLE_TCP_EARLY_DEMUX = "true"
          }
        }
      })
    }
  }

  node_iam_role_additional_policies = {
    SsmManaged = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
  }

  eks_managed_node_groups = {
    EksNodeGroup1 = {
      # required for setting hop limit to 2 for pod IMDS access, required for aws lbc
      create_launch_template     = false
      use_custom_launch_template = true
      launch_template_id         = aws_launch_template.node_lt.id
      launch_template_version    = aws_launch_template.node_lt.latest_version
      min_size                   = 2
      max_size                   = 2
      desired_size               = 2
      instance_types             = ["t3.large"]
      capacity_type              = "SPOT"
      subnet_ids                 = module.vpc.private_subnets
    }
  }

  # Cluster-admin access entries, one per IAM principal.
  access_entries = {
    ClusterAdminRole = {
      principal_arn = var.eks_master_user_arn
      policy_associations = {
        ClusterAdminPolicy = {
          policy_arn = "arn:aws:eks::aws:cluster-access-policy/AmazonEKSClusterAdminPolicy"
          access_scope = {
            type = "cluster"
          }
        }
      }
    }
    BastionRole = {
      principal_arn = module.BastionRole.role-arn
      policy_associations = {
        ClusterAdminPolicy = {
          policy_arn = "arn:aws:eks::aws:cluster-access-policy/AmazonEKSClusterAdminPolicy"
          access_scope = {
            type = "cluster"
          }
        }
      }
    }
  }
}
# Allow http traffic from ALB to eks node
# The source is the whole VPC IPv6 CIDR block, not only the load balancer's addresses.
resource "aws_security_group_rule" "eks_node_alb_ingress" {
type = "ingress"
from_port = 80
to_port = 80
protocol = "tcp"
security_group_id = module.eks.node_security_group_id
ipv6_cidr_blocks = [module.vpc.vpc_ipv6_cidr_block]
description = "ALB to nginx pods port 80"
}
+14
View File
@@ -0,0 +1,14 @@
# # https://github.com/terraform-aws-modules/terraform-aws-eks-pod-identity
# Pod identity role for the AWS Load Balancer Controller, associated with the
# kube-system/aws-load-balancer-controller-sa service account of this cluster.
# NOTE(review): no module version is pinned — consider pinning one compatible with the AWS provider in use.
module "aws_lb_controller_pod_identity" {
source = "terraform-aws-modules/eks-pod-identity/aws"
name = "aws-loadbalancer-controller"
attach_aws_lb_controller_policy = true
associations = {
this = {
cluster_name = module.eks.cluster_name
namespace = "kube-system"
service_account = "aws-load-balancer-controller-sa"
}
}
}
+35
View File
@@ -0,0 +1,35 @@
# AWS provider for this layer; default_tags are applied to every taggable resource.
provider "aws" {
region = var.aws-region
default_tags {
tags = {
ServiceProvider = "RackspaceTechnology"
Environment = var.environment
Project = var.project
Application = var.application
# Last two components of the working directory, joined as "<parent>/<dir>".
TerraformDir = join("/", reverse(slice(reverse(split("/", path.cwd)), 0, 2)))
}
}
}
# Terraform 1.13.x with AWS provider 6.x.
# The S3 backend below is commented out, so no remote backend is configured in this block.
# NOTE(review): the commented bucket/key look copied from another project — confirm before enabling.
terraform {
required_version = "~> 1.13.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 6.0"
}
}
/*
backend "s3" {
bucket = "whk1-bea-sys-ss-prd-tfgen2-state1"
key = "terraform_state/LandingZone/master-payer/sso.tfstate"
region = "ap-east-1"
dynamodb_table = "whk1-bea-sys-ss-prd-tfgen2-lock"
encrypt = true
}
*/
}
# Identity (account ID, ARN) of the credentials Terraform is running with.
data "aws_caller_identity" "current" {}
+12
View File
@@ -0,0 +1,12 @@
# Input variables for the eks-ipv6 layer. Types and descriptions are declared
# explicitly so bad input fails at plan time and terraform-docs output is useful.
variable "aws-region" {
  description = "AWS region the provider deploys into."
  type        = string
}

variable "aws-region-short" {
  description = "Short code for the AWS region, used in resource naming."
  type        = string
}

variable "customer-name" {
  description = "Customer name, used in resource naming."
  type        = string
}

variable "environment" {
  description = "Environment name; used in the Environment default tag and in resource names."
  type        = string
}

variable "project" {
  description = "Project name; used in the Project default tag."
  type        = string
}

variable "application" {
  description = "Application name; used in the Application default tag."
  type        = string
}

variable "eks_master_user_arn" {
  description = "ARN of the IAM principal that administers the EKS cluster."
  type        = string
}

variable "eks_cluster_name" {
  description = "Base name of the EKS cluster; a random suffix is appended."
  type        = string
  default     = "xpk-eks01"
}
+15
View File
@@ -0,0 +1,15 @@
# eks-managed-nodegroup
Creates an EKS cluster with a managed node group, then performs EKS control plane upgrades.
## Versions and upgrade notes
Based on 1-4 t3.medium worker nodes with no application pods.
| eks-ver | coredns | kube-proxy | vpc-cni | AMI-version | upgrade notes |
|---------|--------------------|---------------------|--------------------|------------------|---------------------------------------------------------------------|
| 1.25 | v1.9.3-eksbuild.10 | v1.25.16-eksbuild.1 | v1.15.4-eksbuild.1 | 1.25.15-20231201 | N/A |
| 1.26 | v1.9.3-eksbuild.10 | v1.26.11-eksbuild.1 | v1.15.4-eksbuild.1 | 1.26.10-20231201 | from 1.25, set cluster_version = "1.26". nodes are recreated. 23min |
| 1.27 | v1.10.1-eksbuild.6 | v1.27.6-eksbuild.2 | v1.15.4-eksbuild.1 | 1.27.7-20231201 | from 1.26, set cluster_version = "1.27". nodes are recreated. 16min |
| 1.28 | v1.10.1-eksbuild.6 | v1.28.4-eksbuild.1 | v1.15.4-eksbuild.1 | 1.28.3-20231201 | from 1.27, set cluster_version = "1.28". nodes are recreated. 26min |
## References
https://repost.aws/knowledge-center/eks-plan-upgrade-cluster
+78
View File
@@ -0,0 +1,78 @@
# Bastion EC2 instance for the managed-nodegroup EKS lab. It joins the cluster's
# primary security group and installs kubectl through user data.
#
# Changes from the previous revision:
# - kubectl is downloaded from dl.k8s.io (the documented location) instead of
#   the legacy storage.googleapis.com/kubernetes-release bucket.
# - the kubectl version follows the cluster's minor version (stable-<minor>.txt)
#   rather than the newest stable release, so it stays within the supported
#   client/server version skew while the cluster is upgraded step by step.
# NOTE(review): the instance role carries AdministratorAccess — acceptable only for a lab.
module "bastion" {
  source  = "terraform-aws-modules/ec2-instance/aws"
  version = "5.5.0"

  name                        = "lab-ken2026-eks-bastion"
  instance_type               = "t3.micro"
  ami                         = data.aws_ami.this.id
  ignore_ami_changes          = true
  subnet_id                   = var.subnet_ids[0]
  vpc_security_group_ids      = [module.sg.id, module.eks.cluster_primary_security_group_id]
  create_iam_instance_profile = true
  iam_role_description        = "IAM role for EC2 instance"
  iam_role_policies = {
    SSMManagedInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
    CloudwatchAgent        = "arn:aws:iam::aws:policy/CloudWatchAgentServerPolicy"
    Admin                  = "arn:aws:iam::aws:policy/AdministratorAccess"
  }
  key_name      = "kf-key"
  ebs_optimized = true

  root_block_device = [
    {
      encrypted   = true
      volume_type = "gp3"
      volume_size = 10
    },
  ]
  volume_tags = data.aws_default_tags.this.tags

  # IMDSv2 requirement
  metadata_options = {
    http_endpoint               = "enabled"
    http_tokens                 = "required"
    http_put_response_hop_limit = 2
  }

  # $KUBECTL_VERSION is a shell variable (no braces, so Terraform leaves it
  # alone); module.eks.cluster_version is interpolated by Terraform.
  user_data = <<EOF
#!/bin/bash
KUBECTL_VERSION=$(curl -fsSL https://dl.k8s.io/release/stable-${module.eks.cluster_version}.txt)
curl -fsSLO "https://dl.k8s.io/release/$KUBECTL_VERSION/bin/linux/amd64/kubectl"
chmod 755 kubectl
mv kubectl /usr/local/bin/
EOF
}
# Security group attached to the bastion (module.bastion).
# Rules: outbound TCP and UDP on all ports, outbound ICMP type 0, inbound ICMP type 8.
# NOTE(review): the description says "web server" although this group is used by the
# bastion; changing a security group description forces replacement, so it is left as is.
module "sg" {
source = "../../modules/compute/security_group"
description = "Security group for web server"
egress = {
r1 = "tcp,0,65535,0.0.0.0/0,Allow outbound tcp traffic"
r2 = "udp,0,65535,0.0.0.0/0,Allow outbound udp traffic"
r3 = "icmp,0,-1,0.0.0.0/0,Allow icmp echo reply"
}
ingress = {
r1 = "icmp,8,-1,0.0.0.0/0,Allow ICMP traffic"
}
name = "lab-ken2026-eks-bastion-sg"
vpc-id = var.vpc_id
}
# Provider default tags, reused as volume_tags on the bastion.
data "aws_default_tags" "this" {}
# Most recent Amazon Linux 2023 AMI (x86_64, HVM, EBS-backed) from the owner account below.
# NOTE(review): the name_regex is unanchored and does not fix the kernel flavour,
# so any "al2023-ami-202*" image of this architecture may be selected.
data "aws_ami" "this" {
most_recent = true
name_regex = "al2023-ami-202.*"
filter {
name = "virtualization-type"
values = ["hvm"]
}
filter {
name = "root-device-type"
values = ["ebs"]
}
filter {
name = "architecture"
values = ["x86_64"]
}
owners = ["910595266909"] # AWS
}
+3
View File
@@ -0,0 +1,3 @@
# Common name prefix: <environment>-<region short code>-<customer>-<project>.
locals {
resource-prefix = "${var.environment}-${var.aws-region-short}-${var.customer-name}-${var.project}"
}
+189
View File
@@ -0,0 +1,189 @@
# Kubernetes provider pointed at the cluster created by module.eks.
# Authentication uses an exec plugin that shells out to `aws eks get-token`.
provider "kubernetes" {
host = module.eks.cluster_endpoint
cluster_ca_certificate = base64decode(module.eks.cluster_certificate_authority_data)
exec {
api_version = "client.authentication.k8s.io/v1beta1"
command = "aws"
# This requires the awscli to be installed locally where Terraform is executed
args = ["eks", "get-token", "--cluster-name", module.eks.cluster_name]
}
}
# EKS control plane (terraform-aws-eks 19.21.0) with a public API endpoint.
# Secrets are encrypted with the key from module.kms, so the module's own KMS key is disabled.
# Worker nodes are created separately by module.eks_managed_node_group.
module "eks" {
source = "terraform-aws-modules/eks/aws"
version = "19.21.0"
cluster_name = "lab-ken2026-eks01"
cluster_endpoint_public_access = true
# Bump this value one minor version at a time to exercise control plane upgrades (see README).
cluster_version = "1.27"
cluster_addons = {
coredns = {
preserve = true
most_recent = true
timeouts = {
create = "25m"
delete = "10m"
}
}
kube-proxy = {
most_recent = true
}
vpc-cni = {
most_recent = true
}
}
create_kms_key = false
cluster_encryption_config = {
resources = ["secrets"]
provider_key_arn = module.kms.key_arn
}
# Extra policy (ec2:Describe*) attached to the cluster IAM role.
iam_role_additional_policies = {
additional = aws_iam_policy.additional.arn
}
vpc_id = var.vpc_id
subnet_ids = var.subnet_ids
control_plane_subnet_ids = var.control_plane_subnet_ids
# Extend cluster security group rules
cluster_security_group_additional_rules = {
ingress_nodes_ephemeral_ports_tcp = {
description = "Nodes on ephemeral ports"
protocol = "tcp"
from_port = 1025
to_port = 65535
type = "ingress"
source_node_security_group = true
}
# Test: https://github.com/terraform-aws-modules/terraform-aws-eks/pull/2319
ingress_source_security_group_id = {
description = "Ingress from another computed security group"
protocol = "tcp"
from_port = 22
to_port = 22
type = "ingress"
source_security_group_id = aws_security_group.additional.id
}
}
# Managing the aws-auth configmap is disabled: it requires Terraform to run inside the VPC.
# requires terraform be ran inside VPC
# manage_aws_auth_configmap = true
#
# aws_auth_roles = [
# {
# rolearn = module.eks_managed_node_group.iam_role_arn
# username = "system:node:{{EC2PrivateDNSName}}"
# groups = [
# "system:bootstrappers",
# "system:nodes",
# ]
# },
# {
# rolearn = "arn:aws:iam::040216112220:role/rackLE"
# username = "rackLE"
# groups = ["system:masters"]
# }
# ]
#
# aws_auth_users = [
# {
# userarn = var.eks_master_user_arn
# username = "eksmaster"
# groups = ["system:masters"]
# }
# ]
#
# aws_auth_accounts = [
# data.aws_caller_identity.current.account_id
# ]
}
# Standalone EKS managed node group (AL2 x86_64, t3.medium, 0-2 nodes, desired 1)
# attached to the cluster from module.eks.
module "eks_managed_node_group" {
source = "terraform-aws-modules/eks/aws//modules/eks-managed-node-group"
version = "19.21.0"
name = "eks-mng"
cluster_name = module.eks.cluster_name
cluster_version = module.eks.cluster_version
subnet_ids = var.subnet_ids
cluster_primary_security_group_id = module.eks.cluster_primary_security_group_id
vpc_security_group_ids = [
module.eks.cluster_security_group_id,
aws_security_group.additional.id
]
ami_type = "AL2_x86_64"
instance_types = ["t3.medium"]
iam_role_additional_policies = {
SsmInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
}
# NOTE(review): the TOML below ([settings.kernel], [settings.kubernetes.node-labels]) is
# Bottlerocket-style configuration, but ami_type is AL2_x86_64 — confirm whether it has
# any effect here or is a leftover from a Bottlerocket example.
# this will get added to what AWS provides
bootstrap_extra_args = <<-EOT
# extra args added
[settings.kernel]
lockdown = "integrity"
[settings.kubernetes.node-labels]
"label1" = "foo"
"label2" = "bar"
EOT
min_size = 0
desired_size = 1
max_size = 2
}
# Customer-managed KMS key used by module.eks for secrets encryption.
# The identity running Terraform is set as key owner.
module "kms" {
source = "terraform-aws-modules/kms/aws"
version = "~> 1.5"
aliases = ["eks/${local.resource-prefix}"]
description = "${local.resource-prefix} cluster encryption key"
enable_default_policy = true
key_owners = [data.aws_caller_identity.current.arn]
}
# Additional security group allowing SSH (tcp/22) from the RFC 1918 private ranges.
# It is attached to the node group and referenced by a cluster security group rule in module.eks.
resource "aws_security_group" "additional" {
name_prefix = "${local.resource-prefix}-sg"
vpc_id = var.vpc_id
ingress {
from_port = 22
to_port = 22
protocol = "tcp"
cidr_blocks = [
"10.0.0.0/8",
"172.16.0.0/12",
"192.168.0.0/16",
]
}
}
# Extra IAM policy (ec2:Describe* on all resources) attached to the cluster role
# through iam_role_additional_policies in module.eks.
resource "aws_iam_policy" "additional" {
name = "${local.resource-prefix}-policy"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = [
"ec2:Describe*",
]
Effect = "Allow"
Resource = "*"
},
]
})
}
# Identity of the credentials running Terraform; its ARN is used as the KMS key owner.
data "aws_caller_identity" "current" {}
+30
View File
@@ -0,0 +1,30 @@
# AWS provider for the eks-managed-nodegroup layer; default_tags are applied to
# every taggable resource.
provider "aws" {
  region = var.aws-region

  default_tags {
    tags = {
      ServiceProvider = "RackspaceTechnology"
      Environment     = var.environment
      Project         = var.project
      Application     = var.application
      TerraformMode   = "managed"
      # "<parent>/<dir>" of the working directory: take the last two path
      # components and join them back in their original order.
      TerraformDir = join("/", reverse(slice(reverse(split("/", path.cwd)), 0, 2)))
    }
  }
}
# Terraform and provider constraints; state is stored encrypted in an S3 bucket in ap-east-1.
terraform {
required_version = ">= 1.3.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = ">= 5.0"
}
}
backend "s3" {
bucket = "lab-ken2026-tf-state"
key = "experimental/eks-upgrade-test.tfstate"
region = "ap-east-1"
encrypt = true
}
}
+11
View File
@@ -0,0 +1,11 @@
aws-region = "ap-east-1"
aws-region-short = "ape1"
customer-name = "ken2026"
environment = "lab"
project = "eks-pub-module-test"
application = "terraform"
vpc_id = "vpc-01a10b033169f89a8"
subnet_ids = ["subnet-0927ba1b06ccfe6c5", "subnet-08dec6787782ee087"]
control_plane_subnet_ids = ["subnet-0927ba1b06ccfe6c5", "subnet-08dec6787782ee087"]
eks_master_user_arn = "arn:aws:iam::040216112220:role/rackLE"
+11
View File
@@ -0,0 +1,11 @@
# Input variables for the eks-managed-nodegroup layer. Types and descriptions are
# declared explicitly so bad input fails at plan time; labels are quoted consistently.
variable "aws-region" {
  description = "AWS region the provider deploys into."
  type        = string
}

variable "aws-region-short" {
  description = "Short code for the AWS region, used in the resource prefix."
  type        = string
}

variable "customer-name" {
  description = "Customer name, used in the resource prefix."
  type        = string
}

variable "environment" {
  description = "Environment name; used in default tags and the resource prefix."
  type        = string
}

variable "project" {
  description = "Project name; used in default tags and the resource prefix."
  type        = string
}

variable "application" {
  description = "Application name; used in the Application default tag."
  type        = string
}

variable "vpc_id" {
  description = "ID of the existing VPC that hosts the cluster and the bastion."
  type        = string
}

variable "subnet_ids" {
  description = "Subnets for the worker nodes; the first one also hosts the bastion."
  type        = list(string)
}

variable "control_plane_subnet_ids" {
  description = "Subnets for the EKS control plane network interfaces."
  type        = list(string)
}

variable "eks_master_user_arn" {
  description = "ARN of the IAM principal that administers the EKS cluster."
  type        = string
}
+227
View File
@@ -0,0 +1,227 @@
# Name prefix shared by the EMR resources: <environment>-<customer>.
locals {
name = "${var.environment}-${var.customer-name}"
}
# EMR 7.0.0 cluster running HBase and Phoenix on instance fleets:
# one on-demand master and a spot core fleet.
# NOTE(review): both the service role and the instance profile carry PowerUserAccess — lab use only.
module "emr" {
source = "terraform-aws-modules/emr/aws"
version = "1.2.0"
name = "${local.name}-emr"
release_label = "emr-7.0.0"
security_configuration_name = aws_emr_security_configuration.security_config.name
applications = ["hbase", "phoenix"]
# idle_timeout is 3600 (presumably seconds, i.e. one hour — confirm against the module docs).
auto_termination_policy = {
idle_timeout = 3600
}
bootstrap_action = {
}
# HBase tuning: JVM heap sizes for master/regionserver and the regionserver handler count.
configurations_json = jsonencode([
{
Classification : "hbase-env",
Configurations : [
{
"Classification" : "export",
"Properties" : {
"HBASE_MASTER_OPTS" : "-Xmx4g",
"HBASE_REGIONSERVER_OPTS" : "-Xmx8g"
}
}
],
Properties : {}
},
{
Classification : "hbase-site",
Properties : {
"hbase.regionserver.handler.count" : "300"
}
}
])
master_instance_fleet = {
name = "master-fleet"
target_on_demand_capacity = 1
instance_type_configs = [
{
instance_type = "c6g.xlarge"
ebs_config = {
size = 20
type = "gp3"
volumes_per_instance = 1
}
}
]
}
# Core fleet runs on spot only (target_on_demand_capacity = 0) with two candidate instance types.
core_instance_fleet = {
name = "core-fleet"
target_on_demand_capacity = 0
target_spot_capacity = 1
instance_type_configs = [
{
bid_price_as_percentage_of_on_demand_price = 70
instance_type = "c6g.xlarge"
weighted_capacity = 1
ebs_config = {
size = 20
type = "gp3"
volumes_per_instance = 1
}
},
{
bid_price_as_percentage_of_on_demand_price = 70
instance_type = "m6g.xlarge"
weighted_capacity = 1
ebs_config = {
size = 20
type = "gp3"
volumes_per_instance = 1
}
}
]
# If spot capacity is not obtained within 5 minutes, fall back to on-demand.
launch_specifications = {
spot_specification = {
allocation_strategy = "capacity-optimized"
block_duration_minutes = 0
timeout_action = "SWITCH_TO_ON_DEMAND"
timeout_duration_minutes = 5
}
}
}
ebs_root_volume_size = 20
# Subnets should be tagged with
# { "for-use-with-amazon-emr-managed-policies" = true }
ec2_attributes = {
subnet_ids = ["subnet-08dec6787782ee087", "subnet-0551e96ffd016192a"]
key_name = "kf-key"
}
vpc_id = "vpc-01a10b033169f89a8"
# Required for creating public cluster
is_private_cluster = false
keep_job_flow_alive_when_no_steps = true
list_steps_states = ["PENDING", "RUNNING", "CANCEL_PENDING", "CANCELLED", "FAILED", "INTERRUPTED", "COMPLETED"]
# Cluster logs go to the bucket created by module.s3_bucket.
log_uri = "s3n://${module.s3_bucket.s3_bucket_id}/"
scale_down_behavior = "TERMINATE_AT_TASK_COMPLETION"
step_concurrency_level = 3
termination_protection = false
visible_to_all_users = true
service_iam_role_policies = {
AmazonEMRServicePolicy_v2 = "arn:aws:iam::aws:policy/service-role/AmazonEMRServicePolicy_v2"
PowerUser = "arn:aws:iam::aws:policy/PowerUserAccess"
}
iam_instance_profile_policies = {
AmazonElasticMapReduceforEC2Role = "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role"
PowerUser = "arn:aws:iam::aws:policy/PowerUserAccess"
}
# Use managed scaling policy to refill spot instances
managed_scaling_policy = {
unit_type = "InstanceFleetUnits"
minimum_capacity_units = 1
maximum_capacity_units = 4
maximum_ondemand_capacity_units = 0
maximum_core_capacity_units = 4
}
}
# Two random bytes; the decimal form is appended to the EMR log bucket name to make it unique.
resource "random_id" "this" {
byte_length = 2
}
# Private S3 bucket for EMR logs: public access fully blocked, TLS-only bucket policies,
# and SSE-S3 (AES256) default encryption.
module "s3_bucket" {
source = "terraform-aws-modules/s3-bucket/aws"
version = "~> 3.0"
bucket = "${local.name}-emrlogs-${random_id.this.dec}"
# Allow deletion of non-empty bucket
# Example usage only - not recommended for production
force_destroy = true
attach_deny_insecure_transport_policy = true
attach_require_latest_tls_policy = true
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
server_side_encryption_configuration = {
rule = {
apply_server_side_encryption_by_default = {
sse_algorithm = "AES256"
}
}
}
}
# KMS key used by the EMR security configuration for local disk / EBS encryption.
resource "aws_kms_key" "ebs" {
  description             = "KMS key for EBS volumes"
  deletion_window_in_days = 7
  # Rotate the key material automatically; the key ID and ARN stay the same,
  # so nothing that references the key has to change.
  enable_key_rotation = true
}
# EMR security configuration: at-rest encryption on (local disks/EBS with the KMS key,
# S3 with SSE-S3), in-transit encryption off, and IMDSv2 required with a hop limit of 1.
resource "aws_emr_security_configuration" "security_config" {
name = "${local.name}-emr-security-config"
configuration = jsonencode(
{
EncryptionConfiguration = {
AtRestEncryptionConfiguration = {
LocalDiskEncryptionConfiguration = {
AwsKmsKey = aws_kms_key.ebs.arn
EnableEbsEncryption = true
EncryptionKeyProviderType = "AwsKms"
}
S3EncryptionConfiguration = {
EncryptionMode = "SSE-S3"
}
}
EnableAtRestEncryption = true
EnableInTransitEncryption = false
}
InstanceMetadataServiceConfiguration = {
HttpPutResponseHopLimit = 1
MinimumInstanceMetadataServiceVersion = 2
}
}
)
}
# Tag EMR master and core instances
# This layer must be applied twice for the instance tags to be set.
# Adding depends_on results in a dependency loop.
# Running instances tagged as EMR MASTER nodes.
# NOTE(review): the filter is not scoped to this cluster, so master nodes of any
# other EMR cluster in the same account and region would match as well.
data "aws_instances" "master_instances" {
# depends_on = [module.emr]
instance_tags = {
"aws:elasticmapreduce:instance-group-role" = "MASTER"
}
instance_state_names = ["running"]
}
# Running instances tagged as EMR CORE nodes.
# NOTE(review): the filter is not scoped to this cluster, so core nodes of any
# other EMR cluster in the same account and region would match as well.
data "aws_instances" "core_instances" {
# depends_on = [module.emr]
instance_tags = {
"aws:elasticmapreduce:instance-group-role" = "CORE"
}
instance_state_names = ["running"]
}
# Name tag for each core instance: "<name>-emr-core-<n>", numbered from 1.
# IDs are sorted so the index-to-instance mapping is stable between runs.
resource "aws_ec2_tag" "tag-emr-core-instances" {
# depends_on = [data.aws_instances.core_instances]
count = length(data.aws_instances.core_instances.ids)
resource_id = sort(data.aws_instances.core_instances.ids)[count.index]
key = "Name"
value = "${local.name}-emr-core-${count.index + 1}"
}
# Name tag for each master instance: "<name>-emr-master-<n>", numbered from 1.
# IDs are sorted so the index-to-instance mapping is stable between runs.
resource "aws_ec2_tag" "tag-emr-master-instances" {
# depends_on = [data.aws_instances.master_instances]
count = length(data.aws_instances.master_instances.ids)
resource_id = sort(data.aws_instances.master_instances.ids)[count.index]
key = "Name"
value = "${local.name}-emr-master-${count.index + 1}"
}
+7
View File
@@ -0,0 +1,7 @@
# IDs of the running instances found by the CORE and MASTER role-tag lookups.
output "core_instance_ids" {
value = data.aws_instances.core_instances.ids
}
output "master_instance_ids" {
value = data.aws_instances.master_instances.ids
}
+22
View File
@@ -0,0 +1,22 @@
# AWS provider for the EMR layer; default_tags are applied to every taggable resource.
provider "aws" {
  region = var.aws-region

  default_tags {
    tags = {
      Environment   = var.environment
      Project       = var.project
      Application   = var.application
      TerraformMode = "managed"
      # "<parent>/<dir>" of the working directory: take the last two path
      # components and join them back in their original order.
      TerraformDir = join("/", reverse(slice(reverse(split("/", path.cwd)), 0, 2)))
    }
  }
}
# Terraform and provider constraints for the EMR layer.
# NOTE(review): "~> 5.0.0" only allows 5.0.x patch releases — confirm this is
# intended rather than "~> 5.0", and that it satisfies the EMR module's own constraint.
terraform {
required_version = ">= 1.3.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0.0"
}
}
}
+6
View File
@@ -0,0 +1,6 @@
aws-region = "ap-east-1"
# aws-region-short = "ape1"
customer-name = "ken2026"
environment = "lab"
project = "iac"
application = "emr"
+9
View File
@@ -0,0 +1,9 @@
# Input variables for the EMR layer (all untyped, no defaults).
variable "aws-region" {}
# variable "aws-region-short" {}
variable "customer-name" {}
variable "environment" {}
variable "project" {}
variable "application" {}
# Name prefix: <environment>-<first two characters of the region>-<customer>-<project>.
# With aws-region = "ap-east-1" the region part is "ap".
locals {
resource-prefix = "${var.environment}-${substr(var.aws-region, 0, 2)}-${var.customer-name}-${var.project}"
}
+24
View File
@@ -0,0 +1,24 @@
/*
Note that attributes of ephemeral resources can only be used in ephemeral contexts,
for example write-only arguments such as secret_string_wo.
*/
# Ephemeral 16-character password with special characters; being ephemeral, its value is not persisted in state.
ephemeral "random_password" "example" {
length = 16
special = true
}
# Secrets Manager secret that receives the ephemeral password as its value.
resource "aws_secretsmanager_secret" "example" {
name = "example-secret"
description = "example secret created from ephemeral resource"
}
# Secret value written through the write-only argument secret_string_wo, so it is not stored in state.
# Bump secret_string_wo_version to have Terraform send a new value.
resource "aws_secretsmanager_secret_version" "example" {
secret_id = aws_secretsmanager_secret.example.id
secret_string_wo = ephemeral.random_password.example.result
secret_string_wo_version = 1
}
# Ephemeral read-back of the secret version created above.
ephemeral "aws_secretsmanager_secret_version" "example" {
secret_id = aws_secretsmanager_secret_version.example.secret_id
}
+13
View File
@@ -0,0 +1,13 @@
# Version constraints for the ephemeral-resource example.
terraform {
  # Ephemeral resources need Terraform 1.10 and write-only arguments
  # (secret_string_wo) need Terraform 1.11, so ">= 1.3.0" was too permissive.
  required_version = ">= 1.11.0"

  required_providers {
    aws = {
      source = "hashicorp/aws"
      # NOTE(review): secret_string_wo only exists in newer 5.x provider
      # releases — consider raising this lower bound to the release that added it.
      version = ">= 5.0.0"
    }
    random = {
      source  = "hashicorp/random"
      version = ">= 3.7.1"
    }
  }
}
+3
View File
@@ -0,0 +1,3 @@
#!/bin/bash
# Helper for Terraform's `external` data source: prints a JSON object
# {"result": "<space-separated RDS instance identifiers>"} on stdout.
#
# Fail fast: without this, an AWS CLI error (bad credentials, no region, ...)
# would be swallowed and reported to Terraform as an empty instance list.
set -euo pipefail

# xargs collapses the tab/newline separated CLI output into one space-separated line.
RESULTS=$(aws rds describe-db-instances --query 'DBInstances[*].DBInstanceIdentifier' --output text | xargs)
jq -n --arg result "$RESULTS" '{"result":$result}'
+7
View File
@@ -0,0 +1,7 @@
# Runs list-rds-instances.sh and exposes its JSON output as data.external.rds-instances.result.
data "external" "rds-instances" {
  # Resolve the script relative to this module rather than the working
  # directory, so the lookup also works when Terraform is run from elsewhere.
  program = ["bash", "${path.module}/list-rds-instances.sh"]
}
# List of RDS instance identifiers returned by the helper script.
output "rds-instances" {
  description = "Identifiers of the RDS instances in the current account and region."
  # split(" ", "") returns [""]; compact() drops that empty element so an
  # account with no instances yields an empty list.
  value = compact(split(" ", data.external.rds-instances.result.result))
}
+67
View File
@@ -0,0 +1,67 @@
# IAM group with only the AWS-managed ViewOnlyAccess policy; the inline policy arguments are passed as empty strings.
module "iam-group" {
source = "../../modules/security_identity_compliance/iam-group"
iam-group-name = "ViewOnlyUsers001"
iam-group-policy = ""
iam-group-policy-name = ""
managed-policy-arns = ["arn:aws:iam::aws:policy/job-function/ViewOnlyAccess"]
}
# IAM group with ViewOnlyAccess plus an inline policy generated from data.aws_iam_policy_document.user-policy.
module "iam-group2" {
source = "../../modules/security_identity_compliance/iam-group"
iam-group-name = "ViewOnlyAndS3Admin001"
iam-group-policy = data.aws_iam_policy_document.user-policy.json
iam-group-policy-name = "S3AdminPermissions"
managed-policy-arns = ["arn:aws:iam::aws:policy/job-function/ViewOnlyAccess"]
}
# IAM user outside any group, with an access key and a password requested and ViewOnlyAccess attached directly.
module "iam-user1" {
source = "../../modules/security_identity_compliance/iam-user"
iam-user-name = "JohnNotInGroup"
create-access-key = true
create-password = true
managed-policy-arns = ["arn:aws:iam::aws:policy/job-function/ViewOnlyAccess"]
}
# IAM user without access key or password, carrying an inline S3 policy and
# ViewOnlyAccess, and added to the group created by module.iam-group.
module "iam-user2" {
source = "../../modules/security_identity_compliance/iam-user"
iam-user-name = "PeterInGroup"
iam-user-policy = data.aws_iam_policy_document.user-policy.json
iam-user-policy-name = "S3AdminPermissions"
create-access-key = false
create-password = false
managed-policy-arns = ["arn:aws:iam::aws:policy/job-function/ViewOnlyAccess"]
add-to-groups = [module.iam-group.iam-group-name]
}
# Policy document allowing every S3 action (s3:*) on all resources; used as the inline
# policy of module.iam-group2 and module.iam-user2.
data "aws_iam_policy_document" "user-policy" {
statement {
sid = "s3admin"
actions = [
"s3:*"
]
effect = "Allow"
resources = ["*"]
}
}
# Outputs exposing the ARNs of both users plus the access key and secret
# location of iam-user1. All labels are quoted for consistency.
output "iam-user1-arn" {
  value = module.iam-user1.iam-user-arn
}

output "iam-user2-arn" {
  value = module.iam-user2.iam-user-arn
}

output "iam-user1-access-key" {
  value = module.iam-user1.iam-user-access-key
}

output "iam-user1-secret-location" {
  value = module.iam-user1.iam-user-secret-arn
}
+8
View File
@@ -0,0 +1,8 @@
aws-region = "ap-southeast-1"
customer-name = "ken2026"
environment = "dev"
project = "iac"
application = "terraform"
costcenter = "none"
DynamicAddressGroup = ""
owner = "Rackspace"
+21
View File
@@ -0,0 +1,21 @@
# Input variables for this layer (all untyped, no defaults); values are supplied via tfvars.
variable "aws-region" {}
variable "customer-name" {}
variable "environment" {}
variable "project" {}
variable "application" {}
variable "owner" {}
variable "costcenter" {}
variable "DynamicAddressGroup" {}
# Shared tag map and resource name prefix for this layer.
locals {
  default-tags = {
    ServiceProvider = "RackspaceTechnology"
    Environment     = var.environment
    Project         = var.project
    Application     = var.application
    TerraformMode   = "managed"
    Owner           = var.owner
    # Last two components of the working directory, joined as "<parent>/<dir>".
    TerraformDir = join("/", reverse(slice(reverse(split("/", path.cwd)), 0, 2)))
  }

  # Fix: substr() was written as literal text inside the string, which produced
  # a prefix like "dev-substr(ap-southeast-1,0,2)-...". It is now evaluated
  # inside the interpolation, yielding the first two characters of the region.
  resource-prefix = "${var.environment}-${substr(var.aws-region, 0, 2)}-${var.customer-name}-${var.project}"
}
+47
View File
@@ -0,0 +1,47 @@
<!-- This readme file is generated with terraform-docs -->
## Prepare lambda-layer1 with the following command.
The target path is not arbitrary: AWS Lambda only adds specific layer directories (such as this one) to the Python search path. See https://docs.aws.amazon.com/lambda/latest/dg/packaging-layers.html
```bash
pip install requests -t python/lib/python3.12/site-packages/
```
## Requirements
| Name | Version |
|------|---------|
| terraform | >= 1.3.0 |
| aws | >= 4.40 |
## Providers
| Name | Version |
|------|---------|
| archive | 2.5.0 |
| aws | 5.64.0 |
## Modules
No modules.
## Resources
| Name | Type |
|------|------|
| [aws_iam_role.lambda-role1](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role) | resource |
| [aws_lambda_function.myFunction](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_function) | resource |
| [aws_lambda_layer_version.libraries](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_layer_version) | resource |
| [archive_file.function1](https://registry.terraform.io/providers/hashicorp/archive/latest/docs/data-sources/file) | data source |
| [archive_file.layer1](https://registry.terraform.io/providers/hashicorp/archive/latest/docs/data-sources/file) | data source |
## Inputs
No inputs.
## Outputs
No outputs.
---
## Authorship
This module was developed by xpk.
+10
View File
@@ -0,0 +1,10 @@
# reference: https://aws.amazon.com/premiumsupport/knowledge-center/start-stop-lambda-eventbridge/
import requests
def lambda_handler(event, context):
    """Fetch https://ipinfo.io/ and report the HTTP status code.

    Args:
        event: Lambda invocation payload (not used).
        context: Lambda runtime context (not used).

    Returns:
        dict: ``{"HttpResponseCode": <status code of the response>}``.

    Raises:
        requests.exceptions.RequestException: if the request fails or does
            not complete within the timeout.
    """
    # requests applies no timeout by default; without one a stalled connection
    # would block until the function's own execution limit is reached.
    r = requests.get('https://ipinfo.io/', timeout=10)
    return {
        "HttpResponseCode": r.status_code
    }
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,8 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import re
import sys
from charset_normalizer.cli import cli_detect
if __name__ == '__main__':
    # Drop a trailing "-script.pyw" or ".exe" from the program name before
    # handing control to the charset_normalizer CLI.
    program_name = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.argv[0] = program_name

    # The value returned by cli_detect() becomes the process exit status.
    exit_status = cli_detect()
    sys.exit(exit_status)
@@ -0,0 +1,20 @@
This package contains a modified version of ca-bundle.crt:
ca-bundle.crt -- Bundle of CA Root Certificates
This is a bundle of X.509 certificates of public Certificate Authorities
(CA). These were automatically extracted from Mozilla's root certificates
file (certdata.txt). This file can be found in the mozilla source tree:
https://hg.mozilla.org/mozilla-central/file/tip/security/nss/lib/ckfw/builtins/certdata.txt
It contains the certificates in PEM format and therefore
can be directly used with curl / libcurl / php_curl, or with
an Apache+mod_ssl webserver for SSL client authentication.
Just configure this file as the SSLCACertificateFile.#
***** BEGIN LICENSE BLOCK *****
This Source Code Form is subject to the terms of the Mozilla Public License,
v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain
one at http://mozilla.org/MPL/2.0/.
***** END LICENSE BLOCK *****
@(#) $RCSfile: certdata.txt,v $ $Revision: 1.80 $ $Date: 2011/11/03 15:11:58 $
@@ -0,0 +1,67 @@
Metadata-Version: 2.1
Name: certifi
Version: 2024.7.4
Summary: Python package for providing Mozilla's CA Bundle.
Home-page: https://github.com/certifi/python-certifi
Author: Kenneth Reitz
Author-email: me@kennethreitz.com
License: MPL-2.0
Project-URL: Source, https://github.com/certifi/python-certifi
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)
Classifier: Natural Language :: English
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.6
License-File: LICENSE
Certifi: Python SSL Certificates
================================
Certifi provides Mozilla's carefully curated collection of Root Certificates for
validating the trustworthiness of SSL certificates while verifying the identity
of TLS hosts. It has been extracted from the `Requests`_ project.
Installation
------------
``certifi`` is available on PyPI. Simply install it with ``pip``::
$ pip install certifi
Usage
-----
To reference the installed certificate authority (CA) bundle, you can use the
built-in function::
>>> import certifi
>>> certifi.where()
'/usr/local/lib/python3.7/site-packages/certifi/cacert.pem'
Or from the command line::
$ python -m certifi
/usr/local/lib/python3.7/site-packages/certifi/cacert.pem
Enjoy!
.. _`Requests`: https://requests.readthedocs.io/en/master/
Addition/Removal of Certificates
--------------------------------
Certifi does not support any addition/removal or other modification of the
CA trust store content. This project is intended to provide a reliable and
highly portable root of trust to python deployments. Look to upstream projects
for methods to use alternate trust.
@@ -0,0 +1,14 @@
certifi-2024.7.4.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
certifi-2024.7.4.dist-info/LICENSE,sha256=6TcW2mucDVpKHfYP5pWzcPBpVgPSH2-D8FPkLPwQyvc,989
certifi-2024.7.4.dist-info/METADATA,sha256=L9_EuPoQQvHFzxu03_ctaEZxhEty7inz569jGWjlLGo,2221
certifi-2024.7.4.dist-info/RECORD,,
certifi-2024.7.4.dist-info/WHEEL,sha256=y4mX-SOX4fYIkonsAGA5N0Oy-8_gI4FXw5HNI1xqvWg,91
certifi-2024.7.4.dist-info/top_level.txt,sha256=KMu4vUCfsjLrkPbSNdgdekS-pVJzBAJFO__nI8NF6-U,8
certifi/__init__.py,sha256=LHXz7E80YJYBzCBv6ZyidQ5-ciYSkSebpY2E5OM0l7o,94
certifi/__main__.py,sha256=xBBoj905TUWBLRGANOcf7oi6e-3dMP4cEoG9OyMs11g,243
certifi/__pycache__/__init__.cpython-312.pyc,,
certifi/__pycache__/__main__.cpython-312.pyc,,
certifi/__pycache__/core.cpython-312.pyc,,
certifi/cacert.pem,sha256=SIupYGAr8HzGP073rsEIaS_sQYIPwzKKjj894DgUmu4,291528
certifi/core.py,sha256=qRDDFyXVJwTB_EmoGppaXU_R9qCZvhl-EzxPMuV3nTA,4426
certifi/py.typed,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
@@ -0,0 +1,5 @@
Wheel-Version: 1.0
Generator: setuptools (70.2.0)
Root-Is-Purelib: true
Tag: py3-none-any
@@ -0,0 +1,4 @@
from .core import contents, where
# Public API of the package: the two helpers imported from .core above.
__all__ = ["contents", "where"]
# Version string of this certifi release.
__version__ = "2024.07.04"
@@ -0,0 +1,12 @@
import argparse
from certifi import contents, where
# Command-line entry point: prints the CA bundle location by default, or the
# bundle's contents when -c/--contents is given.
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--contents", action="store_true")
args = parser.parse_args()

# Only the selected helper is evaluated.
print(contents() if args.contents else where())
@@ -0,0 +1,114 @@
"""
certifi.py
~~~~~~~~~~
This module returns the installation location of cacert.pem or its contents.
"""
import sys
import atexit
def exit_cacert_ctx() -> None:
    """Leave the context manager stored in the module-level ``_CACERT_CTX``.

    ``where()`` registers this function with ``atexit`` after it has entered
    that context; ``__exit__`` is invoked here with no exception information.
    """
    no_exception = (None, None, None)
    _CACERT_CTX.__exit__(*no_exception)  # type: ignore[union-attr]
if sys.version_info >= (3, 11):

    from importlib.resources import as_file, files

    # Cached context manager and resolved path; both are filled in lazily by
    # the first call to where().
    _CACERT_CTX = None
    _CACERT_PATH = None

    def where() -> str:
        """Return the filesystem path of the bundled ``cacert.pem``.

        Resolution is deferred until the first call (relevant when the package
        is loaded through zipimport) and the result is cached in module
        globals so the file is not extracted again on later calls.
        """
        global _CACERT_CTX, _CACERT_PATH

        # Already resolved by an earlier call.
        if _CACERT_PATH is not None:
            return _CACERT_PATH

        # importlib.resources does not hand out a plain path: it returns a
        # context manager that yields the path on entry and cleans up on exit.
        # When no temporary file is needed it simply yields the on-disk
        # location and exiting does nothing. The manager itself is kept in a
        # global because cleanup would otherwise happen as soon as it is
        # garbage collected.
        _CACERT_CTX = as_file(files("certifi").joinpath("cacert.pem"))
        _CACERT_PATH = str(_CACERT_CTX.__enter__())
        atexit.register(exit_cacert_ctx)
        return _CACERT_PATH

    def contents() -> str:
        """Return the text of the bundled ``cacert.pem``, decoded as ASCII."""
        bundle = files("certifi").joinpath("cacert.pem")
        return bundle.read_text(encoding="ascii")

elif sys.version_info >= (3, 7):

    from importlib.resources import path as get_path, read_text

    # Cached context manager and resolved path; both are filled in lazily by
    # the first call to where().
    _CACERT_CTX = None
    _CACERT_PATH = None

    def where() -> str:
        """Return the filesystem path of the bundled ``cacert.pem``.

        Resolution is deferred until the first call (relevant when the package
        is loaded through zipimport) and the result is cached in module
        globals so the file is not extracted again on later calls.
        """
        global _CACERT_CTX, _CACERT_PATH

        # Already resolved by an earlier call.
        if _CACERT_PATH is not None:
            return _CACERT_PATH

        # importlib.resources does not hand out a plain path: it returns a
        # context manager that yields the path on entry and cleans up on exit.
        # When no temporary file is needed it simply yields the on-disk
        # location and exiting does nothing. The manager itself is kept in a
        # global because cleanup would otherwise happen as soon as it is
        # garbage collected.
        _CACERT_CTX = get_path("certifi", "cacert.pem")
        _CACERT_PATH = str(_CACERT_CTX.__enter__())
        atexit.register(exit_cacert_ctx)
        return _CACERT_PATH

    def contents() -> str:
        """Return the text of the bundled ``cacert.pem``, decoded as ASCII."""
        return read_text("certifi", "cacert.pem", encoding="ascii")

else:
    import os
    import types
    from typing import Union

    Package = Union[types.ModuleType, str]
    Resource = Union[str, "os.PathLike"]

    # Fallback for interpreters older than 3.7, which lack
    # importlib.resources. It leans on the where() defined below, so it does
    # not help in environments (PyOxidizer, for example) that leave __file__
    # unset on modules.
    def read_text(
        package: Package,
        resource: Resource,
        encoding: str = 'utf-8',
        errors: str = 'strict'
    ) -> str:
        """Read the file located by ``where()`` using ``encoding``.

        ``package``, ``resource`` and ``errors`` are accepted but not used.
        """
        bundle_path = where()
        with open(bundle_path, encoding=encoding) as data:
            return data.read()

    # Without importlib.resources, assume the package sits on the filesystem
    # and build the path next to this module.
    def where() -> str:
        """Return the path of ``cacert.pem`` in this module's directory."""
        return os.path.join(os.path.dirname(__file__), "cacert.pem")

    def contents() -> str:
        """Return the text of the bundled ``cacert.pem``, decoded as ASCII."""
        return read_text("certifi", "cacert.pem", encoding="ascii")
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2019 TAHRI Ahmed R.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,683 @@
Metadata-Version: 2.1
Name: charset-normalizer
Version: 3.3.2
Summary: The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet.
Home-page: https://github.com/Ousret/charset_normalizer
Author: Ahmed TAHRI
Author-email: ahmed.tahri@cloudnursery.dev
License: MIT
Project-URL: Bug Reports, https://github.com/Ousret/charset_normalizer/issues
Project-URL: Documentation, https://charset-normalizer.readthedocs.io/en/latest
Keywords: encoding,charset,charset-detector,detector,normalization,unicode,chardet,detect
Classifier: Development Status :: 5 - Production/Stable
Classifier: License :: OSI Approved :: MIT License
Classifier: Intended Audience :: Developers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Topic :: Text Processing :: Linguistic
Classifier: Topic :: Utilities
Classifier: Typing :: Typed
Requires-Python: >=3.7.0
Description-Content-Type: text/markdown
License-File: LICENSE
Provides-Extra: unicode_backport
<h1 align="center">Charset Detection, for Everyone 👋</h1>
<p align="center">
<sup>The Real First Universal Charset Detector</sup><br>
<a href="https://pypi.org/project/charset-normalizer">
<img src="https://img.shields.io/pypi/pyversions/charset_normalizer.svg?orange=blue" />
</a>
<a href="https://pepy.tech/project/charset-normalizer/">
<img alt="Download Count Total" src="https://static.pepy.tech/badge/charset-normalizer/month" />
</a>
<a href="https://bestpractices.coreinfrastructure.org/projects/7297">
<img src="https://bestpractices.coreinfrastructure.org/projects/7297/badge">
</a>
</p>
<p align="center">
<sup><i>Featured Packages</i></sup><br>
<a href="https://github.com/jawah/niquests">
<img alt="Static Badge" src="https://img.shields.io/badge/Niquests-HTTP_1.1%2C%202%2C_and_3_Client-cyan">
</a>
<a href="https://github.com/jawah/wassima">
<img alt="Static Badge" src="https://img.shields.io/badge/Wassima-Certifi_Killer-cyan">
</a>
</p>
<p align="center">
<sup><i>In other language (unofficial port - by the community)</i></sup><br>
<a href="https://github.com/nickspring/charset-normalizer-rs">
<img alt="Static Badge" src="https://img.shields.io/badge/Rust-red">
</a>
</p>
> A library that helps you read text from an unknown charset encoding.<br /> Motivated by `chardet`,
> I'm trying to resolve the issue by taking a new approach.
> All IANA character set names for which the Python core library provides codecs are supported.
<p align="center">
>>>>> <a href="https://charsetnormalizerweb.ousret.now.sh" target="_blank">👉 Try Me Online Now, Then Adopt Me 👈 </a> <<<<<
</p>
This project offers you an alternative to **Universal Charset Encoding Detector**, also known as **Chardet**.
| Feature | [Chardet](https://github.com/chardet/chardet) | Charset Normalizer | [cChardet](https://github.com/PyYoshi/cChardet) |
|--------------------------------------------------|:---------------------------------------------:|:--------------------------------------------------------------------------------------------------:|:-----------------------------------------------:|
| `Fast` | ❌ | ✅ | ✅ |
| `Universal**` | ❌ | ✅ | ❌ |
| `Reliable` **without** distinguishable standards | ❌ | ✅ | ✅ |
| `Reliable` **with** distinguishable standards | ✅ | ✅ | ✅ |
| `License` | LGPL-2.1<br>_restrictive_ | MIT | MPL-1.1<br>_restrictive_ |
| `Native Python` | ✅ | ✅ | ❌ |
| `Detect spoken language` | ❌ | ✅ | N/A |
| `UnicodeDecodeError Safety` | ❌ | ✅ | ❌ |
| `Whl Size (min)` | 193.6 kB | 42 kB | ~200 kB |
| `Supported Encoding` | 33 | 🎉 [99](https://charset-normalizer.readthedocs.io/en/latest/user/support.html#supported-encodings) | 40 |
<p align="center">
<img src="https://i.imgflip.com/373iay.gif" alt="Reading Normalized Text" width="226"/><img src="https://media.tenor.com/images/c0180f70732a18b4965448d33adba3d0/tenor.gif" alt="Cat Reading Text" width="200"/>
</p>
*\*\* : They are clearly using specific code for a specific encoding even if covering most of used one*<br>
Did you get here because of the logs? See [https://charset-normalizer.readthedocs.io/en/latest/user/miscellaneous.html](https://charset-normalizer.readthedocs.io/en/latest/user/miscellaneous.html)
## ⚡ Performance
This package offers better performance than its counterpart Chardet. Here are some numbers.
| Package | Accuracy | Mean per file (ms) | File per sec (est) |
|-----------------------------------------------|:--------:|:------------------:|:------------------:|
| [chardet](https://github.com/chardet/chardet) | 86 % | 200 ms | 5 file/sec |
| charset-normalizer | **98 %** | **10 ms** | 100 file/sec |
| Package | 99th percentile | 95th percentile | 50th percentile |
|-----------------------------------------------|:---------------:|:---------------:|:---------------:|
| [chardet](https://github.com/chardet/chardet) | 1200 ms | 287 ms | 23 ms |
| charset-normalizer | 100 ms | 50 ms | 5 ms |
Chardet's performance on larger files (1MB+) is very poor. Expect a huge difference on large payloads.
> Stats are generated using 400+ files using default parameters. More details on used files, see GHA workflows.
> And yes, these results might change at any time. The dataset can be updated to include more files.
> The actual delays heavily depends on your CPU capabilities. The factors should remain the same.
> Keep in mind that the stats are generous and that Chardet accuracy vs our is measured using Chardet initial capability
> (eg. Supported Encoding) Challenge-them if you want.
## ✨ Installation
Using pip:
```sh
pip install charset-normalizer -U
```
## 🚀 Basic Usage
### CLI
This package comes with a CLI.
```
usage: normalizer [-h] [-v] [-a] [-n] [-m] [-r] [-f] [-t THRESHOLD]
file [file ...]
The Real First Universal Charset Detector. Discover originating encoding used
on text file. Normalize text to unicode.
positional arguments:
files File(s) to be analysed
optional arguments:
-h, --help show this help message and exit
-v, --verbose Display complementary information about file if any.
Stdout will contain logs about the detection process.
-a, --with-alternative
Output complementary possibilities if any. Top-level
JSON WILL be a list.
-n, --normalize Permit to normalize input file. If not set, program
does not write anything.
-m, --minimal Only output the charset detected to STDOUT. Disabling
JSON output.
-r, --replace Replace file when trying to normalize it instead of
creating a new one.
-f, --force Replace file without asking if you are sure, use this
flag with caution.
-t THRESHOLD, --threshold THRESHOLD
Define a custom maximum amount of chaos allowed in
decoded content. 0. <= chaos <= 1.
--version Show version information and exit.
```
```bash
normalizer ./data/sample.1.fr.srt
```
or
```bash
python -m charset_normalizer ./data/sample.1.fr.srt
```
🎉 Since version 1.4.0 the CLI produces an easily usable result on stdout in JSON format.
```json
{
"path": "/home/default/projects/charset_normalizer/data/sample.1.fr.srt",
"encoding": "cp1252",
"encoding_aliases": [
"1252",
"windows_1252"
],
"alternative_encodings": [
"cp1254",
"cp1256",
"cp1258",
"iso8859_14",
"iso8859_15",
"iso8859_16",
"iso8859_3",
"iso8859_9",
"latin_1",
"mbcs"
],
"language": "French",
"alphabets": [
"Basic Latin",
"Latin-1 Supplement"
],
"has_sig_or_bom": false,
"chaos": 0.149,
"coherence": 97.152,
"unicode_path": null,
"is_preferred": true
}
```
### Python
*Just print out normalized text*
```python
from charset_normalizer import from_path
results = from_path('./my_subtitle.srt')
print(str(results.best()))
```
*Upgrade your code without effort*
```python
from charset_normalizer import detect
```
The above code will behave the same as **chardet**. We ensure that we offer the best (reasonable) BC result possible.
See the docs for advanced usage : [readthedocs.io](https://charset-normalizer.readthedocs.io/en/latest/)
## 😇 Why
When I started using Chardet, I noticed that it was not suited to my expectations, and I wanted to propose a
reliable alternative using a completely different method. Also! I never back down on a good challenge!
I **don't care** about the **originating charset** encoding, because **two different tables** can
produce **two identical rendered string.**
What I want is to get readable text, the best I can.
In a way, **I'm brute forcing text decoding.** How cool is that ? 😎
Don't confuse package **ftfy** with charset-normalizer or chardet. ftfy goal is to repair unicode string whereas charset-normalizer to convert raw file in unknown encoding to unicode.
## 🍰 How
- Discard all charset encoding table that could not fit the binary content.
- Measure noise, or the mess once opened (by chunks) with a corresponding charset encoding.
- Extract matches with the lowest mess detected.
- Additionally, we measure coherence / probe for a language.
**Wait a minute**, what is noise/mess and coherence according to **YOU ?**
*Noise :* I opened hundred of text files, **written by humans**, with the wrong encoding table. **I observed**, then
**I established** some ground rules about **what is obvious** when **it seems like** a mess.
I know that my interpretation of what is noise is probably incomplete, feel free to contribute in order to
improve or rewrite it.
*Coherence :* For each language there is on earth, we have computed ranked letter appearance occurrences (the best we can). So I thought
that intel is worth something here. So I use those records against decoded text to check if I can detect intelligent design.
## ⚡ Known limitations
- Language detection is unreliable when text contains two or more languages sharing identical letters. (eg. HTML (english tags) + Turkish content (Sharing Latin characters))
- Every charset detector heavily depends on sufficient content. In common cases, do not bother run detection on very tiny content.
## ⚠️ About Python EOLs
**If you are running:**
- Python >=2.7,<3.5: Unsupported
- Python 3.5: charset-normalizer < 2.1
- Python 3.6: charset-normalizer < 3.1
- Python 3.7: charset-normalizer < 4.0
Upgrade your Python interpreter as soon as possible.
## 👤 Contributing
Contributions, issues and feature requests are very much welcome.<br />
Feel free to check [issues page](https://github.com/ousret/charset_normalizer/issues) if you want to contribute.
## 📝 License
Copyright © [Ahmed TAHRI @Ousret](https://github.com/Ousret).<br />
This project is [MIT](https://github.com/Ousret/charset_normalizer/blob/master/LICENSE) licensed.
Characters frequencies used in this project © 2012 [Denny Vrandečić](http://simia.net/letters/)
## 💼 For Enterprise
Professional support for charset-normalizer is available as part of the [Tidelift
Subscription][1]. Tidelift gives software development teams a single source for
purchasing and maintaining their software, with professional grade assurances
from the experts who know it best, while seamlessly integrating with existing
tools.
[1]: https://tidelift.com/subscription/pkg/pypi-charset-normalizer?utm_source=pypi-charset-normalizer&utm_medium=readme
# Changelog
All notable changes to charset-normalizer will be documented in this file. This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [3.3.2](https://github.com/Ousret/charset_normalizer/compare/3.3.1...3.3.2) (2023-10-31)
### Fixed
- Unintentional memory usage regression when using a large payload that matches several encodings (#376)
- Regression on some detection case showcased in the documentation (#371)
### Added
- Noise (md) probe that identifies malformed Arabic representation due to the presence of letters in isolated form (credit to my wife)
## [3.3.1](https://github.com/Ousret/charset_normalizer/compare/3.3.0...3.3.1) (2023-10-22)
### Changed
- Optional mypyc compilation upgraded to version 1.6.1 for Python >= 3.8
- Improved the general detection reliability based on reports from the community
## [3.3.0](https://github.com/Ousret/charset_normalizer/compare/3.2.0...3.3.0) (2023-09-30)
### Added
- Allow to execute the CLI (e.g. normalizer) through `python -m charset_normalizer.cli` or `python -m charset_normalizer`
- Support for 9 forgotten encodings that are supported by Python but unlisted in `encodings.aliases` as they have no alias (#323)
### Removed
- (internal) Redundant utils.is_ascii function and unused function is_private_use_only
- (internal) charset_normalizer.assets is moved inside charset_normalizer.constant
### Changed
- (internal) Unicode code blocks in constants are updated using the latest v15.0.0 definition to improve detection
- Optional mypyc compilation upgraded to version 1.5.1 for Python >= 3.8
### Fixed
- Unable to properly sort CharsetMatch when both chaos/noise and coherence were close due to an unreachable condition in \_\_lt\_\_ (#350)
## [3.2.0](https://github.com/Ousret/charset_normalizer/compare/3.1.0...3.2.0) (2023-06-07)
### Changed
- Typehint for function `from_path` no longer enforce `PathLike` as its first argument
- Minor improvement over the global detection reliability
### Added
- Introduce function `is_binary` that relies on main capabilities, and optimized to detect binaries
- Propagate `enable_fallback` argument throughout `from_bytes`, `from_path`, and `from_fp` that allow a deeper control over the detection (default True)
- Explicit support for Python 3.12
### Fixed
- Edge case detection failure where a file would contain 'very-long' camel cased word (Issue #289)
## [3.1.0](https://github.com/Ousret/charset_normalizer/compare/3.0.1...3.1.0) (2023-03-06)
### Added
- Argument `should_rename_legacy` for legacy function `detect` and disregard any new arguments without errors (PR #262)
### Removed
- Support for Python 3.6 (PR #260)
### Changed
- Optional speedup provided by mypy/c 1.0.1
## [3.0.1](https://github.com/Ousret/charset_normalizer/compare/3.0.0...3.0.1) (2022-11-18)
### Fixed
- Multi-bytes cutter/chunk generator did not always cut correctly (PR #233)
### Changed
- Speedup provided by mypy/c 0.990 on Python >= 3.7
## [3.0.0](https://github.com/Ousret/charset_normalizer/compare/2.1.1...3.0.0) (2022-10-20)
### Added
- Extend the capability of explain=True when cp_isolation contains at most two entries (min one), will log in details of the Mess-detector results
- Support for alternative language frequency set in charset_normalizer.assets.FREQUENCIES
- Add parameter `language_threshold` in `from_bytes`, `from_path` and `from_fp` to adjust the minimum expected coherence ratio
- `normalizer --version` now specify if current version provide extra speedup (meaning mypyc compilation whl)
### Changed
- Build with static metadata using 'build' frontend
- Make the language detection stricter
- Optional: Module `md.py` can be compiled using Mypyc to provide an extra speedup up to 4x faster than v2.1
### Fixed
- CLI with opt --normalize fail when using full path for files
- TooManyAccentuatedPlugin induce false positive on the mess detection when too few alpha character have been fed to it
- Sphinx warnings when generating the documentation
### Removed
- Coherence detector no longer return 'Simple English' instead return 'English'
- Coherence detector no longer return 'Classical Chinese' instead return 'Chinese'
- Breaking: Method `first()` and `best()` from CharsetMatch
- UTF-7 will no longer appear as "detected" without a recognized SIG/mark (is unreliable/conflict with ASCII)
- Breaking: Class aliases CharsetDetector, CharsetDoctor, CharsetNormalizerMatch and CharsetNormalizerMatches
- Breaking: Top-level function `normalize`
- Breaking: Properties `chaos_secondary_pass`, `coherence_non_latin` and `w_counter` from CharsetMatch
- Support for the backport `unicodedata2`
## [3.0.0rc1](https://github.com/Ousret/charset_normalizer/compare/3.0.0b2...3.0.0rc1) (2022-10-18)
### Added
- Extend the capability of explain=True when cp_isolation contains at most two entries (min one), will log in details of the Mess-detector results
- Support for alternative language frequency set in charset_normalizer.assets.FREQUENCIES
- Add parameter `language_threshold` in `from_bytes`, `from_path` and `from_fp` to adjust the minimum expected coherence ratio
### Changed
- Build with static metadata using 'build' frontend
- Make the language detection stricter
### Fixed
- CLI with opt --normalize fail when using full path for files
- TooManyAccentuatedPlugin induce false positive on the mess detection when too few alpha character have been fed to it
### Removed
- Coherence detector no longer return 'Simple English' instead return 'English'
- Coherence detector no longer return 'Classical Chinese' instead return 'Chinese'
## [3.0.0b2](https://github.com/Ousret/charset_normalizer/compare/3.0.0b1...3.0.0b2) (2022-08-21)
### Added
- `normalizer --version` now specify if current version provide extra speedup (meaning mypyc compilation whl)
### Removed
- Breaking: Method `first()` and `best()` from CharsetMatch
- UTF-7 will no longer appear as "detected" without a recognized SIG/mark (is unreliable/conflict with ASCII)
### Fixed
- Sphinx warnings when generating the documentation
## [3.0.0b1](https://github.com/Ousret/charset_normalizer/compare/2.1.0...3.0.0b1) (2022-08-15)
### Changed
- Optional: Module `md.py` can be compiled using Mypyc to provide an extra speedup up to 4x faster than v2.1
### Removed
- Breaking: Class aliases CharsetDetector, CharsetDoctor, CharsetNormalizerMatch and CharsetNormalizerMatches
- Breaking: Top-level function `normalize`
- Breaking: Properties `chaos_secondary_pass`, `coherence_non_latin` and `w_counter` from CharsetMatch
- Support for the backport `unicodedata2`
## [2.1.1](https://github.com/Ousret/charset_normalizer/compare/2.1.0...2.1.1) (2022-08-19)
### Deprecated
- Function `normalize` scheduled for removal in 3.0
### Changed
- Removed useless call to decode in fn is_unprintable (#206)
### Fixed
- Third-party library (i18n xgettext) crashing not recognizing utf_8 (PEP 263) with underscore from [@aleksandernovikov](https://github.com/aleksandernovikov) (#204)
## [2.1.0](https://github.com/Ousret/charset_normalizer/compare/2.0.12...2.1.0) (2022-06-19)
### Added
- Output the Unicode table version when running the CLI with `--version` (PR #194)
### Changed
- Re-use decoded buffer for single byte character sets from [@nijel](https://github.com/nijel) (PR #175)
- Fixing some performance bottlenecks from [@deedy5](https://github.com/deedy5) (PR #183)
### Fixed
- Workaround potential bug in cpython with Zero Width No-Break Space located in Arabic Presentation Forms-B, Unicode 1.1 not acknowledged as space (PR #175)
- CLI default threshold aligned with the API threshold from [@oleksandr-kuzmenko](https://github.com/oleksandr-kuzmenko) (PR #181)
### Removed
- Support for Python 3.5 (PR #192)
### Deprecated
- Use of backport unicodedata from `unicodedata2` as Python is quickly catching up, scheduled for removal in 3.0 (PR #194)
## [2.0.12](https://github.com/Ousret/charset_normalizer/compare/2.0.11...2.0.12) (2022-02-12)
### Fixed
- ASCII miss-detection on rare cases (PR #170)
## [2.0.11](https://github.com/Ousret/charset_normalizer/compare/2.0.10...2.0.11) (2022-01-30)
### Added
- Explicit support for Python 3.11 (PR #164)
### Changed
- The logging behavior have been completely reviewed, now using only TRACE and DEBUG levels (PR #163 #165)
## [2.0.10](https://github.com/Ousret/charset_normalizer/compare/2.0.9...2.0.10) (2022-01-04)
### Fixed
- Fallback match entries might lead to UnicodeDecodeError for large bytes sequence (PR #154)
### Changed
- Skipping the language-detection (CD) on ASCII (PR #155)
## [2.0.9](https://github.com/Ousret/charset_normalizer/compare/2.0.8...2.0.9) (2021-12-03)
### Changed
- Moderating the logging impact (since 2.0.8) for specific environments (PR #147)
### Fixed
- Wrong logging level applied when setting kwarg `explain` to True (PR #146)
## [2.0.8](https://github.com/Ousret/charset_normalizer/compare/2.0.7...2.0.8) (2021-11-24)
### Changed
- Improvement over Vietnamese detection (PR #126)
- MD improvement on trailing data and long foreign (non-pure latin) data (PR #124)
- Efficiency improvements in cd/alphabet_languages from [@adbar](https://github.com/adbar) (PR #122)
- call sum() without an intermediary list following PEP 289 recommendations from [@adbar](https://github.com/adbar) (PR #129)
- Code style as refactored by Sourcery-AI (PR #131)
- Minor adjustment on the MD around european words (PR #133)
- Remove and replace SRTs from assets / tests (PR #139)
- Initialize the library logger with a `NullHandler` by default from [@nmaynes](https://github.com/nmaynes) (PR #135)
- Setting kwarg `explain` to True will add provisionally (bounded to function lifespan) a specific stream handler (PR #135)
### Fixed
- Fix large (misleading) sequence giving UnicodeDecodeError (PR #137)
- Avoid using too insignificant chunk (PR #137)
### Added
- Add and expose function `set_logging_handler` to configure a specific StreamHandler from [@nmaynes](https://github.com/nmaynes) (PR #135)
- Add `CHANGELOG.md` entries, format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) (PR #141)
## [2.0.7](https://github.com/Ousret/charset_normalizer/compare/2.0.6...2.0.7) (2021-10-11)
### Added
- Add support for Kazakh (Cyrillic) language detection (PR #109)
### Changed
- Further, improve inferring the language from a given single-byte code page (PR #112)
- Vainly trying to leverage PEP263 when PEP3120 is not supported (PR #116)
- Refactoring for potential performance improvements in loops from [@adbar](https://github.com/adbar) (PR #113)
- Various detection improvement (MD+CD) (PR #117)
### Removed
- Remove redundant logging entry about detected language(s) (PR #115)
### Fixed
- Fix a minor inconsistency between Python 3.5 and other versions regarding language detection (PR #117 #102)
## [2.0.6](https://github.com/Ousret/charset_normalizer/compare/2.0.5...2.0.6) (2021-09-18)
### Fixed
- Unforeseen regression with the loss of the backward-compatibility with some older minor of Python 3.5.x (PR #100)
- Fix CLI crash when using --minimal output in certain cases (PR #103)
### Changed
- Minor improvement to the detection efficiency (less than 1%) (PR #106 #101)
## [2.0.5](https://github.com/Ousret/charset_normalizer/compare/2.0.4...2.0.5) (2021-09-14)
### Changed
- The project now comply with: flake8, mypy, isort and black to ensure a better overall quality (PR #81)
- The BC-support with v1.x was improved, the old staticmethods are restored (PR #82)
- The Unicode detection is slightly improved (PR #93)
- Add syntax sugar \_\_bool\_\_ for results CharsetMatches list-container (PR #91)
### Removed
- The project no longer raise warning on tiny content given for detection, will be simply logged as warning instead (PR #92)
### Fixed
- In some rare case, the chunks extractor could cut in the middle of a multi-byte character and could mislead the mess detection (PR #95)
- Some rare 'space' characters could trip up the UnprintablePlugin/Mess detection (PR #96)
- The MANIFEST.in was not exhaustive (PR #78)
## [2.0.4](https://github.com/Ousret/charset_normalizer/compare/2.0.3...2.0.4) (2021-07-30)
### Fixed
- The CLI no longer raise an unexpected exception when no encoding has been found (PR #70)
- Fix accessing the 'alphabets' property when the payload contains surrogate characters (PR #68)
- The logger could mislead (explain=True) on detected languages and the impact of one MBCS match (PR #72)
- Submatch factoring could be wrong in rare edge cases (PR #72)
- Multiple files given to the CLI were ignored when publishing results to STDOUT. (After the first path) (PR #72)
- Fix line endings from CRLF to LF for certain project files (PR #67)
### Changed
- Adjust the MD to lower the sensitivity, thus improving the global detection reliability (PR #69 #76)
- Allow fallback on specified encoding if any (PR #71)
## [2.0.3](https://github.com/Ousret/charset_normalizer/compare/2.0.2...2.0.3) (2021-07-16)
### Changed
- Part of the detection mechanism has been improved to be less sensitive, resulting in more accurate detection results. Especially ASCII. (PR #63)
- According to the community wishes, the detection will fall back on ASCII or UTF-8 in a last-resort case. (PR #64)
## [2.0.2](https://github.com/Ousret/charset_normalizer/compare/2.0.1...2.0.2) (2021-07-15)
### Fixed
- Empty/Too small JSON payload miss-detection fixed. Report from [@tseaver](https://github.com/tseaver) (PR #59)
### Changed
- Don't inject unicodedata2 into sys.modules from [@akx](https://github.com/akx) (PR #57)
## [2.0.1](https://github.com/Ousret/charset_normalizer/compare/2.0.0...2.0.1) (2021-07-13)
### Fixed
- Make it work where there isn't a filesystem available, dropping assets frequencies.json. Report from [@sethmlarson](https://github.com/sethmlarson). (PR #55)
- Using explain=False permanently disable the verbose output in the current runtime (PR #47)
- One log entry (language target preemptive) was not show in logs when using explain=True (PR #47)
- Fix undesired exception (ValueError) on getitem of instance CharsetMatches (PR #52)
### Changed
- Public function normalize default args values were not aligned with from_bytes (PR #53)
### Added
- You may now use charset aliases in cp_isolation and cp_exclusion arguments (PR #47)
## [2.0.0](https://github.com/Ousret/charset_normalizer/compare/1.4.1...2.0.0) (2021-07-02)
### Changed
- 4 to 5 times faster than the previous 1.4.0 release. At least 2x faster than Chardet.
- Emphasis has been placed on UTF-8 detection, which should now be nearly instantaneous.
- The backward compatibility with Chardet has been greatly improved. The legacy detect function returns an identical charset name whenever possible.
- The detection mechanism has been slightly improved, now Turkish content is detected correctly (most of the time)
- The program has been rewritten to ease the readability and maintainability. (+Using static typing)+
- utf_7 detection has been reinstated.
### Removed
- This package no longer require anything when used with Python 3.5 (Dropped cached_property)
- Removed support for these languages: Catalan, Esperanto, Kazakh, Basque, Volapük, Azeri, Galician, Nynorsk, Macedonian, and Serbocroatian.
- The exception hook on UnicodeDecodeError has been removed.
### Deprecated
- Methods coherence_non_latin, w_counter, chaos_secondary_pass of the class CharsetMatch are now deprecated and scheduled for removal in v3.0
### Fixed
- The CLI output used the relative path of the file(s). Should be absolute.
## [1.4.1](https://github.com/Ousret/charset_normalizer/compare/1.4.0...1.4.1) (2021-05-28)
### Fixed
- Logger configuration/usage no longer conflict with others (PR #44)
## [1.4.0](https://github.com/Ousret/charset_normalizer/compare/1.3.9...1.4.0) (2021-05-21)
### Removed
- Using standard logging instead of using the package loguru.
- Dropping nose test framework in favor of the maintained pytest.
- Choose to not use dragonmapper package to help with gibberish Chinese/CJK text.
- Require cached_property only for Python 3.5 due to constraint. Dropping for every other interpreter version.
- Stop support for UTF-7 that does not contain a SIG.
- Dropping PrettyTable, replaced with pure JSON output in CLI.
### Fixed
- BOM marker in a CharsetNormalizerMatch instance could be False in rare cases even if obviously present. Due to the sub-match factoring process.
- Not searching properly for the BOM when trying utf32/16 parent codec.
### Changed
- Improving the package final size by compressing frequencies.json.
- Huge improvement over the largest payloads.
### Added
- CLI now produces JSON consumable output.
- Return ASCII if given sequences fit. Given reasonable confidence.
## [1.3.9](https://github.com/Ousret/charset_normalizer/compare/1.3.8...1.3.9) (2021-05-13)
### Fixed
- In some very rare cases, you may end up getting encode/decode errors due to a bad bytes payload (PR #40)
## [1.3.8](https://github.com/Ousret/charset_normalizer/compare/1.3.7...1.3.8) (2021-05-12)
### Fixed
- Empty given payload for detection may cause an exception if trying to access the `alphabets` property. (PR #39)
## [1.3.7](https://github.com/Ousret/charset_normalizer/compare/1.3.6...1.3.7) (2021-05-12)
### Fixed
- The legacy detect function should return UTF-8-SIG if sig is present in the payload. (PR #38)
## [1.3.6](https://github.com/Ousret/charset_normalizer/compare/1.3.5...1.3.6) (2021-02-09)
### Changed
- Amend the previous release to allow prettytable 2.0 (PR #35)
## [1.3.5](https://github.com/Ousret/charset_normalizer/compare/1.3.4...1.3.5) (2021-02-08)
### Fixed
- Fix error while using the package with a python pre-release interpreter (PR #33)
### Changed
- Dependencies refactoring, constraints revised.
### Added
- Add python 3.9 and 3.10 to the supported interpreters
MIT License
Copyright (c) 2019 TAHRI Ahmed R.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,35 @@
../../bin/normalizer,sha256=O1tLXvRzeuQHDVSDjsuiUko8eeXdZtA_eGTgJcdT5qs,233
charset_normalizer-3.3.2.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
charset_normalizer-3.3.2.dist-info/LICENSE,sha256=6zGgxaT7Cbik4yBV0lweX5w1iidS_vPNcgIT0cz-4kE,1070
charset_normalizer-3.3.2.dist-info/METADATA,sha256=cfLhl5A6SI-F0oclm8w8ux9wshL1nipdeCdVnYb4AaA,33550
charset_normalizer-3.3.2.dist-info/RECORD,,
charset_normalizer-3.3.2.dist-info/WHEEL,sha256=4ZiCdXIWMxJyEClivrQv1QAHZpQh8kVYU92_ZAVwaok,152
charset_normalizer-3.3.2.dist-info/entry_points.txt,sha256=ADSTKrkXZ3hhdOVFi6DcUEHQRS0xfxDIE_pEz4wLIXA,65
charset_normalizer-3.3.2.dist-info/top_level.txt,sha256=7ASyzePr8_xuZWJsnqJjIBtyV8vhEo0wBCv1MPRRi3Q,19
charset_normalizer/__init__.py,sha256=UzI3xC8PhmcLRMzSgPb6minTmRq0kWznnCBJ8ZCc2XI,1577
charset_normalizer/__main__.py,sha256=JxY8bleaENOFlLRb9HfoeZCzAMnn2A1oGR5Xm2eyqg0,73
charset_normalizer/__pycache__/__init__.cpython-312.pyc,,
charset_normalizer/__pycache__/__main__.cpython-312.pyc,,
charset_normalizer/__pycache__/api.cpython-312.pyc,,
charset_normalizer/__pycache__/cd.cpython-312.pyc,,
charset_normalizer/__pycache__/constant.cpython-312.pyc,,
charset_normalizer/__pycache__/legacy.cpython-312.pyc,,
charset_normalizer/__pycache__/md.cpython-312.pyc,,
charset_normalizer/__pycache__/models.cpython-312.pyc,,
charset_normalizer/__pycache__/utils.cpython-312.pyc,,
charset_normalizer/__pycache__/version.cpython-312.pyc,,
charset_normalizer/api.py,sha256=WOlWjy6wT8SeMYFpaGbXZFN1TMXa-s8vZYfkL4G29iQ,21097
charset_normalizer/cd.py,sha256=xwZliZcTQFA3jU0c00PRiu9MNxXTFxQkFLWmMW24ZzI,12560
charset_normalizer/cli/__init__.py,sha256=D5ERp8P62llm2FuoMzydZ7d9rs8cvvLXqE-1_6oViPc,100
charset_normalizer/cli/__main__.py,sha256=2F-xURZJzo063Ye-2RLJ2wcmURpbKeAzKwpiws65dAs,9744
charset_normalizer/cli/__pycache__/__init__.cpython-312.pyc,,
charset_normalizer/cli/__pycache__/__main__.cpython-312.pyc,,
charset_normalizer/constant.py,sha256=p0IsOVcEbPWYPOdWhnhRbjK1YVBy6fs05C5vKC-zoxU,40481
charset_normalizer/legacy.py,sha256=T-QuVMsMeDiQEk8WSszMrzVJg_14AMeSkmHdRYhdl1k,2071
charset_normalizer/md.cpython-312-x86_64-linux-gnu.so,sha256=W654QTU3QZI6eWJ0fanScAr0_O6sL0I61fyRSdC-39Y,16064
charset_normalizer/md.py,sha256=NkSuVLK13_a8c7BxZ4cGIQ5vOtGIWOdh22WZEvjp-7U,19624
charset_normalizer/md__mypyc.cpython-312-x86_64-linux-gnu.so,sha256=IlObIV4dmRhFV8V7H-zK4rTxPzTSi9JmrWZD26JQfxI,272640
charset_normalizer/models.py,sha256=I5i0s4aKCCgLPY2tUY3pwkgFA-BUbbNxQ7hVkVTt62s,11624
charset_normalizer/py.typed,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
charset_normalizer/utils.py,sha256=teiosMqzKjXyAHXnGdjSBOgnBZwx-SkBbCLrx0UXy8M,11894
charset_normalizer/version.py,sha256=iHKUfHD3kDRSyrh_BN2ojh43TA5-UZQjvbVIEFfpHDs,79
@@ -0,0 +1,6 @@
Wheel-Version: 1.0
Generator: bdist_wheel (0.41.2)
Root-Is-Purelib: false
Tag: cp312-cp312-manylinux_2_17_x86_64
Tag: cp312-cp312-manylinux2014_x86_64
@@ -0,0 +1,2 @@
[console_scripts]
normalizer = charset_normalizer.cli:cli_detect
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
"""
Charset-Normalizer
~~~~~~~~~~~~~~
The Real First Universal Charset Detector.
A library that helps you read text from an unknown charset encoding.
Motivated by chardet, This package is trying to resolve the issue by taking a new approach.
All IANA character set names for which the Python core library provides codecs are supported.
Basic usage:
>>> from charset_normalizer import from_bytes
>>> results = from_bytes('Bсеки човек има право на образование. Oбразованието!'.encode('utf_8'))
>>> best_guess = results.best()
>>> str(best_guess)
'Bсеки човек има право на образование. Oбразованието!'
Other methods and usages are available - see the full documentation
at <https://github.com/Ousret/charset_normalizer>.
:copyright: (c) 2021 by Ahmed TAHRI
:license: MIT, see LICENSE for more details.
"""
import logging
from .api import from_bytes, from_fp, from_path, is_binary
from .legacy import detect
from .models import CharsetMatch, CharsetMatches
from .utils import set_logging_handler
from .version import VERSION, __version__
# Public API of the package: the names re-exported by
# `from charset_normalizer import *`.
__all__ = (
    "from_fp",
    "from_path",
    "from_bytes",
    "is_binary",
    "detect",
    "CharsetMatch",
    "CharsetMatches",
    "__version__",
    "VERSION",
    "set_logging_handler",
)

# As recommended for libraries, register a NullHandler on the package's
# top-level logger by default; see
# https://docs.python.org/3/howto/logging.html#configuring-logging-for-a-library
logging.getLogger("charset_normalizer").addHandler(logging.NullHandler())
@@ -0,0 +1,4 @@
from .cli import cli_detect
# Launch the command-line detector only when this module is executed as a
# script (e.g. `python -m charset_normalizer`), never on a plain import.
if __name__ == "__main__":
    cli_detect()
@@ -0,0 +1,626 @@
import logging
from os import PathLike
from typing import BinaryIO, List, Optional, Set, Union
from .cd import (
coherence_ratio,
encoding_languages,
mb_encoding_languages,
merge_coherence_ratios,
)
from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
from .md import mess_ratio
from .models import CharsetMatch, CharsetMatches
from .utils import (
any_specified_encoding,
cut_sequence_chunks,
iana_name,
identify_sig_or_bom,
is_cp_similar,
is_multi_byte_encoding,
should_strip_sig_or_bom,
)
# Registering a display name for the custom TRACE level is deliberately left
# disabled, as it would most likely be controversial.
# logging.addLevelName(TRACE, "TRACE")

# Package-wide logger used by the detection routines in this module.
logger = logging.getLogger("charset_normalizer")

# Stream handler reserved for the `explain` debugging mode: `from_bytes`
# attaches it to `logger` when called with explain=True.
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter(fmt="%(asctime)s | %(levelname)s | %(message)s")
)
def from_bytes(
sequences: Union[bytes, bytearray],
steps: int = 5,
chunk_size: int = 512,
threshold: float = 0.2,
cp_isolation: Optional[List[str]] = None,
cp_exclusion: Optional[List[str]] = None,
preemptive_behaviour: bool = True,
explain: bool = False,
language_threshold: float = 0.1,
enable_fallback: bool = True,
) -> CharsetMatches:
"""
Given a raw bytes sequence, return the best possibles charset usable to render str objects.
If there is no results, it is a strong indicator that the source is binary/not text.
By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.
The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
but never take it for granted. Can improve the performance.
You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
purpose.
This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
Custom logging format and handler can be set manually.
"""
if not isinstance(sequences, (bytearray, bytes)):
raise TypeError(
"Expected object of type bytes or bytearray, got: {0}".format(
type(sequences)
)
)
if explain:
previous_logger_level: int = logger.level
logger.addHandler(explain_handler)
logger.setLevel(TRACE)
length: int = len(sequences)
if length == 0:
logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
if explain:
logger.removeHandler(explain_handler)
logger.setLevel(previous_logger_level or logging.WARNING)
return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])
if cp_isolation is not None:
logger.log(
TRACE,
"cp_isolation is set. use this flag for debugging purpose. "
"limited list of encoding allowed : %s.",
", ".join(cp_isolation),
)
cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
else:
cp_isolation = []
if cp_exclusion is not None:
logger.log(
TRACE,
"cp_exclusion is set. use this flag for debugging purpose. "
"limited list of encoding excluded : %s.",
", ".join(cp_exclusion),
)
cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
else:
cp_exclusion = []
if length <= (chunk_size * steps):
logger.log(
TRACE,
"override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
steps,
chunk_size,
length,
)
steps = 1
chunk_size = length
if steps > 1 and length / steps < chunk_size:
chunk_size = int(length / steps)
is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE
if is_too_small_sequence:
logger.log(
TRACE,
"Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
length
),
)
elif is_too_large_sequence:
logger.log(
TRACE,
"Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
length
),
)
prioritized_encodings: List[str] = []
specified_encoding: Optional[str] = (
any_specified_encoding(sequences) if preemptive_behaviour else None
)
if specified_encoding is not None:
prioritized_encodings.append(specified_encoding)
logger.log(
TRACE,
"Detected declarative mark in sequence. Priority +1 given for %s.",
specified_encoding,
)
tested: Set[str] = set()
tested_but_hard_failure: List[str] = []
tested_but_soft_failure: List[str] = []
fallback_ascii: Optional[CharsetMatch] = None
fallback_u8: Optional[CharsetMatch] = None
fallback_specified: Optional[CharsetMatch] = None
results: CharsetMatches = CharsetMatches()
sig_encoding, sig_payload = identify_sig_or_bom(sequences)
if sig_encoding is not None:
prioritized_encodings.append(sig_encoding)
logger.log(
TRACE,
"Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
len(sig_payload),
sig_encoding,
)
prioritized_encodings.append("ascii")
if "utf_8" not in prioritized_encodings:
prioritized_encodings.append("utf_8")
for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
if cp_isolation and encoding_iana not in cp_isolation:
continue
if cp_exclusion and encoding_iana in cp_exclusion:
continue
if encoding_iana in tested:
continue
tested.add(encoding_iana)
decoded_payload: Optional[str] = None
bom_or_sig_available: bool = sig_encoding == encoding_iana
strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
encoding_iana
)
if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
logger.log(
TRACE,
"Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
encoding_iana,
)
continue
if encoding_iana in {"utf_7"} and not bom_or_sig_available:
logger.log(
TRACE,
"Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
encoding_iana,
)
continue
try:
is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
except (ModuleNotFoundError, ImportError):
logger.log(
TRACE,
"Encoding %s does not provide an IncrementalDecoder",
encoding_iana,
)
continue
try:
if is_too_large_sequence and is_multi_byte_decoder is False:
str(
sequences[: int(50e4)]
if strip_sig_or_bom is False
else sequences[len(sig_payload) : int(50e4)],
encoding=encoding_iana,
)
else:
decoded_payload = str(
sequences
if strip_sig_or_bom is False
else sequences[len(sig_payload) :],
encoding=encoding_iana,
)
except (UnicodeDecodeError, LookupError) as e:
if not isinstance(e, LookupError):
logger.log(
TRACE,
"Code page %s does not fit given bytes sequence at ALL. %s",
encoding_iana,
str(e),
)
tested_but_hard_failure.append(encoding_iana)
continue
similar_soft_failure_test: bool = False
for encoding_soft_failed in tested_but_soft_failure:
if is_cp_similar(encoding_iana, encoding_soft_failed):
similar_soft_failure_test = True
break
if similar_soft_failure_test:
logger.log(
TRACE,
"%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
encoding_iana,
encoding_soft_failed,
)
continue
r_ = range(
0 if not bom_or_sig_available else len(sig_payload),
length,
int(length / steps),
)
multi_byte_bonus: bool = (
is_multi_byte_decoder
and decoded_payload is not None
and len(decoded_payload) < length
)
if multi_byte_bonus:
logger.log(
TRACE,
"Code page %s is a multi byte encoding table and it appear that at least one character "
"was encoded using n-bytes.",
encoding_iana,
)
max_chunk_gave_up: int = int(len(r_) / 4)
max_chunk_gave_up = max(max_chunk_gave_up, 2)
early_stop_count: int = 0
lazy_str_hard_failure = False
md_chunks: List[str] = []
md_ratios = []
try:
for chunk in cut_sequence_chunks(
sequences,
encoding_iana,
r_,
chunk_size,
bom_or_sig_available,
strip_sig_or_bom,
sig_payload,
is_multi_byte_decoder,
decoded_payload,
):
md_chunks.append(chunk)
md_ratios.append(
mess_ratio(
chunk,
threshold,
explain is True and 1 <= len(cp_isolation) <= 2,
)
)
if md_ratios[-1] >= threshold:
early_stop_count += 1
if (early_stop_count >= max_chunk_gave_up) or (
bom_or_sig_available and strip_sig_or_bom is False
):
break
except (
UnicodeDecodeError
) as e: # Lazy str loading may have missed something there
logger.log(
TRACE,
"LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
encoding_iana,
str(e),
)
early_stop_count = max_chunk_gave_up
lazy_str_hard_failure = True
# We might want to check the sequence again with the whole content
# Only if initial MD tests passes
if (
not lazy_str_hard_failure
and is_too_large_sequence
and not is_multi_byte_decoder
):
try:
sequences[int(50e3) :].decode(encoding_iana, errors="strict")
except UnicodeDecodeError as e:
logger.log(
TRACE,
"LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
encoding_iana,
str(e),
)
tested_but_hard_failure.append(encoding_iana)
continue
mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
tested_but_soft_failure.append(encoding_iana)
logger.log(
TRACE,
"%s was excluded because of initial chaos probing. Gave up %i time(s). "
"Computed mean chaos is %f %%.",
encoding_iana,
early_stop_count,
round(mean_mess_ratio * 100, ndigits=3),
)
# Preparing those fallbacks in case we got nothing.
if (
enable_fallback
and encoding_iana in ["ascii", "utf_8", specified_encoding]
and not lazy_str_hard_failure
):
fallback_entry = CharsetMatch(
sequences, encoding_iana, threshold, False, [], decoded_payload
)
if encoding_iana == specified_encoding:
fallback_specified = fallback_entry
elif encoding_iana == "ascii":
fallback_ascii = fallback_entry
else:
fallback_u8 = fallback_entry
continue
logger.log(
TRACE,
"%s passed initial chaos probing. Mean measured chaos is %f %%",
encoding_iana,
round(mean_mess_ratio * 100, ndigits=3),
)
if not is_multi_byte_decoder:
target_languages: List[str] = encoding_languages(encoding_iana)
else:
target_languages = mb_encoding_languages(encoding_iana)
if target_languages:
logger.log(
TRACE,
"{} should target any language(s) of {}".format(
encoding_iana, str(target_languages)
),
)
cd_ratios = []
# We shall skip the CD when its about ASCII
# Most of the time its not relevant to run "language-detection" on it.
if encoding_iana != "ascii":
for chunk in md_chunks:
chunk_languages = coherence_ratio(
chunk,
language_threshold,
",".join(target_languages) if target_languages else None,
)
cd_ratios.append(chunk_languages)
cd_ratios_merged = merge_coherence_ratios(cd_ratios)
if cd_ratios_merged:
logger.log(
TRACE,
"We detected language {} using {}".format(
cd_ratios_merged, encoding_iana
),
)
results.append(
CharsetMatch(
sequences,
encoding_iana,
mean_mess_ratio,
bom_or_sig_available,
cd_ratios_merged,
decoded_payload,
)
)
if (
encoding_iana in [specified_encoding, "ascii", "utf_8"]
and mean_mess_ratio < 0.1
):
logger.debug(
"Encoding detection: %s is most likely the one.", encoding_iana
)
if explain:
logger.removeHandler(explain_handler)
logger.setLevel(previous_logger_level)
return CharsetMatches([results[encoding_iana]])
if encoding_iana == sig_encoding:
logger.debug(
"Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
"the beginning of the sequence.",
encoding_iana,
)
if explain:
logger.removeHandler(explain_handler)
logger.setLevel(previous_logger_level)
return CharsetMatches([results[encoding_iana]])
if len(results) == 0:
if fallback_u8 or fallback_ascii or fallback_specified:
logger.log(
TRACE,
"Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
)
if fallback_specified:
logger.debug(
"Encoding detection: %s will be used as a fallback match",
fallback_specified.encoding,
)
results.append(fallback_specified)
elif (
(fallback_u8 and fallback_ascii is None)
or (
fallback_u8
and fallback_ascii
and fallback_u8.fingerprint != fallback_ascii.fingerprint
)
or (fallback_u8 is not None)
):
logger.debug("Encoding detection: utf_8 will be used as a fallback match")
results.append(fallback_u8)
elif fallback_ascii:
logger.debug("Encoding detection: ascii will be used as a fallback match")
results.append(fallback_ascii)
if results:
logger.debug(
"Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
results.best().encoding, # type: ignore
len(results) - 1,
)
else:
logger.debug("Encoding detection: Unable to determine any suitable charset.")
if explain:
logger.removeHandler(explain_handler)
logger.setLevel(previous_logger_level)
return results
def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same detection as from_bytes, fed with whatever the given binary file pointer yields on read().
    The file pointer is not closed here; that stays the caller's responsibility.
    Every other argument is forwarded to from_bytes unchanged.
    """
    payload = fp.read()

    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )
def from_path(
    path: Union[str, bytes, PathLike],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same detection as from_bytes with one extra step: the file at the given path is opened in binary mode
    and handed to from_fp. The file is closed again before returning.
    Can raise IOError.
    """
    with open(path, "rb") as binary_stream:
        return from_fp(
            binary_stream,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )
def is_binary(
    fp_or_path_or_payload: Union[PathLike, str, BinaryIO, bytes],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Tell whether the given input (path, raw payload or file pointer) looks like binary content, aka. not a string.
    Relies on the same heuristics and default kwargs as the from_* functions, with one exception: fallback
    matches are disabled by default so that ASCII-compatible content unlikely to be text is not accepted.
    Returns True when the detection produced no match at all.
    """
    detection_options = {
        "steps": steps,
        "chunk_size": chunk_size,
        "threshold": threshold,
        "cp_isolation": cp_isolation,
        "cp_exclusion": cp_exclusion,
        "preemptive_behaviour": preemptive_behaviour,
        "explain": explain,
        "language_threshold": language_threshold,
        "enable_fallback": enable_fallback,
    }

    # str / PathLike are paths, bytes / bytearray are payloads, anything else
    # is treated as an already opened binary file pointer.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(fp_or_path_or_payload, **detection_options)
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        guesses = from_bytes(fp_or_path_or_payload, **detection_options)
    else:
        guesses = from_fp(fp_or_path_or_payload, **detection_options)

    return not guesses
@@ -0,0 +1,395 @@
import importlib
from codecs import IncrementalDecoder
from collections import Counter
from functools import lru_cache
from typing import Counter as TypeCounter, Dict, List, Optional, Tuple
from .constant import (
FREQUENCIES,
KO_NAMES,
LANGUAGE_SUPPORTED_COUNT,
TOO_SMALL_SEQUENCE,
ZH_NAMES,
)
from .md import is_suspiciously_successive_range
from .models import CoherenceMatches
from .utils import (
is_accentuated,
is_latin,
is_multi_byte_encoding,
is_unicode_range_secondary,
unicode_range,
)
def encoding_unicode_range(iana_name: str) -> List[str]:
    """
    List the unicode ranges a single byte code page maps to.
    Each byte from 0x40 up to 0xFE is decoded on its own; only non-secondary ranges
    accounting for at least 15% of the retained characters are returned, sorted by name.
    Raises IOError when given a multi-byte code page.
    """
    if is_multi_byte_encoding(iana_name):
        raise IOError("Function not supported on multi-byte code page")

    codec_module = importlib.import_module("encodings.{}".format(iana_name))
    # errors="ignore": bytes without a mapping decode to an empty string instead of raising.
    byte_decoder: IncrementalDecoder = codec_module.IncrementalDecoder(errors="ignore")

    hits_per_range: Dict[str, int] = {}
    retained_count: int = 0

    for byte_value in range(0x40, 0xFF):
        decoded: str = byte_decoder.decode(bytes([byte_value]))

        if not decoded:
            continue

        range_name: Optional[str] = unicode_range(decoded)

        if range_name is None:
            continue

        if is_unicode_range_secondary(range_name) is not False:
            continue

        hits_per_range[range_name] = hits_per_range.get(range_name, 0) + 1
        retained_count += 1

    return sorted(
        range_name
        for range_name, hits in hits_per_range.items()
        if hits / retained_count >= 0.15
    )
def unicode_range_languages(primary_range: str) -> List[str]:
    """
    List every language of FREQUENCIES having at least one character that belongs to the given unicode range.
    Languages come out in FREQUENCIES iteration order.
    """
    return [
        language
        for language, characters in FREQUENCIES.items()
        if any(unicode_range(character) == primary_range for character in characters)
    ]
@lru_cache()
def encoding_languages(iana_name: str) -> List[str]:
    """
    Single-byte encoding language association. Some code pages are heavily linked to particular language(s).
    The first unicode range of the code page whose name does not contain "Latin" decides the languages;
    when every range is Latin, the single marker "Latin Based" is returned instead.
    """
    non_latin_ranges = (
        candidate
        for candidate in encoding_unicode_range(iana_name)
        if "Latin" not in candidate
    )
    primary_range: Optional[str] = next(non_latin_ranges, None)

    if primary_range is None:
        return ["Latin Based"]

    return unicode_range_languages(primary_range)
@lru_cache()
def mb_encoding_languages(iana_name: str) -> List[str]:
    """
    Multi-byte encoding language association. Some code pages are heavily linked to particular language(s).
    The match is done on the code page name (prefix or membership in ZH_NAMES / KO_NAMES);
    an empty list means no language could be associated.
    """
    japanese_prefixes = ("shift_", "iso2022_jp", "euc_j")

    if iana_name.startswith(japanese_prefixes) or iana_name == "cp932":
        return ["Japanese"]

    if iana_name.startswith("gb") or iana_name in ZH_NAMES:
        return ["Chinese"]

    if iana_name.startswith("iso2022_kr") or iana_name in KO_NAMES:
        return ["Korean"]

    return []
@lru_cache(maxsize=LANGUAGE_SUPPORTED_COUNT)
def get_target_features(language: str) -> Tuple[bool, bool]:
    """
    Summarize a supported language in two flags: (uses accentuated characters, uses Latin characters only).
    Both flags are computed from the characters listed for that language in FREQUENCIES.
    """
    language_characters = FREQUENCIES[language]

    uses_accents: bool = any(
        is_accentuated(character) for character in language_characters
    )
    # Pure Latin as long as no character is explicitly reported as non-Latin.
    latin_only: bool = not any(
        is_latin(character) is False for character in language_characters
    )

    return uses_accents, latin_only
def alphabet_languages(
    characters: List[str], ignore_non_latin: bool = False
) -> List[str]:
    """
    Return the languages whose alphabet is compatible with the given characters.

    A language is kept when at least 20% of its FREQUENCIES characters appear in
    `characters`. Languages without accents are discarded when the source has
    accentuated characters, and non pure-Latin languages are discarded when
    `ignore_non_latin` is set. The result is ordered from best to worst ratio.
    """
    languages: List[Tuple[str, float]] = []

    # Built once: every language below tests membership against the same source
    # characters, and a set avoids re-scanning the list for each tested character.
    source_characters = set(characters)
    source_have_accents = any(is_accentuated(character) for character in characters)

    for language, language_characters in FREQUENCIES.items():
        target_have_accents, target_pure_latin = get_target_features(language)

        if ignore_non_latin and target_pure_latin is False:
            continue

        if target_have_accents is False and source_have_accents:
            continue

        character_count: int = len(language_characters)
        character_match_count: int = sum(
            1 for character in language_characters if character in source_characters
        )

        ratio: float = character_match_count / character_count

        if ratio >= 0.2:
            languages.append((language, ratio))

    # Stable sort: languages with equal ratios keep their FREQUENCIES order.
    languages.sort(key=lambda entry: entry[1], reverse=True)

    return [language for language, _ in languages]
def characters_popularity_compare(
    language: str, ordered_characters: List[str]
) -> float:
    """
    Determine if an ordered characters list (by occurrence, from most frequent to rarest) matches a particular language.
    The result is a ratio between 0. (absolutely no correspondence) and 1. (near perfect fit).
    Beware that this function is not strict on the match in order to ease the detection. (Meaning close match is 1.)
    An empty characters list yields 0. as there is nothing to compare.
    Raises ValueError when the language is not part of FREQUENCIES.
    """
    if language not in FREQUENCIES:
        raise ValueError("{} not available".format(language))

    ordered_characters_count: int = len(ordered_characters)

    # Nothing to rank: answer "no correspondence" rather than dividing by zero below.
    if ordered_characters_count == 0:
        return 0.0

    language_characters: List[str] = FREQUENCIES[language]
    target_language_characters_count: int = len(language_characters)

    # Rank of each character in the language table. setdefault keeps the first
    # occurrence, which is what list.index would return.
    rank_in_language: Dict[str, int] = {}
    for rank, language_character in enumerate(language_characters):
        rank_in_language.setdefault(language_character, rank)

    large_alphabet: bool = target_language_characters_count > 26

    # Scale factor projecting a rank in the source onto the language table.
    expected_projection_ratio: float = (
        target_language_characters_count / ordered_characters_count
    )

    character_approved_count: int = 0

    for character_rank, character in enumerate(ordered_characters):
        character_rank_in_language = rank_in_language.get(character)

        if character_rank_in_language is None:
            continue

        character_rank_projection: int = int(character_rank * expected_projection_ratio)
        rank_distance: int = abs(character_rank_projection - character_rank_in_language)

        # Small alphabets: a character too far from its expected rank is rejected outright.
        if large_alphabet is False and rank_distance > 4:
            continue

        # Large alphabets: being within a third of the table is good enough.
        if large_alphabet is True and rank_distance < target_language_characters_count / 3:
            character_approved_count += 1
            continue

        # Otherwise compare what surrounds the character on both sides.
        characters_before_source: List[str] = language_characters[
            0:character_rank_in_language
        ]
        characters_after_source: List[str] = language_characters[
            character_rank_in_language:
        ]
        characters_before: List[str] = ordered_characters[0:character_rank]
        characters_after: List[str] = ordered_characters[character_rank:]

        before_match_count: int = len(
            set(characters_before) & set(characters_before_source)
        )
        after_match_count: int = len(
            set(characters_after) & set(characters_after_source)
        )

        if len(characters_before_source) == 0 and before_match_count <= 4:
            character_approved_count += 1
            continue

        if len(characters_after_source) == 0 and after_match_count <= 4:
            character_approved_count += 1
            continue

        if (
            before_match_count / len(characters_before_source) >= 0.4
            or after_match_count / len(characters_after_source) >= 0.4
        ):
            character_approved_count += 1
            continue

    return character_approved_count / ordered_characters_count
def alpha_unicode_split(decoded_sequence: str) -> List[str]:
    """
    Split a decoded text into layers of lower-cased letters, one layer per unicode range / alphabet.
    Ex. a text containing English/Latin with a bit of Hebrew will return two items in the resulting list;
    one containing the latin letters and the other the hebrew ones.
    Non alphabetic characters and characters without a known unicode range are dropped.
    """
    layers: Dict[str, str] = {}

    for character in decoded_sequence:
        if not character.isalpha():
            continue

        character_range: Optional[str] = unicode_range(character)

        if character_range is None:
            continue

        # Reuse the first already discovered layer that is not suspicious next
        # to this character's range; otherwise the character gets its own layer.
        compatible_layer: Optional[str] = next(
            (
                known_range
                for known_range in layers
                if is_suspiciously_successive_range(known_range, character_range)
                is False
            ),
            None,
        )
        target_layer: str = (
            character_range if compatible_layer is None else compatible_layer
        )

        layers[target_layer] = layers.get(target_layer, "") + character.lower()

    return list(layers.values())
def merge_coherence_ratios(results: List[CoherenceMatches]) -> CoherenceMatches:
    """
    Merge several results of the function coherence_ratio into one.
    Each language ends up with the mean of its ratios, rounded to 4 digits.
    The return type is the same as coherence_ratio, ordered from highest to lowest ratio.
    """
    ratios_by_language: Dict[str, List[float]] = {}

    for coherence_matches in results:
        for language, ratio in coherence_matches:
            ratios_by_language.setdefault(language, []).append(ratio)

    averaged = [
        (language, round(sum(ratios) / len(ratios), 4))
        for language, ratios in ratios_by_language.items()
    ]
    averaged.sort(key=lambda entry: entry[1], reverse=True)

    return averaged
def filter_alt_coherence_matches(results: CoherenceMatches) -> CoherenceMatches:
    """
    We shall NOT return "English—" in CoherenceMatches because it is an alternative
    of "English". When a language shows up more than once after removing the em-dash,
    only its best ratio is kept, under the name without em-dash.
    When no language is duplicated, the given results are returned untouched.
    """
    index_results: Dict[str, List[float]] = {}

    for language, ratio in results:
        # "\u2014" is the em-dash marking an alternative of a language. Spelled as
        # an escape so the character cannot silently get lost in the source file.
        no_em_name: str = language.replace("\u2014", "")
        index_results.setdefault(no_em_name, []).append(ratio)

    if any(len(ratios) > 1 for ratios in index_results.values()):
        return [(language, max(ratios)) for language, ratios in index_results.items()]

    return results
@lru_cache(maxsize=2048)
def coherence_ratio(
    decoded_sequence: str, threshold: float = 0.1, lg_inclusion: Optional[str] = None
) -> CoherenceMatches:
    """
    Detect ANY language that can be identified in the given sequence. The sequence is analysed layer by layer.
    A layer = Character extraction by alphabets/ranges.
    lg_inclusion is an optional comma separated list of languages to test instead of guessing them
    from the alphabet; the special entry "Latin Based" restricts the guess to pure Latin languages.
    Matches below the threshold are dropped; the rest is returned from best to worst ratio.
    """
    included_languages: List[str] = (
        [] if lg_inclusion is None else lg_inclusion.split(",")
    )

    # "Latin Based" is a marker, not a language: turn it into a flag.
    latin_only: bool = "Latin Based" in included_languages
    if latin_only:
        included_languages.remove("Latin Based")

    matches: List[Tuple[str, float]] = []
    strong_match_count: int = 0

    for layer in alpha_unicode_split(decoded_sequence):
        ranked_characters = Counter(layer).most_common()
        layer_size: int = sum(occurrences for _, occurrences in ranked_characters)

        if layer_size <= TOO_SMALL_SEQUENCE:
            continue

        characters_by_popularity: List[str] = [
            character for character, _ in ranked_characters
        ]
        candidate_languages = included_languages or alphabet_languages(
            characters_by_popularity, latin_only
        )

        for language in candidate_languages:
            ratio: float = characters_popularity_compare(
                language, characters_by_popularity
            )

            if ratio < threshold:
                continue

            if ratio >= 0.8:
                strong_match_count += 1

            matches.append((language, round(ratio, 4)))

            # Enough very good matches gathered: stop testing languages for this layer.
            if strong_match_count >= 3:
                break

    return sorted(
        filter_alt_coherence_matches(matches),
        key=lambda entry: entry[1],
        reverse=True,
    )
@@ -0,0 +1,6 @@
from .__main__ import cli_detect, query_yes_no

# Names re-exported from .__main__ as the public API of this package.
__all__ = (
    "cli_detect",
    "query_yes_no",
)
@@ -0,0 +1,296 @@
import argparse
import sys
from json import dumps
from os.path import abspath, basename, dirname, join, realpath
from platform import python_version
from typing import List, Optional
from unicodedata import unidata_version
import charset_normalizer.md as md_module
from charset_normalizer import from_fp
from charset_normalizer.models import CliDetectionResult
from charset_normalizer.version import __version__
def query_yes_no(question: str, default: str = "yes") -> bool:
    """Ask a yes/no question via input() and return the answer as a boolean.
    "question" is a string that is presented to the user.
    "default" is the presumed answer if the user just hits <Enter>.
    It must be "yes" (the default), "no" or None (meaning
    an answer is required of the user); anything else raises ValueError.
    The return value is True for "yes" or False for "no".
    The question is asked again until a valid answer is given.
    Credit goes to (c) https://stackoverflow.com/questions/3041986/apt-command-line-interface-like-yes-no-input
    """
    answers = {"yes": True, "y": True, "ye": True, "no": False, "n": False}

    if default is None:
        hint = " [y/n] "
    elif default == "yes":
        hint = " [Y/n] "
    elif default == "no":
        hint = " [y/N] "
    else:
        raise ValueError("invalid default answer: '%s'" % default)

    while True:
        sys.stdout.write(question + hint)
        reply = input().lower()

        # A bare <Enter> selects the default, when there is one.
        if default is not None and reply == "":
            return answers[default]

        if reply in answers:
            return answers[reply]

        sys.stdout.write("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
def cli_detect(argv: Optional[List[str]] = None) -> int:
    """
    CLI assistant using ARGV and ArgumentParser

    Runs the detection on every given file, prints the outcome on STDOUT (JSON, or the bare
    charset names with --minimal) and, with --normalize, writes a UTF-8 version of each file.

    :param argv: Arguments to parse, handed as is to ArgumentParser.parse_args.
    :return: 0 if everything is fine, anything else equal trouble
        (1: inconsistent arguments, 2: the normalized file could not be written)
    """
    parser = argparse.ArgumentParser(
        description="The Real First Universal Charset Detector. "
        "Discover originating encoding used on text file. "
        "Normalize text to unicode."
    )

    parser.add_argument(
        "files", type=argparse.FileType("rb"), nargs="+", help="File(s) to be analysed"
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        default=False,
        dest="verbose",
        help="Display complementary information about file if any. "
        "Stdout will contain logs about the detection process.",
    )
    parser.add_argument(
        "-a",
        "--with-alternative",
        action="store_true",
        default=False,
        dest="alternatives",
        help="Output complementary possibilities if any. Top-level JSON WILL be a list.",
    )
    parser.add_argument(
        "-n",
        "--normalize",
        action="store_true",
        default=False,
        dest="normalize",
        help="Permit to normalize input file. If not set, program does not write anything.",
    )
    parser.add_argument(
        "-m",
        "--minimal",
        action="store_true",
        default=False,
        dest="minimal",
        help="Only output the charset detected to STDOUT. Disabling JSON output.",
    )
    parser.add_argument(
        "-r",
        "--replace",
        action="store_true",
        default=False,
        dest="replace",
        help="Replace file when trying to normalize it instead of creating a new one.",
    )
    parser.add_argument(
        "-f",
        "--force",
        action="store_true",
        default=False,
        dest="force",
        help="Replace file without asking if you are sure, use this flag with caution.",
    )
    parser.add_argument(
        "-t",
        "--threshold",
        action="store",
        default=0.2,
        type=float,
        dest="threshold",
        help="Define a custom maximum amount of chaos allowed in decoded content. 0. <= chaos <= 1.",
    )
    parser.add_argument(
        "--version",
        action="version",
        version="Charset-Normalizer {} - Python {} - Unicode {} - SpeedUp {}".format(
            __version__,
            python_version(),
            unidata_version,
            "OFF" if md_module.__file__.lower().endswith(".py") else "ON",
        ),
        help="Show version information and exit.",
    )

    args = parser.parse_args(argv)

    # Flags that only make sense on top of another one.
    if args.replace is True and args.normalize is False:
        print("Use --replace in addition of --normalize only.", file=sys.stderr)
        return 1

    if args.force is True and args.replace is False:
        print("Use --force in addition of --replace only.", file=sys.stderr)
        return 1

    if args.threshold < 0.0 or args.threshold > 1.0:
        print("--threshold VALUE should be between 0. AND 1.", file=sys.stderr)
        return 1

    # One entry per reported guess, in the order the files were given.
    x_ = []

    for my_file in args.files:
        matches = from_fp(my_file, threshold=args.threshold, explain=args.verbose)

        best_guess = matches.best()

        if best_guess is None:
            print(
                'Unable to identify originating encoding for "{}". {}'.format(
                    my_file.name,
                    "Maybe try increasing maximum amount of chaos."
                    if args.threshold < 1.0
                    else "",
                ),
                file=sys.stderr,
            )
            x_.append(
                CliDetectionResult(
                    abspath(my_file.name),
                    None,
                    [],
                    [],
                    "Unknown",
                    [],
                    False,
                    1.0,
                    0.0,
                    None,
                    True,
                )
            )
        else:
            # Kept at hand so the normalized path below is recorded on THIS
            # file's entry rather than on the first entry of the whole run.
            best_entry = CliDetectionResult(
                abspath(my_file.name),
                best_guess.encoding,
                best_guess.encoding_aliases,
                [
                    cp
                    for cp in best_guess.could_be_from_charset
                    if cp != best_guess.encoding
                ],
                best_guess.language,
                best_guess.alphabets,
                best_guess.bom,
                best_guess.percent_chaos,
                best_guess.percent_coherence,
                None,
                True,
            )
            x_.append(best_entry)

            if len(matches) > 1 and args.alternatives:
                for el in matches:
                    if el != best_guess:
                        x_.append(
                            CliDetectionResult(
                                abspath(my_file.name),
                                el.encoding,
                                el.encoding_aliases,
                                [
                                    cp
                                    for cp in el.could_be_from_charset
                                    if cp != el.encoding
                                ],
                                el.language,
                                el.alphabets,
                                el.bom,
                                el.percent_chaos,
                                el.percent_coherence,
                                None,
                                False,
                            )
                        )

            if args.normalize is True:
                if best_guess.encoding.startswith("utf") is True:
                    print(
                        '"{}" file does not need to be normalized, as it already came from unicode.'.format(
                            my_file.name
                        ),
                        file=sys.stderr,
                    )
                    if my_file.closed is False:
                        my_file.close()
                    continue

                dir_path = dirname(realpath(my_file.name))
                file_name = basename(realpath(my_file.name))

                o_: List[str] = file_name.split(".")

                if args.replace is False:
                    # New file next to the original: the detected encoding is
                    # inserted before the last dot-separated part of the name.
                    o_.insert(-1, best_guess.encoding)
                    if my_file.closed is False:
                        my_file.close()
                elif (
                    args.force is False
                    and query_yes_no(
                        'Are you sure to normalize "{}" by replacing it ?'.format(
                            my_file.name
                        ),
                        "no",
                    )
                    is False
                ):
                    if my_file.closed is False:
                        my_file.close()
                    continue

                try:
                    best_entry.unicode_path = join(dir_path, ".".join(o_))

                    with open(best_entry.unicode_path, "w", encoding="utf-8") as fp:
                        fp.write(str(best_guess))
                except IOError as e:
                    print(str(e), file=sys.stderr)
                    if my_file.closed is False:
                        my_file.close()
                    return 2

        if my_file.closed is False:
            my_file.close()

    if args.minimal is False:
        # A single entry is printed as an object, several as a list.
        print(
            dumps(
                [el.__dict__ for el in x_] if len(x_) > 1 else x_[0].__dict__,
                ensure_ascii=True,
                indent=4,
            )
        )
    else:
        for my_file in args.files:
            print(
                ", ".join(
                    [
                        el.encoding or "undefined"
                        for el in x_
                        if el.path == abspath(my_file.name)
                    ]
                )
            )

    return 0
if __name__ == "__main__":
    # Hand cli_detect's status code to the shell instead of dropping it, so a
    # failed run does not exit with status 0.
    sys.exit(cli_detect())
@@ -0,0 +1,54 @@
from typing import Any, Dict, Optional, Union
from warnings import warn
from .api import from_bytes
from .constant import CHARDET_CORRESPONDENCE
def detect(
    byte_str: bytes, should_rename_legacy: bool = False, **kwargs: Any
) -> Dict[str, Optional[Union[str, float]]]:
    """
    chardet legacy method
    Detect the encoding of the given byte string. It should be mostly backward-compatible.
    Encoding name will match Chardet own writing whenever possible. (Not on encoding name unsupported by it)
    This function is deprecated and should be used to migrate your project easily, consult the documentation for
    further information. Not planned for removal.

    Returns a dict with the keys "encoding", "language" and "confidence"; encoding and confidence are None
    when nothing was detected. Extra keyword arguments are ignored with a warning.

    :param byte_str:     The byte sequence to examine.
    :param should_rename_legacy:  Should we rename legacy encodings
                                  to their more modern equivalents?
    """
    if kwargs:
        warn(
            f"charset-normalizer disregard arguments '{','.join(list(kwargs.keys()))}' in legacy function detect()"
        )

    if not isinstance(byte_str, (bytearray, bytes)):
        raise TypeError(  # pragma: nocover
            "Expected object of type bytes or bytearray, got: {0}".format(
                type(byte_str)
            )
        )

    payload = bytes(byte_str) if isinstance(byte_str, bytearray) else byte_str
    best_match = from_bytes(payload).best()

    if best_match is None:
        encoding = None
        language = ""
        confidence = None
    else:
        encoding = best_match.encoding
        language = best_match.language if best_match.language != "Unknown" else ""
        confidence = 1.0 - best_match.chaos

        # Note: CharsetNormalizer does not return 'UTF-8-SIG' as the sig get stripped in the
        # detection/normalization process but chardet does return 'utf-8-sig' and it is a valid codec name.
        if encoding == "utf_8" and best_match.bom:
            encoding += "_sig"

    if should_rename_legacy is False and encoding in CHARDET_CORRESPONDENCE:
        encoding = CHARDET_CORRESPONDENCE[encoding]

    return {
        "encoding": encoding,
        "language": language,
        "confidence": confidence,
    }

Some files were not shown because too many files have changed in this diff Show More