Skip to content

How do I automatically remove inactive IAM Identity Center users?

8 minute read
0

I want to automatically remove AWS IAM Identity Center users who haven't signed in for 90 days.

Short description

To automatically remove inactive IAM Identity Center users, create an execution role for AWS Lambda to perform actions for you. Then, create a Lambda function and an Amazon EventBridge rule for the function to run on a specified schedule.

Resolution

Create an execution role

Complete the following steps:

  1. Use the AWS Identity and Access Management (IAM) console to create the execution role.
  2. Add the following permissions to the IAM role's policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "cloudtrail:LookupEvents",
                    "sso:ListAccountAssignments",
                    "sso:ListPermissionSets",
                    "organizations:ListAccounts",
                    "sso:ListInstances",
                    "sso:DeleteAccountAssignment",
                    "identitystore:DeleteUser",
                    "sso-directory:ListUsers",
                    "sso-directory:DeleteUser",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "identitystore:ListUsers"
                ],
                "Resource": "*"
            }
        ]
    }

The permissions in the preceding policy statement allow the execution role to perform the following actions:

  • Check CloudTrail for UserAuthentication events to identify users who haven't signed in for 90 days.
  • Mark users as inactive if they had no UserAuthentication event in the past 90 days.
  • Add to a deletion queue inactive users and new users who haven't signed in to the AWS IAM Identity Center access portal.
  • Check for AWS account or application assignments for each inactive user.
  • Remove assignments, and then delete the user.

Important:

  • If you use an external identity provider as your identity source, then you must delete unauthorized users from the AWS Identity Center SAML application at the external identity provider level. 
  • If you use Active Directory as your identity source, then you must remove target users and their associated group memberships from the sync scope. 

Create a Lambda function, and attach the execution role

Use the Lambda console to create the Lambda function. For Runtime, choose Python 3.13. In the built-in code editor, enter the following Python code:

import json
from datetime import datetime, timedelta

import boto3
from botocore.exceptions import ClientError

# Create boto3 clients to call AWS services used by the script.
sso_admin = boto3.client("sso-admin")
identitystore = boto3.client("identitystore")
cloudtrail = boto3.client("cloudtrail")
org = boto3.client("organizations")

# Threshold to consider a user "active": any authentication after THRESHOLD_DATE counts.
THRESHOLD_DATE = datetime.utcnow() - timedelta(days=90)


def get_identity_center_info():
    """
    Retrieve the first Identity Center (SSO) instance's ARN and IdentityStoreId.
    """
    resp = sso_admin.list_instances()                         
    inst = resp.get("Instances") or []                     
    if not inst:
        raise RuntimeError("No Identity Center instance found")  
    # Return the InstanceArn and IdentityStoreId of the first instance found
    return inst[0]["InstanceArn"], inst[0]["IdentityStoreId"]


def list_users(identity_store_id):
    """
    List all users from the given Identity Store (IdentityCenter).
    Returns a list of users as returned by identitystore.list_users.
    """
    users = []
    # Use a paginator to handle large user directories
    for page in identitystore.get_paginator("list_users").paginate(IdentityStoreId=identity_store_id):
        users.extend(page.get("Users", []))                  
    return users


def build_active_sets():
    """
    Scan CloudTrail LookupEvents for UserAuthentication events in the time window
    and build two sets:
      - active_ids: set of userId values found at userIdentity.onBehalfOf.userId
      - active_names: set of usernames found at additionalEventData.UserName

    Scanning CloudTrail once is more efficient than querying per user.
    """
    active_ids = set()    # seen onBehalfOf.userId values
    active_names = set()  # seen additionalEventData.UserName values

    paginator = cloudtrail.get_paginator("lookup_events")
    try:
        # Iterate through pages of UserAuthentication events within the timeframe
        for page in paginator.paginate(
            LookupAttributes=[{"AttributeKey": "EventName", "AttributeValue": "UserAuthentication"}],
            StartTime=THRESHOLD_DATE,
            EndTime=datetime.utcnow(),
        ):
            for ev in page.get("Events", []):               
                cte = ev.get("CloudTrailEvent")            
                try:
                    # If CloudTrailEvent is a string, parse to dict; otherwise use as-is or empty dict
                    detail = json.loads(cte) if isinstance(cte, str) else (cte or {})
                except (ValueError, TypeError):
                    # Skip malformed or unexpected event content
                    continue

                # Extract the userId from userIdentity.onBehalfOf.userId if present (preferred)
                uid = detail.get("userIdentity", {}).get("onBehalfOf", {}).get("userId")
                # Extract username from additionalEventData.UserName as a fallback
                uname = detail.get("additionalEventData", {}).get("UserName")

                if uid:
                    active_ids.add(uid)                    
                if uname:
                    active_names.add(uname)                 
    except ClientError as e:
        # Bubble up failures in CloudTrail lookup as RuntimeError so caller can handle/log
        raise RuntimeError(f"CloudTrail lookup failed: {e}")
    return active_ids, active_names


def list_accounts():
    """
    Return a list of AWS Account IDs in the Organization.
    Uses organizations.list_accounts paginator to handle many accounts.
    """
    accounts = []
    for page in org.get_paginator("list_accounts").paginate():
        accounts.extend([a["Id"] for a in page.get("Accounts", [])])  
    return accounts


def list_permission_sets(instance_arn):
    """
    Return a list of PermissionSet ARNs for the Identity Center instance.
    Uses sso-admin.list_permission_sets paginator to support many permission sets.
    """
    perms = []
    for page in sso_admin.get_paginator("list_permission_sets").paginate(InstanceArn=instance_arn):
        perms.extend(page.get("PermissionSets", []))
    return perms


def remove_user_assignments(instance_arn, principal_id):
    """
    Remove all SSO account assignments for the given principal_id (user).
    Iterates every account and permission set, lists assignments, and deletes any
    assignment that matches the user PrincipalId and is of type USER.

    Returns True if no deletion errors occurred, False otherwise.
    """
    accounts = list_accounts()                         
    perms = list_permission_sets(instance_arn)           
    success = True

    for acct in accounts:
        for perm in perms:
            try:
                # List assignments for the (account, permission set) pair
                paginator = sso_admin.get_paginator("list_account_assignments")
                for page in paginator.paginate(InstanceArn=instance_arn, AccountId=acct, PermissionSetArn=perm):
                    for a in page.get("AccountAssignments", []):
                        # If this assignment is for the user, delete it
                        if a.get("PrincipalType") == "USER" and a.get("PrincipalId") == principal_id:
                            sso_admin.delete_account_assignment(
                                InstanceArn=instance_arn,
                                TargetId=acct,
                                TargetType="AWS_ACCOUNT",
                                PermissionSetArn=perm,
                                PrincipalType="USER",
                                PrincipalId=principal_id,
                            )
                            print(f"Removed assignment: user={principal_id} account={acct} permission_set={perm}")
            except ClientError as e:
                # Log the failure but continue attempting other assignments
                print(f"Warning: failed removing assignments for acct={acct} perm={perm}: {e}")
                success = False
    return success


def delete_user(identity_store_id, user_id):
    """
    Delete the user from the Identity Store using identitystore.delete_user.
    Returns True on success, False on failure.
    """
    try:
        identitystore.delete_user(IdentityStoreId=identity_store_id, UserId=user_id)
        print(f"Deleted user: {user_id}")
        return True
    except ClientError as e:
        print(f"Error deleting user {user_id}: {e}")
        return False


def lambda_handler(event=None, context=None):
    """
    Main entry point: discover Identity Center instance, fetch users,
    build active user sets from CloudTrail, then remove and delete inactive users.
    """
    # Get Identity Center instance ARN and the identity store ID
    instance_arn, identity_store_id = get_identity_center_info()

    # Retrieve all users from the identity store
    users = list_users(identity_store_id)
    print(f"Found {len(users)} users; scanning CloudTrail UserAuthentication events since {THRESHOLD_DATE.isoformat()}")

    # Build sets of active userIds and usernames by scanning CloudTrail once
    active_ids, active_names = build_active_sets()

    inactive_count = deleted_count = 0

    # Iterate through every user in the identity store
    for u in users:
        user_id = u.get("UserId")        # unique identifier for the user in the Identity Store
        user_name = u.get("UserName")    # may be None if not set or not logged in events

        # If user_id was seen in CloudTrail active_ids OR username was seen in active_names, skip deletion
        if (user_id and user_id in active_ids) or (user_name and user_name in active_names):
            # This user authenticated recently; treat as active
            continue

        # Mark user as inactive and process cleanup
        inactive_count += 1
        print(f"\nProcessing inactive user: id={user_id} username={user_name}")

        # Remove assignments across accounts and permission sets
        if remove_user_assignments(instance_arn, user_id):
            # If assignment removal succeeded, delete the user
            if delete_user(identity_store_id, user_id):
                deleted_count += 1
        else:
            # If we couldn't clean up assignments, avoid deleting to prevent orphaned assignments
            print(f"Skipping deletion for {user_id} due to assignment removal failures")

    print(f"\nSummary: inactive={inactive_count} deleted={deleted_count}")


if __name__ == "__main__":
    # Run the handler for local testing or invocation as a script
    lambda_handler()

When you create a Lambda function, Lamba creates an execution role with minimum permissions. Update your function to use the execution role that you created.

Create an EventBridge scheduled rule

Complete the following steps:

  1. Open the Amazon EventBridge console.
  2. In the navigation pane, choose Rules, and then choose Create rule.
  3. Enter a name and description.
    Note: A rule can't have the same name as another rule in the same AWS Region and on the same event bus.
  4. For Event bus, select AWS default event bus.
  5. For Rule type, choose Schedule.
  6. Choose Next.
  7. For Schedule pattern, choose Recurring schedule.
  8. For Schedule type, choose Cron-based schedule.
  9. For Cron expression, specify cron(0 0 1 * ? *) to run monthly.
    Note: For information about cron values, see Cron expressions.
  10. Choose Next.
  11. Select AWS Lambda as the target.
  12. Select your Lambda function.
  13. Choose Next.
  14. Review and choose Create rule.

Test the EventBridge scheduled rule

After you create your scheduled rule, test the rule to make sure that the automation works. Set a cron expression for a time that's only a few minutes from the current time.

For example, if you start the test at 11:57 AM, then set the expression to cron(59 11 * * ? *) to initiate at 11:59 AM. 

After you confirm that your automation works, modify your rule's cron expression to your production schedule.

Related information

Best practices for security, identity, and compliance

Standards reference for Security Hub CSPM

Manage your identity source

Regularly review and remove unused users, roles, permissions, policies, and credentials

AWS OFFICIALUpdated 6 months ago
2 Comments

After running the same script, all users are getting deleted, even though they are still active.

replied 8 months ago

Hello Akash, Thank you for your feedback.

Based on your comment, it appears you’re using either the IAM Identity Center native directory or an Active Directory connected directory as your identity source. In the original sample code, the automation depended on the Username attribute from the UserAuthentication event in CloudTrail. However, this attribute is not logged for users from Active Directory nor the native Identity Center directory. As a result, the workflow didn’t function as expected in those environments without code adjustments.

To fix this gap, we’ve updated the sample code to query both the UserID and Username attributes when looking up users in the CloudTrail event logs. With this update, the automation now works seamlessly across all AWS IAM Identity Center identity sources.

replied 6 months ago