Saltar al contenido

¿Cómo elimino automáticamente los usuarios inactivos de IAM Identity Center?

8 minutos de lectura
0

Quiero eliminar automáticamente los usuarios de AWS IAM Identity Center que no hayan iniciado sesión durante 90 días.

Descripción corta

Para eliminar automáticamente los usuarios inactivos de IAM Identity Center, crea un rol de ejecución para que AWS Lambda realice acciones por ti. A continuación, crea una función de Lambda y una regla de Amazon EventBridge para que la función se ejecute según una programación específica.

Resolución

Creación de un rol de ejecución

Sigue estos pasos:

  1. Utiliza la consola de AWS Identity and Access Management (IAM) para crear el rol de ejecución.
  2. Adjunta los siguientes permisos a la política del rol de IAM:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "cloudtrail:LookupEvents",
                    "sso:ListAccountAssignments",
                    "sso:ListPermissionSets",
                    "organizations:ListAccounts",
                    "sso:ListInstances",
                    "sso:DeleteAccountAssignment",
                    "identitystore:DeleteUser",
                    "sso-directory:ListUsers",
                    "sso-directory:DeleteUser",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "identitystore:ListUsers"
                ],
                "Resource": "*"
            }
        ]
    }

Los permisos de la instrucción de política anterior permiten al rol de ejecución realizar las siguientes acciones:

  • Consultar CloudTrail para ver los eventos de UserAuthentication para identificar los usuarios que no han iniciado sesión durante 90 días.
  • Marcar los usuarios como inactivos si no han tenido ningún evento UserAuthentication en los últimos 90 días.
  • Agregar a una cola de eliminación los usuarios inactivos y los usuarios nuevos que no hayan iniciado sesión en el portal de acceso de AWS IAM Identity Center.
  • Comprobar las asignaciones de cuentas o aplicaciones de AWS para cada usuario inactivo.
  • Eliminar las asignaciones y, a continuación, eliminar el usuario.

Importante:

Cómo crear una función de Lambda y adjuntar el rol de ejecución

Utiliza la consola de Lambda para crear la función de Lambda. En Versión ejecutable, selecciona Python 3.13. En el editor de código integrado, introduce el siguiente código Python:

import json
from datetime import datetime, timedelta
import boto3
from botocore.exceptions import ClientError
# Create boto3 clients to call AWS services used by the script.
sso_admin = boto3.client("sso-admin")
identitystore = boto3.client("identitystore")
cloudtrail = boto3.client("cloudtrail")
org = boto3.client("organizations")
# Threshold to consider a user "active": any authentication after THRESHOLD_DATE counts.
THRESHOLD_DATE = datetime.utcnow() - timedelta(days=90)
def get_identity_center_info():
    """
    Retrieve the first Identity Center (SSO) instance's ARN and IdentityStoreId.
    """
    resp = sso_admin.list_instances()                         
    inst = resp.get("Instances") or []                     
    if not inst:
        raise RuntimeError("No Identity Center instance found")  
    # Return the InstanceArn and IdentityStoreId of the first instance found
    return inst[0]["InstanceArn"], inst[0]["IdentityStoreId"]
def list_users(identity_store_id):
    """
    List all users from the given Identity Store (IdentityCenter).
    Returns a list of users as returned by identitystore.list_users.
    """
    users = []
    # Use a paginator to handle large user directories
    for page in identitystore.get_paginator("list_users").paginate(IdentityStoreId=identity_store_id):
        users.extend(page.get("Users", []))                  
    return users
def build_active_sets():
    """
    Scan CloudTrail LookupEvents for UserAuthentication events in the time window
    and build two sets:
      - active_ids: set of userId values found at userIdentity.onBehalfOf.userId
      - active_names: set of usernames found at additionalEventData.UserName
    Scanning CloudTrail once is more efficient than querying per user.
    """
    active_ids = set()    # seen onBehalfOf.userId values
    active_names = set()  # seen additionalEventData.UserName values
    paginator = cloudtrail.get_paginator("lookup_events")
    try:
        # Iterate through pages of UserAuthentication events within the timeframe
        for page in paginator.paginate(
            LookupAttributes=[{"AttributeKey": "EventName", "AttributeValue": "UserAuthentication"}],
            StartTime=THRESHOLD_DATE,
            EndTime=datetime.utcnow(),
        ):
            for ev in page.get("Events", []):               
                cte = ev.get("CloudTrailEvent")            
                try:
                    # If CloudTrailEvent is a string, parse to dict; otherwise use as-is or empty dict
                    detail = json.loads(cte) if isinstance(cte, str) else (cte or {})
                except (ValueError, TypeError):
                    # Skip malformed or unexpected event content
                    continue
                # Extract the userId from userIdentity.onBehalfOf.userId if present (preferred)
                uid = detail.get("userIdentity", {}).get("onBehalfOf", {}).get("userId")
                # Extract username from additionalEventData.UserName as a fallback
                uname = detail.get("additionalEventData", {}).get("UserName")
                if uid:
                    active_ids.add(uid)                    
                if uname:
                    active_names.add(uname)                 
    except ClientError as e:
        # Bubble up failures in CloudTrail lookup as RuntimeError so caller can handle/log
        raise RuntimeError(f"CloudTrail lookup failed: {e}")
    return active_ids, active_names
def list_accounts():
    """
    Return a list of AWS Account IDs in the Organization.
    Uses organizations.list_accounts paginator to handle many accounts.
    """
    accounts = []
    for page in org.get_paginator("list_accounts").paginate():
        accounts.extend([a["Id"] for a in page.get("Accounts", [])])  
    return accounts
def list_permission_sets(instance_arn):
    """
    Return a list of PermissionSet ARNs for the Identity Center instance.
    Uses sso-admin.list_permission_sets paginator to support many permission sets.
    """
    perms = []
    for page in sso_admin.get_paginator("list_permission_sets").paginate(InstanceArn=instance_arn):
        perms.extend(page.get("PermissionSets", []))
    return perms
def remove_user_assignments(instance_arn, principal_id):
    """
    Remove all SSO account assignments for the given principal_id (user).
    Iterates every account and permission set, lists assignments, and deletes any
    assignment that matches the user PrincipalId and is of type USER.
    Returns True if no deletion errors occurred, False otherwise.
    """
    accounts = list_accounts()                         
    perms = list_permission_sets(instance_arn)           
    success = True
    for acct in accounts:
        for perm in perms:
            try:
                # List assignments for the (account, permission set) pair
                paginator = sso_admin.get_paginator("list_account_assignments")
                for page in paginator.paginate(InstanceArn=instance_arn, AccountId=acct, PermissionSetArn=perm):
                    for a in page.get("AccountAssignments", []):
                        # If this assignment is for the user, delete it
                        if a.get("PrincipalType") == "USER" and a.get("PrincipalId") == principal_id:
                            sso_admin.delete_account_assignment(
                                InstanceArn=instance_arn,
                                TargetId=acct,
                                TargetType="AWS_ACCOUNT",
                                PermissionSetArn=perm,
                                PrincipalType="USER",
                                PrincipalId=principal_id,
                            )
                            print(f"Removed assignment: user={principal_id} account={acct} permission_set={perm}")
            except ClientError as e:
                # Log the failure but continue attempting other assignments
                print(f"Warning: failed removing assignments for acct={acct} perm={perm}: {e}")
                success = False
    return success
def delete_user(identity_store_id, user_id):
    """
    Delete the user from the Identity Store using identitystore.delete_user.
    Returns True on success, False on failure.
    """
    try:
        identitystore.delete_user(IdentityStoreId=identity_store_id, UserId=user_id)
        print(f"Deleted user: {user_id}")
        return True
    except ClientError as e:
        print(f"Error deleting user {user_id}: {e}")
        return False
def lambda_handler(event=None, context=None):
    """
    Main entry point: discover Identity Center instance, fetch users,
    build active user sets from CloudTrail, then remove and delete inactive users.
    """
    # Get Identity Center instance ARN and the identity store ID
    instance_arn, identity_store_id = get_identity_center_info()
    # Retrieve all users from the identity store
    users = list_users(identity_store_id)
    print(f"Found {len(users)} users; scanning CloudTrail UserAuthentication events since {THRESHOLD_DATE.isoformat()}")
    # Build sets of active userIds and usernames by scanning CloudTrail once
    active_ids, active_names = build_active_sets()
    inactive_count = deleted_count = 0
    # Iterate through every user in the identity store
    for u in users:
        user_id = u.get("UserId")        # unique identifier for the user in the Identity Store
        user_name = u.get("UserName")    # may be None if not set or not logged in events
        # If user_id was seen in CloudTrail active_ids OR username was seen in active_names, skip deletion
        if (user_id and user_id in active_ids) or (user_name and user_name in active_names):
            # This user authenticated recently; treat as active
            continue
        # Mark user as inactive and process cleanup
        inactive_count += 1
        print(f"\nProcessing inactive user: id={user_id} username={user_name}")
        # Remove assignments across accounts and permission sets
        if remove_user_assignments(instance_arn, user_id):
            # If assignment removal succeeded, delete the user
            if delete_user(identity_store_id, user_id):
                deleted_count += 1
        else:
            # If we couldn't clean up assignments, avoid deleting to prevent orphaned assignments
            print(f"Skipping deletion for {user_id} due to assignment removal failures")
    print(f"\nSummary: inactive={inactive_count} deleted={deleted_count}")
if __name__ == "__main__":
    # Run the handler for local testing or invocation as a script
    lambda_handler()

Al crear una función de Lambda, Lamba crea un rol de ejecución con permisos mínimos. Actualiza tu función para usar el rol de ejecución que has creado.

Creación de una regla programada de EventBridge

Sigue estos pasos:

  1. Abre la consola de Amazon EventBridge.
  2. En el panel de navegación, selecciona Reglas y, a continuación, selecciona Crear una regla.
  3. Introduce un nombre y una descripción.<br id=hardline_break/> Nota: Una regla no puede tener el mismo nombre que otra regla de la misma región de AWS y del mismo bus de eventos.
  4. En Bus de eventos, elige Bus de eventos predeterminado de AWS.
  5. En Tipo de regla selecciona Programación.
  6. Selecciona Siguiente.
  7. En Patrón de programación, elige Programación recurrente.
  8. A continuación, en Tipo de programación selecciona Programación basada en cron.
  9. En expresión Cron, especifica cron(0 0 1 * ? *) para que se ejecute mensualmente.<br id=hardline_break/> Nota: Para obtener información sobre los valores cron, consulta Expresiones cron.
  10. Selecciona Siguiente.
  11. Selecciona AWS Lambda como destino.
  12. Selecciona tu función de Lambda.
  13. Selecciona Siguiente.
  14. Revisa y elige Crear regla.

Probar la regla programada de EventBridge

Después de crear la regla programada, pruébala para asegurarte de que la automatización funciona. Establece una expresión cron para un tiempo que esté a solo unos minutos de la hora actual.

Por ejemplo, si comienzas la prueba a las 11:57 h., define la expresión en cron(59 11 * * ? *) para que se inicie a las 11:59 h. 

Tras confirmar que la automatización funciona, modifica la expresión cron de la regla para adaptarla a tu programación de producción.

Información relacionada

Prácticas recomendadas para la seguridad, la identidad y el cumplimiento

Referencia de estándares para Security Hub CSPM

Administración del origen de identidad

Revisar y eliminar periódicamente usuarios, roles, permisos, políticas y credenciales no utilizados

OFICIAL DE AWSActualizada hace 3 meses