#!/usr/bin/env python3
"""Connect to ElastiCache (Valkey) with IAM authentication over TLS.

Background notes
* When ElastiCache Valkey is deployed without encryption in transit, IAM
  auth cannot be used.
* To enable TLS after deployment, it takes about 20 min to set the cluster to
  "prefer TLS", and another 5 min to set it to "require TLS".
* When TLS is enabled, the configuration endpoint address changes.
* IAM auth is available only after TLS is required.
* The cluster has to be associated manually with the ElastiCache user group.

It took me a day to figure out how to connect to ElastiCache/Valkey using IAM
auth, even with help from Perplexity. It should not be this difficult.

The following code is based on valkey's example, which for some reason did
not enable TLS.

Other considerations
* The ElastiCache user id must be the same as the user name.
* The signed request must have a validity of 15 min.

References
* https://github.com/valkey-io/valkey-py/blob/main/docs/examples/connection_examples.ipynb
"""

import ssl
from typing import Tuple, Union
from urllib.parse import ParseResult, urlencode, urlunparse

import botocore.session
import valkey
from botocore.model import ServiceId
from botocore.signers import RequestSigner
from cachetools import TTLCache, cached

# Validity requested for every presigned IAM token, in seconds (15 minutes).
TOKEN_VALIDITY_SECONDS = 900

# Cached tokens are dropped this many seconds before they expire, so a token
# handed to the client still has time left when it reaches the server.
TOKEN_REFRESH_MARGIN_SECONDS = 60

TOKEN_CACHE_TTL_SECONDS = TOKEN_VALIDITY_SECONDS - TOKEN_REFRESH_MARGIN_SECONDS


class ElastiCacheIAMProvider(valkey.CredentialProvider):
    """Credential provider that authenticates with a SigV4-presigned IAM token.

    The "password" returned to the client is a presigned ``connect`` request
    for the cluster, signed with the AWS credentials that the default
    botocore session resolves.

    Args:
        user: ElastiCache user name placed in the ``User`` query parameter
            and returned as the user name of the credential pair.
        cluster_name: Cluster name used as the host part of the signed URL.
        region: AWS region used for signing.
    """

    def __init__(
        self, user: str, cluster_name: str, region: str = "ap-east-1"
    ) -> None:
        self.user = user
        self.cluster_name = cluster_name
        self.region = region

        session = botocore.session.get_session()
        self.request_signer = RequestSigner(
            ServiceId("elasticache"),
            self.region,
            "elasticache",
            "v4",
            session.get_credentials(),
            session.get_component("event_emitter"),
        )

    # Generated IAM tokens are valid for 15 minutes. The cache entry lives
    # slightly shorter than that so an about-to-expire token is never reused.
    # NOTE: the cache object is created once at class definition and is
    # keyed on ``self``, so it is shared by (and holds references to) every
    # provider instance until the entry ages out.
    @cached(cache=TTLCache(maxsize=128, ttl=TOKEN_CACHE_TTL_SECONDS))
    def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
        """Return ``(user, token)`` where ``token`` is a presigned URL.

        The result is cached per instance for ``TOKEN_CACHE_TTL_SECONDS``;
        after that a fresh token is signed on the next call.
        """
        query_params = {"Action": "connect", "User": self.user}
        url = urlunparse(
            ParseResult(
                scheme="https",
                netloc=self.cluster_name,
                path="/",
                query=urlencode(query_params),
                params="",
                fragment="",
            )
        )
        signed_url = self.request_signer.generate_presigned_url(
            {"method": "GET", "url": url, "body": {}, "headers": {}, "context": {}},
            operation_name="connect",
            expires_in=TOKEN_VALIDITY_SECONDS,
            region_name=self.region,
        )
        # RequestSigner only seems to work if the URL has a protocol, but
        # Elasticache only accepts the URL without a protocol
        # So strip it off the signed URL before returning
        return (self.user, signed_url.removeprefix("https://"))


username = "cacheuser2"
cluster_name = "cache002"
endpoint = "clustercfg.cache002.rw4ynm.ape1.cache.amazonaws.com"

creds_provider = ElastiCacheIAMProvider(user=username, cluster_name=cluster_name)

# Verify the server certificate and its host name: the IAM token is sent over
# this connection, so the peer must be authenticated. This needs a CA bundle
# that trusts the endpoint's certificate to be available to Python.
user_connection = valkey.Valkey(
    host=endpoint,
    port=6379,
    credential_provider=creds_provider,
    ssl=True,
    ssl_cert_reqs=ssl.CERT_REQUIRED,
    ssl_check_hostname=True,
)

pong = user_connection.ping()
print(f"Redis ping response: {pong}")

user_connection.set('foo', 'helloworld')
value = user_connection.get('foo')
print(f"Value for 'foo': {value.decode()}")