I'm in the process of creating a config rule for Noncompliant resources that don't adhere to a custom tagging standard. My current challenge is in automating the remediation process using either Lambda or SSM after the config rule has identified Noncompliant resources.
The issue I'm encountering is that my Lambda function successfully finds and auto tags DynamoDB resources, but I'm encountering errors when attempting to fetch the resources' ARN for other resource types. Additionally, despite trying various triggers such as SNS topics, the Lambda function fails to invoke when new resources are created or for other existing resources to be auto-tagged.
It's worth noting that the Lambda function is written in Python, the SSM document is in YAML format, and the entire infrastructure is provisioned using Terraform. Any insights or assistance in resolving these challenges would be greatly appreciated.
resource "aws_config_organization_managed_rule" "required_tags" {
name = local.required_tags_organization_name
rule_identifier = regex("^[a-zA-Z0-9-_]+$", "REQUIRED_TAGS")
input_parameters = jsonencode({
tag1Key = "enabled",
tag1Value = "true",
tag2Key = "organization",
tag2Value = "",
tag3Key = "cloud_provider",
tag3Value = "aws",
tag4Key = "namespace",
tag4Value = "mgmt",
tag5Key = "parent_name",
tag5Value = "N/A",
tag6Key = "name",
tag6Value = "N/A"
})
lambda_function:
import boto3
import logging
ssm_client = boto3.client('ssm')
config_client = boto3.client('config')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_parameters_from_ssm(account_id, parameter_path):
try:
response = ssm_client.get_parameters_by_path(Path=parameter_path)
return {param['Name'].split('/')[-1]: param['Value'] for param in response['Parameters']}
except Exception as e:
raise ValueError(f"Error fetching SSM parameters: {e}")
def lambda_handler(event, context):
try:
# Extract execution context information if available
current_account_id = None
if 'invoked_function_arn' in context:
current_account_id = context.invoked_function_arn.split(":")[4]
execution_rate = event.get("ExecutionRate", "rate(1 day)")
tagging_parameters_path = event.get("TaggingParametersPath", "/org/tagging/")
parameters = get_parameters_from_ssm(current_account_id, tagging_parameters_path)
org_client = boto3.client('organizations')
accounts = []
paginator = org_client.get_paginator('list_accounts')
for page in paginator.paginate():
for account in page['Accounts']:
accounts.append(account['Id'])
tagged_resources = []
for account_id in accounts:
# Get all Config rules for the current account
response = config_client.describe_config_rules()
all_config_rules = response.get('ConfigRules', [])
for rule in all_config_rules:
if "prod-config-auto-tags" in rule.get('ConfigRuleName', '').lower():
rule_name = rule['ConfigRuleName']
break
else:
raise ValueError("No Config rule found for required tag compliance")
response = config_client.get_compliance_details_by_config_rule(
ConfigRuleName=rule_name,
ComplianceTypes=['NON_COMPLIANT']
)
noncompliant_resources = response.get('EvaluationResults', [])
for resource in noncompliant_resources:
resource_id = resource['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId']
resource_type = resource['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceType']
resource_arn = convert_to_arn(resource_id, resource_type)
if resource_arn:
tag_resource(resource_arn, parameters)
tagged_resources.append({"ResourceARN": resource_arn, "Tags": parameters})
else:
logger.error(f"Error fetching ARN for resource {resource_id}")
return {"message": "Auto-tagging completed.", "tagged_resources": tagged_resources}
except ValueError as ve:
return {"error": str(ve)}
except Exception as e:
return {"error": f"An unexpected error occurred: {e}"}
def convert_to_arn(resource_id, resource_type):
if resource_type == 'AWS::EC2::Instance':
return f'arn:aws:ec2:::{resource_id}'
elif resource_type == 'AWS::S3::Bucket':
return f'arn:aws:s3:::{resource_id}'
elif resource_type == 'AWS::RDS::DBCluster':
return f'arn:aws:rds:::{resource_id}'
elif resource_type == 'AWS::DynamoDB::Table':
return f'arn:aws:dynamodb:::{resource_id}'
elif resource_type.startswith('arn:aws:cloudformation'):
return resource_id
return None
def tag_resource(resource_arn, parameters):
try:
response = config_client.put_evaluations(
Evaluations=[
{
'ComplianceResourceType': 'AWS::Tag',
'ComplianceResourceId': resource_arn,
'ComplianceType': 'COMPLIANT',
'Annotation': 'Resource is compliant'
}
],
ResultToken='string'
)
logger.info(f"Resource {resource_arn} successfully tagged with required tags.")
except Exception as e:
logger.error(f"Error tagging resource {resource_arn}: {e}")
raise ValueError(f"Error tagging resource {resource_arn}: {e}")
.
######### tf
resource "aws_ssm_document" "required_tags" {
name = local.ssm_document_name
document_type = "Automation"
version_name = "default"
document_format = "YAML"
content = <<-CONTENT
schemaVersion: "0.3"
description: "Automated remediation for adding required tags to non-compliant resources"
assumeRole: "arn:aws:iam::XXXXXXXXXX:role/prod-config-auto-tags-ssm-execution-role"
mainSteps:
-
name: RunPythonScript
action: "aws:executeScript"
inputs:
Runtime: "python3.8"
Handler: "lambda_handler"
Script: |
import boto3
import logging
# Initialize AWS clients
ssm_client = boto3.client('ssm')
config_client = boto3.client('config')
# Setup logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Helper function to fetch parameters from SSM Parameter Store
def get_parameters_from_ssm(account_id, parameter_path):
try:
response = ssm_client.get_parameters_by_path(Path=parameter_path)
return {param['Name'].split('/')[-1]: param['Value'] for param in response['Parameters']}
except Exception as e:
raise ValueError(f"Error fetching SSM parameters: {e}")
# Main Lambda handler function
def lambda_handler(event, context):
try:
# Extract execution context information if available
current_account_id = None
if 'invoked_function_arn' in context:
current_account_id = context.invoked_function_arn.split(":")[4]
execution_rate = event.get("ExecutionRate", "rate(1 day)")
tagging_parameters_path = event.get("TaggingParametersPath", "/org/tagging/")
# Get parameters for tagging from SSM Parameter Store
parameters = get_parameters_from_ssm(current_account_id, tagging_parameters_path)
# Get list of all AWS accounts in the organization
org_client = boto3.client('organizations')
accounts = []
paginator = org_client.get_paginator('list_accounts')
for page in paginator.paginate():
for account in page['Accounts']:
accounts.append(account['Id'])
# Loop through each account and region
tagged_resources = []
for account_id in accounts:
# Get all Config rules for the current account
response = config_client.describe_config_rules()
all_config_rules = response.get('ConfigRules', [])
# Find the Config rule related to required tag compliance
for rule in all_config_rules:
if "prod-config-auto-tags" in rule.get('ConfigRuleName', '').lower():
rule_name = rule['ConfigRuleName']
break
else:
raise ValueError("No Config rule found for required tag compliance")
# Get noncompliant resources for the specific rule and account
response = config_client.get_compliance_details_by_config_rule(
ConfigRuleName=rule_name,
ComplianceTypes=['NON_COMPLIANT']
)
noncompliant_resources = response.get('EvaluationResults', [])
# Auto-tag noncompliant resources
for resource in noncompliant_resources:
resource_id = resource['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId']
resource_type = resource['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceType']
resource_arn = convert_to_arn(resource_id, resource_type)
if resource_arn:
tag_resource(resource_arn, parameters)
tagged_resources.append({"ResourceARN": resource_arn, "Tags": parameters})
else:
logger.error(f"Error fetching ARN for resource {resource_id}")
return {"message": "Auto-tagging completed.", "tagged_resources": tagged_resources}
except ValueError as ve:
return {"error": str(ve)}
except Exception as e:
return {"error": f"An unexpected error occurred: {e}"}
# Helper function to convert resource ID and type to ARN
def convert_to_arn(resource_id, resource_type):
if resource_type == 'AWS::EC2::Instance':
return f'arn:aws:ec2:::{resource_id}'
elif resource_type == 'AWS::S3::Bucket':
return f'arn:aws:s3:::{resource_id}'
elif resource_type == 'AWS::RDS::DBCluster':
return f'arn:aws:rds:::{resource_id}'
elif resource_type == 'AWS::DynamoDB::Table':
return f'arn:aws:dynamodb:::{resource_id}'
elif resource_type.startswith('arn:aws:cloudformation'):
return resource_id
return None
# Helper function to tag the resource with required tags
def tag_resource(resource_arn, parameters):
try:
response = config_client.put_evaluations(
Evaluations=[
{
'ComplianceResourceType': 'AWS::Tag',
'ComplianceResourceId': resource_arn,
'ComplianceType': 'COMPLIANT',
'Annotation': 'Resource is compliant'
}
],
ResultToken='string'
)
logger.info(f"Resource {resource_arn} successfully tagged with required tags.")
except Exception as e:
logger.error(f"Error tagging resource {resource_arn}: {e}")
raise ValueError(f"Error tagging resource {resource_arn}: {e}")
CONTENT
}
Thank you!