跳至内容

如何自动移除非活动的 IAM Identity Center 用户?

5 分钟阅读
0

我想自动移除 90 天内未登录的 AWS IAM Identity Center 用户。

简短描述

要自动移除非活动的 IAM Identity Center 用户,请为 AWS Lambda 创建一个执行角色来代您执行操作。然后,创建一个 Lambda 函数和一条 Amazon EventBridge 规则,使该函数按指定的计划运行。

解决方法

创建执行角色

完成以下步骤:

  1. 使用 AWS Identity and Access Management (IAM) 控制台创建执行角色
  2. 将以下权限添加到 IAM 角色的策略中:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "cloudtrail:LookupEvents",
            "organizations:ListAccounts",
            "sso:ListAccountAssignments",
            "sso:ListPermissionSets",
            "sso:ListInstances",
            "sso:DeleteAccountAssignment",
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents",
            "identitystore:ListUsers",
            "identitystore:DeleteUser"
          ],
          "Resource": "*"
        }
      ]
    }

上述策略语句中的权限允许执行角色执行以下操作:

  • 查看 CloudTrail 中是否存在 UserAuthentication 事件,以识别 90 天内未登录的用户。
  • 如果用户在过去 90 天内没有 UserAuthentication 事件,请将其标记为非活动。
  • 将非活动用户和尚未登录 AWS IAM Identity Center 访问门户的新用户添加到删除队列中。
  • 检查每个非活动用户的 AWS 账户或应用程序分配。
  • 移除这些分配,然后删除用户。

重要事项:

  • 如果您使用外部身份提供者作为身份来源,则必须在外部身份提供者级别从 AWS Identity Center SAML 应用程序中删除未授权的用户。
  • 如果您使用 Active Directory 作为身份来源,则必须从同步范围中移除目标用户及其关联的组成员资格。

移除未授权用户和目标用户后,将无法重新配置这些用户。

创建 Lambda 函数,并附加执行角色

使用 Lambda 控制台创建 Lambda 函数。对于 Runtime(运行时),选择 Python 3.13。在内置代码编辑器中,输入以下 Python 代码:

import boto3
import json
from datetime import datetime, timedelta
from botocore.exceptions import ClientError

# Initialize AWS clients
sso_admin_client = boto3.client('sso-admin')
identitystore_client = boto3.client('identitystore')
cloudtrail_client = boto3.client('cloudtrail')
org_client = boto3.client('organizations')

# Define the time threshold (90 days ago)
THRESHOLD_DATE = datetime.utcnow() - timedelta(days=90)

def get_identity_center_info():
    """Fetch the Identity Center directory ID and Instance ARN."""
    response = sso_admin_client.list_instances()
    if not response['Instances']:
        raise Exception("No Identity Center instance found.")
    instance = response['Instances'][0]
    return instance['InstanceArn'], instance['IdentityStoreId']

def list_users(identity_store_id):
    """List all users in AWS Identity Center."""
    users = []
    paginator = identitystore_client.get_paginator('list_users')
    for page in paginator.paginate(IdentityStoreId=identity_store_id):
        users.extend(page['Users'])
    return users

def check_user_authentication(username):
    """
    Check if a user has UserAuthentication logged in CloudTrail in the last 90 days.
    Returns True if the user has authenticated, otherwise False.
    """
    if not username:
        print("Warning: Username is empty. Skipping authentication check.")
        return False

    try:
        paginator = cloudtrail_client.get_paginator('lookup_events')
        for page in paginator.paginate(
            LookupAttributes=[
                {'AttributeKey': 'EventName', 'AttributeValue': 'UserAuthentication'}
            ],
            StartTime=THRESHOLD_DATE,
            EndTime=datetime.utcnow()
        ):
            for event in page['Events']:
                # Parse the CloudTrail event
                event_detail = json.loads(event['CloudTrailEvent'])
                username_in_event = event_detail.get('additionalEventData', {}).get('UserName')

                # Match the username to confirm the event belongs to the user
                if username_in_event == username:
                    return True
    except Exception as e:
        print(f"Error checking authentication for user '{username}': {e}")
    return False

def find_user_account_assignments(instance_arn, principal_id):
    """Find all account assignments for a specific user."""
    user_assignments = []

    # Get all accounts
    accounts = []
    try:
        paginator = org_client.get_paginator('list_accounts')
        for page in paginator.paginate():
            accounts.extend([account['Id'] for account in page['Accounts']])
    except ClientError as e:
        print(f"Error listing accounts: {e}")
        return user_assignments

    # Get all permission sets
    permission_sets = []
    try:
        paginator = sso_admin_client.get_paginator('list_permission_sets')
        for page in paginator.paginate(InstanceArn=instance_arn):
            permission_sets.extend(page['PermissionSets'])
    except ClientError as e:
        print(f"Error listing permission sets: {e}")
        return user_assignments

    # Check each account and permission set combination for the user
    for account_id in accounts:
        for permission_set_arn in permission_sets:
            try:
                paginator = sso_admin_client.get_paginator('list_account_assignments')
                for page in paginator.paginate(
                    InstanceArn=instance_arn,
                    AccountId=account_id,
                    PermissionSetArn=permission_set_arn
                ):
                    # Filter assignments for the specific user
                    for assignment in page['AccountAssignments']:
                        if assignment['PrincipalId'] == principal_id and assignment['PrincipalType'] == 'USER':
                            user_assignments.append({
                                'AccountId': account_id,
                                'PermissionSetArn': permission_set_arn,
                                'PrincipalId': principal_id
                            })
            except ClientError as e:
                print(f"Error listing assignments for account {account_id}, permission set {permission_set_arn}: {e}")
                continue

    return user_assignments

def remove_user_assignments(instance_arn, assignments):
    """Remove account assignments for a specific user."""
    success = True
    for assignment in assignments:
        try:
            print(f"Removing assignment: Account={assignment['AccountId']}, PermissionSet={assignment['PermissionSetArn']}")
            sso_admin_client.delete_account_assignment(
                InstanceArn=instance_arn,
                TargetId=assignment['AccountId'],
                TargetType='AWS_ACCOUNT',
                PermissionSetArn=assignment['PermissionSetArn'],
                PrincipalType='USER',
                PrincipalId=assignment['PrincipalId']
            )
        except ClientError as e:
            print(f"Error removing assignment: {e}")
            success = False

    return success

def delete_user(identity_store_id, user_id):
    """Delete a user from AWS Identity Center."""
    try:
        identitystore_client.delete_user(
            IdentityStoreId=identity_store_id,
            UserId=user_id
        )
        print(f"Deleted user with ID: {user_id}")
        return True
    except ClientError as e:
        print(f"Error deleting user with ID {user_id}: {e}")
        return False

def lambda_handler(event, context):
    """AWS Lambda entry point."""
    try:
        # Step 1: Get Identity Center directory ID and Instance ARN
        instance_arn, identity_store_id = get_identity_center_info()
        print(f"Found Identity Center instance: {instance_arn}")

        # Step 2: List all users
        users = list_users(identity_store_id)
        print(f"Found {len(users)} users in Identity Center")

        inactive_users = 0
        deleted_users = 0

        for user in users:
            user_name = user.get('UserName')
            user_id = user.get('UserId')

            # Step 3: Check if the user has authenticated recently
            if check_user_authentication(user_name):
                print(f"User '{user_name}' (ID: {user_id}) has authenticated recently. Skipping.")
                continue

            inactive_users += 1
            print(f"\nProcessing inactive user: '{user_name}' (ID: {user_id})")

            # Step 4: Find all account assignments for this user
            user_assignments = find_user_account_assignments(instance_arn, user_id)

            if user_assignments:
                print(f"Found {len(user_assignments)} assignments for user '{user_name}'")

                # Step 5: Remove the user's assignments
                if remove_user_assignments(instance_arn, user_assignments):
                    print(f"Successfully removed all assignments for user '{user_name}'")
                else:
                    print(f"Failed to remove some assignments for user '{user_name}'. Skipping deletion.")
                    continue
            else:
                print(f"No assignments found for user '{user_name}'")

            # Step 6: Delete the user
            if delete_user(identity_store_id, user_id):
                deleted_users += 1

        print(f"\nSummary: Found {inactive_users} inactive users, successfully deleted {deleted_users} users")

    except Exception as e:
        print(f"Error in main execution: {e}")

创建 Lambda 函数时,Lamba 会创建一个具有最低权限的执行角色。更新您的函数以使用您创建的执行角色。

创建 EventBridge 计划规则

完成以下步骤:

  1. 打开 Amazon EventBridge 控制台。
  2. 在导航窗格中,选择 Rules(规则),然后选择 Create rule(创建规则)。
  3. 输入名称和描述。
    **注意:**规则不能与同一 AWS 区域和同一事件总线上的其他规则同名。
  4. 对于 Event bus(事件总线),选择 AWS default event bus(AWS 默认事件总线)。
  5. 对于 Rule type(规则类型),选择 Schedule(计划)。
  6. 选择 Next(下一步)。
  7. 对于 Schedule pattern(计划模式),选择 Recurring schedule(定期计划)。
  8. 对于 Schedule type(计划类型),选择 Cron-based schedule(基于 Cron 的计划)。
  9. 对于Cron expression(Cron 表达式),指定 cron(0 0 1 * ? *),以每月运行一次。
    **注意:**有关 cron 值的信息,请参阅 Cron 表达式
  10. 选择 Next(下一步)。
  11. 选择 AWS Lambda 作为目标。
  12. 选择您的 Lambda 函数。
  13. 选择 Next(下一步)。
  14. 查看并选择 Create rule(创建规则)。

测试 EventBridge 计划规则

创建计划规则后,请测试该规则,以确保自动化能正常运行。将 cron 表达式设置为与当前时间仅间隔几分钟的时间。

例如,如果您在上午 11:57 开始测试,请将表达式设置为 cron(59 11 * * ? *),以便在上午 11:59 启动。

确认自动化能正常运行后,请将规则的 cron 表达式修改为您的生产计划。

相关信息

安全、身份与合规性的最佳实践

Security Hub CSPM 的标准参考

管理您的身份来源

定期查看并移除未使用的用户、角色、权限、策略和凭证

AWS 官方已更新 3 个月前