Global outage event
If you're experiencing issues with your AWS services, then please refer to the AWS Health Dashboard. You can find the overall status of ongoing outages, the health of AWS services, and the latest updates from AWS engineers.
如何自动移除非活动的 IAM Identity Center 用户?
6 分钟阅读
0
我想自动移除 90 天内未登录的 AWS IAM Identity Center 用户。
简短描述
要自动移除非活动的 IAM Identity Center 用户,请为 AWS Lambda 创建一个执行角色来代您执行操作。然后,创建 Lambda 函数和 Amazon EventBridge 规则,使该函数按指定的计划运行。
解决方法
创建执行角色
完成以下步骤:
- 使用 AWS Identity and Access Management (IAM) 控制台创建执行角色。
- 将以下权限添加到 IAM 角色的策略中:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "cloudtrail:LookupEvents", "sso:ListAccountAssignments", "sso:ListPermissionSets", "organizations:ListAccounts", "sso:ListInstances", "sso:DeleteAccountAssignment", "identitystore:DeleteUser", "sso-directory:ListUsers", "sso-directory:DeleteUser", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "identitystore:ListUsers" ], "Resource": "*" } ] }
上述策略语句中的权限允许执行角色执行以下操作:
- 查看 CloudTrail 中是否存在 UserAuthentication 事件,识别 90 天内未登录的用户。
- 如果用户在过去 90 天内没有 UserAuthentication 事件,请将其标记为非活动。
- 将非活动用户和尚未登录 AWS IAM Identity Center 访问门户的新用户添加到删除队列中。
- 检查每个非活动用户的 AWS 账户或应用程序分配。
- 移除这些分配,然后删除用户。
重要事项:
- 如果您使用外部身份提供者作为身份来源,则必须在外部身份提供者级别从 AWS Identity Center SAML 应用程序中删除未授权的用户。
- 如果您使用 Active Directory 作为身份来源,则必须从同步范围中移除目标用户及其关联的组成员资格。
创建 Lambda 函数并附加执行角色
使用 Lambda 控制台创建 Lambda 函数。对于 Runtime(运行时),选择 Python 3.13。在内置代码编辑器中,输入以下 Python 代码:
import json from datetime import datetime, timedelta import boto3 from botocore.exceptions import ClientError # Create boto3 clients to call AWS services used by the script. sso_admin = boto3.client("sso-admin") identitystore = boto3.client("identitystore") cloudtrail = boto3.client("cloudtrail") org = boto3.client("organizations") # Threshold to consider a user "active": any authentication after THRESHOLD_DATE counts. THRESHOLD_DATE = datetime.utcnow() - timedelta(days=90) def get_identity_center_info(): """ Retrieve the first Identity Center (SSO) instance's ARN and IdentityStoreId. """ resp = sso_admin.list_instances() inst = resp.get("Instances") or [] if not inst: raise RuntimeError("No Identity Center instance found") # Return the InstanceArn and IdentityStoreId of the first instance found return inst[0]["InstanceArn"], inst[0]["IdentityStoreId"] def list_users(identity_store_id): """ List all users from the given Identity Store (IdentityCenter). Returns a list of users as returned by identitystore.list_users. """ users = [] # Use a paginator to handle large user directories for page in identitystore.get_paginator("list_users").paginate(IdentityStoreId=identity_store_id): users.extend(page.get("Users", [])) return users def build_active_sets(): """ Scan CloudTrail LookupEvents for UserAuthentication events in the time window and build two sets: - active_ids: set of userId values found at userIdentity.onBehalfOf.userId - active_names: set of usernames found at additionalEventData.UserName Scanning CloudTrail once is more efficient than querying per user. """ active_ids = set() # seen onBehalfOf.userId values active_names = set() # seen additionalEventData.UserName values paginator = cloudtrail.get_paginator("lookup_events") try: # Iterate through pages of UserAuthentication events within the timeframe for page in paginator.paginate( LookupAttributes=[{"AttributeKey": "EventName", "AttributeValue": "UserAuthentication"}], StartTime=THRESHOLD_DATE, EndTime=datetime.utcnow(), ): for ev in page.get("Events", []): cte = ev.get("CloudTrailEvent") try: # If CloudTrailEvent is a string, parse to dict; otherwise use as-is or empty dict detail = json.loads(cte) if isinstance(cte, str) else (cte or {}) except (ValueError, TypeError): # Skip malformed or unexpected event content continue # Extract the userId from userIdentity.onBehalfOf.userId if present (preferred) uid = detail.get("userIdentity", {}).get("onBehalfOf", {}).get("userId") # Extract username from additionalEventData.UserName as a fallback uname = detail.get("additionalEventData", {}).get("UserName") if uid: active_ids.add(uid) if uname: active_names.add(uname) except ClientError as e: # Bubble up failures in CloudTrail lookup as RuntimeError so caller can handle/log raise RuntimeError(f"CloudTrail lookup failed: {e}") return active_ids, active_names def list_accounts(): """ Return a list of AWS Account IDs in the Organization. Uses organizations.list_accounts paginator to handle many accounts. """ accounts = [] for page in org.get_paginator("list_accounts").paginate(): accounts.extend([a["Id"] for a in page.get("Accounts", [])]) return accounts def list_permission_sets(instance_arn): """ Return a list of PermissionSet ARNs for the Identity Center instance. Uses sso-admin.list_permission_sets paginator to support many permission sets. """ perms = [] for page in sso_admin.get_paginator("list_permission_sets").paginate(InstanceArn=instance_arn): perms.extend(page.get("PermissionSets", [])) return perms def remove_user_assignments(instance_arn, principal_id): """ Remove all SSO account assignments for the given principal_id (user). Iterates every account and permission set, lists assignments, and deletes any assignment that matches the user PrincipalId and is of type USER. Returns True if no deletion errors occurred, False otherwise. """ accounts = list_accounts() perms = list_permission_sets(instance_arn) success = True for acct in accounts: for perm in perms: try: # List assignments for the (account, permission set) pair paginator = sso_admin.get_paginator("list_account_assignments") for page in paginator.paginate(InstanceArn=instance_arn, AccountId=acct, PermissionSetArn=perm): for a in page.get("AccountAssignments", []): # If this assignment is for the user, delete it if a.get("PrincipalType") == "USER" and a.get("PrincipalId") == principal_id: sso_admin.delete_account_assignment( InstanceArn=instance_arn, TargetId=acct, TargetType="AWS_ACCOUNT", PermissionSetArn=perm, PrincipalType="USER", PrincipalId=principal_id, ) print(f"Removed assignment: user={principal_id} account={acct} permission_set={perm}") except ClientError as e: # Log the failure but continue attempting other assignments print(f"Warning: failed removing assignments for acct={acct} perm={perm}: {e}") success = False return success def delete_user(identity_store_id, user_id): """ Delete the user from the Identity Store using identitystore.delete_user. Returns True on success, False on failure. """ try: identitystore.delete_user(IdentityStoreId=identity_store_id, UserId=user_id) print(f"Deleted user: {user_id}") return True except ClientError as e: print(f"Error deleting user {user_id}: {e}") return False def lambda_handler(event=None, context=None): """ Main entry point: discover Identity Center instance, fetch users, build active user sets from CloudTrail, then remove and delete inactive users. """ # Get Identity Center instance ARN and the identity store ID instance_arn, identity_store_id = get_identity_center_info() # Retrieve all users from the identity store users = list_users(identity_store_id) print(f"Found {len(users)} users; scanning CloudTrail UserAuthentication events since {THRESHOLD_DATE.isoformat()}") # Build sets of active userIds and usernames by scanning CloudTrail once active_ids, active_names = build_active_sets() inactive_count = deleted_count = 0 # Iterate through every user in the identity store for u in users: user_id = u.get("UserId") # unique identifier for the user in the Identity Store user_name = u.get("UserName") # may be None if not set or not logged in events # If user_id was seen in CloudTrail active_ids OR username was seen in active_names, skip deletion if (user_id and user_id in active_ids) or (user_name and user_name in active_names): # This user authenticated recently; treat as active continue # Mark user as inactive and process cleanup inactive_count += 1 print(f"\nProcessing inactive user: id={user_id} username={user_name}") # Remove assignments across accounts and permission sets if remove_user_assignments(instance_arn, user_id): # If assignment removal succeeded, delete the user if delete_user(identity_store_id, user_id): deleted_count += 1 else: # If we couldn't clean up assignments, avoid deleting to prevent orphaned assignments print(f"Skipping deletion for {user_id} due to assignment removal failures") print(f"\nSummary: inactive={inactive_count} deleted={deleted_count}") if __name__ == "__main__": # Run the handler for local testing or invocation as a script lambda_handler()
创建 Lambda 函数时,Lamba 会创建一个具有最低权限的执行角色。更新您的函数以使用您创建的执行角色。
创建 EventBridge 计划规则
完成以下步骤:
- 打开 Amazon EventBridge 控制台。
- 在导航窗格中,选择 Rules(规则),然后选择 Create rule(创建规则)。
- 输入名称和描述。<br id=hardline_break/> **注意:**规则不能与同一 AWS 区域和同一事件总线上的其他规则同名。
- 对于 Event bus(事件总线),选择 AWS default event bus(AWS 默认事件总线)。
- 对于 Rule type(规则类型),选择 Schedule(计划)。
- 选择 Next(下一步)。
- 对于 Schedule pattern(计划模式),选择 Recurring schedule(定期计划)。
- 对于 Schedule type(计划类型),选择 Cron-based schedule(基于 Cron 的计划)。
- 对于Cron expression(Cron 表达式),指定 cron(0 0 1 * ? *) 以每月运行一次。<br id=hardline_break/> **注意:**有关 cron 值的信息,请参阅 Cron 表达式。
- 选择 Next(下一步)。
- 选择 AWS Lambda 作为目标。
- 选择您的 Lambda 函数。
- 选择 Next(下一步)。
- 查看并选择 Create rule(创建规则)。
测试 EventBridge 计划规则
创建计划规则后,请测试该规则,确保自动化能正常运行。将 cron 表达式设置为与当前时间仅间隔几分钟的时间。
例如,如果您在上午 11:57 开始测试,请将表达式设置为 cron(59 11 * * ? *),以便在上午 11:59 启动。
确认自动化能正常运行后,请将规则的 cron 表达式修改为您的生产计划。
相关信息
- 语言
- 中文 (简体)

AWS 官方已更新 5 个月前
没有评论
相关内容
AWS 官方已更新 6 个月前