NEW: Elasticache and IAM auth

This commit is contained in:
xpk
2025-08-11 21:21:05 +08:00
parent 03797d6e8c
commit d5e24c4825
+84
View File
@@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""
When elasticache valkey is deployed without encryption in transit, IAM auth cannot be used.
To enable TLS after deployment, it takes 20min to set it to prefer TLS, and another 5 min to set it to require
When TLS is enabled, configuration endpoint address is changed
IAM auth is available only after TLS is required. One needs to manually associate the cluster with the elasticache usergroup
Other considerations
* Elasticache userid must be the same as username
* The signed request must have validity of 15min
References
* https://github.com/valkey-io/valkey-py/blob/main/docs/examples/connection_examples.ipynb
"""
from typing import Tuple, Union
from urllib.parse import ParseResult, urlencode, urlunparse
import botocore.session
import valkey
from botocore.model import ServiceId
from botocore.signers import RequestSigner
from cachetools import TTLCache, cached
import ssl
class ElastiCacheIAMProvider(valkey.CredentialProvider):
def __init__(self, user, cluster_name, region="ap-east-1"):
self.user = user
self.cluster_name = cluster_name
self.region = region
session = botocore.session.get_session()
self.request_signer = RequestSigner(
ServiceId("elasticache"),
self.region,
"elasticache",
"v4",
session.get_credentials(),
session.get_component("event_emitter"),
)
# Generated IAM tokens are valid for 15 minutes
@cached(cache=TTLCache(maxsize=128, ttl=900))
def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
query_params = {"Action": "connect", "User": self.user}
url = urlunparse(
ParseResult(
scheme="https",
netloc=self.cluster_name,
path="/",
query=urlencode(query_params),
params="",
fragment="",
)
)
signed_url = self.request_signer.generate_presigned_url(
{"method": "GET", "url": url, "body": {}, "headers": {}, "context": {}},
operation_name="connect",
expires_in=900,
region_name=self.region,
)
# RequestSigner only seems to work if the URL has a protocol, but
# Elasticache only accepts the URL without a protocol
# So strip it off the signed URL before returning
return (self.user, signed_url.removeprefix("https://"))
username = "cacheuser2"
cluster_name = "cache002"
endpoint = "clustercfg.cache002.rw4ynm.ape1.cache.amazonaws.com"
creds_provider = ElastiCacheIAMProvider(user=username, cluster_name=cluster_name)
user_connection = valkey.Valkey(
host=endpoint,
port=6379,
credential_provider=creds_provider,
ssl=True,
ssl_cert_reqs=ssl.CERT_NONE)
pong = user_connection.ping()
print(f"Redis ping response: {pong}")
user_connection.set('foo', 'helloworld')
value = user_connection.get('foo')
print(f"Value for 'foo': {value.decode()}")