90 lines
3.2 KiB
Python
90 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
|
|
"""
|
|
When elasticache valkey is deployed without encryption in transit, IAM auth cannot be used.
|
|
To enable TLS after deployment, it takes 20min to set it to prefer TLS, and another 5 min to set it to require TLS.
|
|
When TLS is enabled, configuration endpoint address is changed
|
|
IAM auth is available only after TLS is required. One needs to manually associate the cluster with the elasticache usergroup
|
|
|
|
It took me a day to figure out how to connect to elasticache/valkey using iam auth, even with help from perplexity. It should
|
|
not be this difficult.
|
|
|
|
The following code is based on valkey's example, which for some reason did not enable TLS.
|
|
|
|
Other considerations
|
|
* Elasticache userid must be the same as username
|
|
* The signed request must have validity of 15min
|
|
|
|
References
|
|
* https://github.com/valkey-io/valkey-py/blob/main/docs/examples/connection_examples.ipynb
|
|
"""
|
|
|
|
from typing import Tuple, Union
|
|
from urllib.parse import ParseResult, urlencode, urlunparse
|
|
|
|
import botocore.session
|
|
import valkey
|
|
from botocore.model import ServiceId
|
|
from botocore.signers import RequestSigner
|
|
from cachetools import TTLCache, cached
|
|
import ssl
|
|
|
|
class ElastiCacheIAMProvider(valkey.CredentialProvider):
|
|
def __init__(self, user, cluster_name, region="ap-east-1"):
|
|
self.user = user
|
|
self.cluster_name = cluster_name
|
|
self.region = region
|
|
|
|
session = botocore.session.get_session()
|
|
self.request_signer = RequestSigner(
|
|
ServiceId("elasticache"),
|
|
self.region,
|
|
"elasticache",
|
|
"v4",
|
|
session.get_credentials(),
|
|
session.get_component("event_emitter"),
|
|
)
|
|
|
|
# Generated IAM tokens are valid for 15 minutes
|
|
@cached(cache=TTLCache(maxsize=128, ttl=900))
|
|
def get_credentials(self) -> Union[Tuple[str], Tuple[str, str]]:
|
|
query_params = {"Action": "connect", "User": self.user}
|
|
url = urlunparse(
|
|
ParseResult(
|
|
scheme="https",
|
|
netloc=self.cluster_name,
|
|
path="/",
|
|
query=urlencode(query_params),
|
|
params="",
|
|
fragment="",
|
|
)
|
|
)
|
|
signed_url = self.request_signer.generate_presigned_url(
|
|
{"method": "GET", "url": url, "body": {}, "headers": {}, "context": {}},
|
|
operation_name="connect",
|
|
expires_in=900,
|
|
region_name=self.region,
|
|
)
|
|
# RequestSigner only seems to work if the URL has a protocol, but
|
|
# Elasticache only accepts the URL without a protocol
|
|
# So strip it off the signed URL before returning
|
|
return (self.user, signed_url.removeprefix("https://"))
|
|
|
|
username = "cacheuser2"
|
|
cluster_name = "cache002"
|
|
endpoint = "clustercfg.cache002.rw4ynm.ape1.cache.amazonaws.com"
|
|
creds_provider = ElastiCacheIAMProvider(user=username, cluster_name=cluster_name)
|
|
user_connection = valkey.Valkey(
|
|
host=endpoint,
|
|
port=6379,
|
|
credential_provider=creds_provider,
|
|
ssl=True,
|
|
ssl_cert_reqs=ssl.CERT_NONE)
|
|
pong = user_connection.ping()
|
|
print(f"Redis ping response: {pong}")
|
|
|
|
user_connection.set('foo', 'helloworld')
|
|
value = user_connection.get('foo')
|
|
print(f"Value for 'foo': {value.decode()}")
|
|
|