如何自动移除非活动的 IAM Identity Center 用户?
5 分钟阅读
0
我想自动移除 90 天内未登录的 AWS IAM Identity Center 用户。
简短描述
要自动移除非活动的 IAM Identity Center 用户,请为 AWS Lambda 创建一个执行角色来代您执行操作。然后,创建一个 Lambda 函数和一条 Amazon EventBridge 规则,使该函数按指定的计划运行。
解决方法
创建执行角色
完成以下步骤:
- 使用 AWS Identity and Access Management (IAM) 控制台创建执行角色。
- 将以下权限添加到 IAM 角色的策略中:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "cloudtrail:LookupEvents", "organizations:ListAccounts", "sso:ListAccountAssignments", "sso:ListPermissionSets", "sso:ListInstances", "sso:DeleteAccountAssignment", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "identitystore:ListUsers", "identitystore:DeleteUser" ], "Resource": "*" } ] }
上述策略语句中的权限允许执行角色执行以下操作:
- 查看 CloudTrail 中是否存在 UserAuthentication 事件,以识别 90 天内未登录的用户。
- 如果用户在过去 90 天内没有 UserAuthentication 事件,请将其标记为非活动。
- 将非活动用户和尚未登录 AWS IAM Identity Center 访问门户的新用户添加到删除队列中。
- 检查每个非活动用户的 AWS 账户或应用程序分配。
- 移除这些分配,然后删除用户。
重要事项:
- 如果您使用外部身份提供者作为身份来源,则必须在外部身份提供者级别从 AWS Identity Center SAML 应用程序中删除未授权的用户。
- 如果您使用 Active Directory 作为身份来源,则必须从同步范围中移除目标用户及其关联的组成员资格。
移除未授权用户和目标用户后,将无法重新配置这些用户。
创建 Lambda 函数,并附加执行角色
使用 Lambda 控制台创建 Lambda 函数。对于 Runtime(运行时),选择 Python 3.13。在内置代码编辑器中,输入以下 Python 代码:
import boto3 import json from datetime import datetime, timedelta from botocore.exceptions import ClientError # Initialize AWS clients sso_admin_client = boto3.client('sso-admin') identitystore_client = boto3.client('identitystore') cloudtrail_client = boto3.client('cloudtrail') org_client = boto3.client('organizations') # Define the time threshold (90 days ago) THRESHOLD_DATE = datetime.utcnow() - timedelta(days=90) def get_identity_center_info(): """Fetch the Identity Center directory ID and Instance ARN.""" response = sso_admin_client.list_instances() if not response['Instances']: raise Exception("No Identity Center instance found.") instance = response['Instances'][0] return instance['InstanceArn'], instance['IdentityStoreId'] def list_users(identity_store_id): """List all users in AWS Identity Center.""" users = [] paginator = identitystore_client.get_paginator('list_users') for page in paginator.paginate(IdentityStoreId=identity_store_id): users.extend(page['Users']) return users def check_user_authentication(username): """ Check if a user has UserAuthentication logged in CloudTrail in the last 90 days. Returns True if the user has authenticated, otherwise False. """ if not username: print("Warning: Username is empty. Skipping authentication check.") return False try: paginator = cloudtrail_client.get_paginator('lookup_events') for page in paginator.paginate( LookupAttributes=[ {'AttributeKey': 'EventName', 'AttributeValue': 'UserAuthentication'} ], StartTime=THRESHOLD_DATE, EndTime=datetime.utcnow() ): for event in page['Events']: # Parse the CloudTrail event event_detail = json.loads(event['CloudTrailEvent']) username_in_event = event_detail.get('additionalEventData', {}).get('UserName') # Match the username to confirm the event belongs to the user if username_in_event == username: return True except Exception as e: print(f"Error checking authentication for user '{username}': {e}") return False def find_user_account_assignments(instance_arn, principal_id): """Find all account assignments for a specific user.""" user_assignments = [] # Get all accounts accounts = [] try: paginator = org_client.get_paginator('list_accounts') for page in paginator.paginate(): accounts.extend([account['Id'] for account in page['Accounts']]) except ClientError as e: print(f"Error listing accounts: {e}") return user_assignments # Get all permission sets permission_sets = [] try: paginator = sso_admin_client.get_paginator('list_permission_sets') for page in paginator.paginate(InstanceArn=instance_arn): permission_sets.extend(page['PermissionSets']) except ClientError as e: print(f"Error listing permission sets: {e}") return user_assignments # Check each account and permission set combination for the user for account_id in accounts: for permission_set_arn in permission_sets: try: paginator = sso_admin_client.get_paginator('list_account_assignments') for page in paginator.paginate( InstanceArn=instance_arn, AccountId=account_id, PermissionSetArn=permission_set_arn ): # Filter assignments for the specific user for assignment in page['AccountAssignments']: if assignment['PrincipalId'] == principal_id and assignment['PrincipalType'] == 'USER': user_assignments.append({ 'AccountId': account_id, 'PermissionSetArn': permission_set_arn, 'PrincipalId': principal_id }) except ClientError as e: print(f"Error listing assignments for account {account_id}, permission set {permission_set_arn}: {e}") continue return user_assignments def remove_user_assignments(instance_arn, assignments): """Remove account assignments for a specific user.""" success = True for assignment in assignments: try: print(f"Removing assignment: Account={assignment['AccountId']}, PermissionSet={assignment['PermissionSetArn']}") sso_admin_client.delete_account_assignment( InstanceArn=instance_arn, TargetId=assignment['AccountId'], TargetType='AWS_ACCOUNT', PermissionSetArn=assignment['PermissionSetArn'], PrincipalType='USER', PrincipalId=assignment['PrincipalId'] ) except ClientError as e: print(f"Error removing assignment: {e}") success = False return success def delete_user(identity_store_id, user_id): """Delete a user from AWS Identity Center.""" try: identitystore_client.delete_user( IdentityStoreId=identity_store_id, UserId=user_id ) print(f"Deleted user with ID: {user_id}") return True except ClientError as e: print(f"Error deleting user with ID {user_id}: {e}") return False def lambda_handler(event, context): """AWS Lambda entry point.""" try: # Step 1: Get Identity Center directory ID and Instance ARN instance_arn, identity_store_id = get_identity_center_info() print(f"Found Identity Center instance: {instance_arn}") # Step 2: List all users users = list_users(identity_store_id) print(f"Found {len(users)} users in Identity Center") inactive_users = 0 deleted_users = 0 for user in users: user_name = user.get('UserName') user_id = user.get('UserId') # Step 3: Check if the user has authenticated recently if check_user_authentication(user_name): print(f"User '{user_name}' (ID: {user_id}) has authenticated recently. Skipping.") continue inactive_users += 1 print(f"\nProcessing inactive user: '{user_name}' (ID: {user_id})") # Step 4: Find all account assignments for this user user_assignments = find_user_account_assignments(instance_arn, user_id) if user_assignments: print(f"Found {len(user_assignments)} assignments for user '{user_name}'") # Step 5: Remove the user's assignments if remove_user_assignments(instance_arn, user_assignments): print(f"Successfully removed all assignments for user '{user_name}'") else: print(f"Failed to remove some assignments for user '{user_name}'. Skipping deletion.") continue else: print(f"No assignments found for user '{user_name}'") # Step 6: Delete the user if delete_user(identity_store_id, user_id): deleted_users += 1 print(f"\nSummary: Found {inactive_users} inactive users, successfully deleted {deleted_users} users") except Exception as e: print(f"Error in main execution: {e}")
创建 Lambda 函数时,Lamba 会创建一个具有最低权限的执行角色。更新您的函数以使用您创建的执行角色。
创建 EventBridge 计划规则
完成以下步骤:
- 打开 Amazon EventBridge 控制台。
- 在导航窗格中,选择 Rules(规则),然后选择 Create rule(创建规则)。
- 输入名称和描述。
**注意:**规则不能与同一 AWS 区域和同一事件总线上的其他规则同名。 - 对于 Event bus(事件总线),选择 AWS default event bus(AWS 默认事件总线)。
- 对于 Rule type(规则类型),选择 Schedule(计划)。
- 选择 Next(下一步)。
- 对于 Schedule pattern(计划模式),选择 Recurring schedule(定期计划)。
- 对于 Schedule type(计划类型),选择 Cron-based schedule(基于 Cron 的计划)。
- 对于Cron expression(Cron 表达式),指定 cron(0 0 1 * ? *),以每月运行一次。
**注意:**有关 cron 值的信息,请参阅 Cron 表达式。 - 选择 Next(下一步)。
- 选择 AWS Lambda 作为目标。
- 选择您的 Lambda 函数。
- 选择 Next(下一步)。
- 查看并选择 Create rule(创建规则)。
测试 EventBridge 计划规则
创建计划规则后,请测试该规则,以确保自动化能正常运行。将 cron 表达式设置为与当前时间仅间隔几分钟的时间。
例如,如果您在上午 11:57 开始测试,请将表达式设置为 cron(59 11 * * ? *),以便在上午 11:59 启动。
确认自动化能正常运行后,请将规则的 cron 表达式修改为您的生产计划。
相关信息
- 语言
- 中文 (简体)

AWS 官方已更新 3 个月前
没有评论
相关内容
AWS 官方已更新 6 个月前