Hello,
I have written a custom secrets backend (cached_secrets_manager.py) and added it to my plugins.zip folder.
"""
Secrets backend which leverages the AWS provided SecretsManagerBackend but uses the SecretCache.
Helpful Links:
https://github.com/apache/airflow/blob/providers-amazon/2.4.0/airflow/providers/amazon/aws/secrets/secrets_manager.py
https://docs.aws.amazon.com/secretsmanager/latest/userguide/retrieving-secrets_cache-python.html
"""
import boto3
from aws_secretsmanager_caching import SecretCache, SecretCacheConfig
from airflow.providers.amazon.aws.secrets.secrets_manager import (
SecretsManagerBackend,
cached_property,
)
SECRET_REFRESH_INTERVAL_SECONDS = 300
MAX_CACHE_SIZE = 2048
class CachedSecretsManagerBackend(SecretsManagerBackend):
@cached_property
def client(self):
"""Override the client used in the SecretsManagerBacked with a SecretCache."""
session = boto3.session.Session(profile_name=self.profile_name)
secret_cache_config = SecretCacheConfig(
max_cache_size=self.kwargs.pop("max_cache_size", MAX_CACHE_SIZE),
secret_refresh_interval=self.kwargs.pop("secret_refresh_interval_seconds", SECRET_REFRESH_INTERVAL_SECONDS)
)
client = session.client(service_name="secretsmanager", **self.kwargs)
cache = SecretCache(config=secret_cache_config, client=client)
def get_secret_string_patched(SecretId):
secret_string = cache.get_secret_string(SecretId)
self.log.info("Retrieved secret %s from cache.", SecretId)
return {"SecretString": secret_string}
# This function expects the `SecretId` argument and returns a dictionary with the "SecretString" Key
# https://github.com/apache/airflow/blob/providers-amazon/2.4.0/airflow/providers/amazon/aws/secrets/secrets_manager.py#L244
cache.get_secret_value = get_secret_string_patched
# To handle the exceptions when a secret is not found
# https://github.com/apache/airflow/blob/providers-amazon/2.4.0/airflow/providers/amazon/aws/secrets/secrets_manager.py#L248
cache.exceptions = cache._client.exceptions
return cache
To use the above script I need the package below, which I've also added to the requirements.txt file.

```
aws-secretsmanager-caching==1.1.1.5
```
In the start-up script for MWAA I have also added the PYTHONPATH variable as below.
```sh
export PYTHONPATH=$AIRFLOW_HOME/plugins:$PYTHONPATH
```
I have also updated the airflow.cfg to use the above backend, and the same on the MWAA console.

```ini
[secrets]
backend = cached_secrets_manager.CachedSecretsManagerBackend
```
With the above changes, I am able to get my secrets backend running on my local airflow, MWAA local runner and also an airflow process that I can launch on MWAA using the bash operator.
```python
cli_command_4 = BashOperator(
    task_id="bash_command_4",
    bash_command='export AIRFLOW__SECRETS__BACKEND="cached_secrets_manager.CachedSecretsManagerBackend"; export PYTHONPATH="$AIRFLOW_HOME/plugins:$PYTHONPATH"; env; airflow scheduler',
)
```
However, when I try to use it on MWAA it does not work.
Any help would be appreciated.
Thank you.
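The backend above works only because the monkey-patched cache keeps two pieces of the boto3 client contract that Airflow's SecretsManagerBackend relies on: `get_secret_value(SecretId=...)` must return a dict with a `SecretString` key, and `exceptions.ResourceNotFoundException` must still resolve so a missing secret yields `None` instead of a crash. The following is an added, dependency-free example (not the poster's code and not the aws_secretsmanager_caching API): a constructed fake client and a minimal TTL cache with the same two attributes, used to check that contract offline and to show the rotation lag that the refresh interval introduces (a rotated secret keeps being served from process memory until the interval elapses).

```python
from types import SimpleNamespace

class ResourceNotFoundException(Exception):
    """Stand-in for the botocore-generated exception class."""

class FakeSecretsManagerClient:
    """Constructed stand-in for boto3's secretsmanager client (no AWS calls)."""
    def __init__(self, store):
        self.store, self.calls = store, 0  # secret id -> current string; simulated API calls
        self.exceptions = SimpleNamespace(ResourceNotFoundException=ResourceNotFoundException)
    def get_secret_value(self, SecretId):
        self.calls += 1
        if SecretId not in self.store:
            raise ResourceNotFoundException(SecretId)
        return {"SecretString": self.store[SecretId]}

class TtlSecretCache:
    """Assumed minimal cache exposing only the two attributes the backend uses."""
    def __init__(self, client, refresh_interval_seconds, clock):
        self._client, self._ttl, self._clock = client, refresh_interval_seconds, clock
        self._entries = {}  # secret id -> (fetched_at, plaintext), held in process memory
        self.exceptions = client.exceptions  # keeps the backend's except clause resolvable
    def get_secret_value(self, SecretId):
        now = self._clock()
        hit = self._entries.get(SecretId)
        if hit is None or now - hit[0] >= self._ttl:
            # Not-found is never cached, so the backend still sees the real exception
            hit = (now, self._client.get_secret_value(SecretId=SecretId)["SecretString"])
            self._entries[SecretId] = hit
        return {"SecretString": hit[1]}

def lookup(cache, secret_id):
    # Same shape as the backend's lookup: the value, or None when the secret is absent
    try:
        return cache.get_secret_value(SecretId=secret_id)["SecretString"]
    except cache.exceptions.ResourceNotFoundException:
        return None

def rotation_lag_demo(refresh_interval_seconds=300):
    clock = [0.0]  # injected clock so the refresh interval can be stepped deterministically
    client = FakeSecretsManagerClient({"airflow/connections/db": "old-password"})
    cache = TtlSecretCache(client, refresh_interval_seconds, clock=lambda: clock[0])
    first = lookup(cache, "airflow/connections/db")
    client.store["airflow/connections/db"] = "new-password"  # rotation happens upstream
    clock[0] = refresh_interval_seconds - 1
    stale = lookup(cache, "airflow/connections/db")  # still served from memory
    clock[0] = refresh_interval_seconds
    fresh = lookup(cache, "airflow/connections/db")  # refetched after the interval
    missing = lookup(cache, "airflow/variables/absent")
    return first, stale, fresh, missing, client.calls
```

With the defaults, four lookups cost three API calls and the rotated value is invisible for the whole refresh interval, so the interval should be chosen against how quickly a revoked credential must stop being used on the workers.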