如何自動移除非活躍的 IAM Identity Center 使用者?
6 分的閱讀內容
0
我想自動移除 90 天沒有登入過的 AWS IAM Identity Center 使用者。
簡短描述
若要自動移除非活躍的 IAM Identity Center 使用者,請為 AWS Lambda 建立一個執行角色,讓它能代表您執行相關操作。然後,建立一個 Lambda 函式,並為該函式建立一個 Amazon EventBridge 規則,使其能依指定的排程執行。
解決方法
建立執行角色
請完成下列步驟:
- 使用 AWS Identity and Access Management (IAM) 主控台建立執行角色。
- 將下列權限新增至 IAM 角色的政策中:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "cloudtrail:LookupEvents", "sso:ListAccountAssignments", "sso:ListPermissionSets", "organizations:ListAccounts", "sso:ListInstances", "sso:DeleteAccountAssignment", "identitystore:DeleteUser", "sso-directory:ListUsers", "sso-directory:DeleteUser", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "identitystore:ListUsers" ], "Resource": "*" } ] }
上述政策陳述式中的權限允許執行角色執行下列動作:
- 檢查 CloudTrail 中的 UserAuthentication 事件,以識別 90 天內未曾登入的使用者。
- 如果使用者在過去 90 天內沒有發生任何 UserAuthentication 事件,則將該使用者標記為非活躍。
- 將非活躍使用者和未曾登入 AWS IAM Identity Center 存取入口網站的新使用者新增至刪除佇列。
- 檢查每個非活躍使用者是否有指派的 AWS 帳戶或應用程式。
- 移除指派後,再刪除該使用者。
重要:
- 如果您使用外部身分提供者作為身分來源,則必須在外部身分提供者層級刪除 AWS Identity Center SAML 應用程式中未經授權的使用者。
- 如果您使用 Active Directory 作為身分識別來源,則必須將目標使用者及其關聯的群組成員資格從同步範圍中移除。
建立 Lambda 函式並附加執行角色
使用 Lambda 主控台建立 Lambda 函式。在 Runtime (執行時期),選擇 Python 3.13。在內建程式碼編輯器中,輸入以下 Python 程式碼:
import json from datetime import datetime, timedelta import boto3 from botocore.exceptions import ClientError # Create boto3 clients to call AWS services used by the script. sso_admin = boto3.client("sso-admin") identitystore = boto3.client("identitystore") cloudtrail = boto3.client("cloudtrail") org = boto3.client("organizations") # Threshold to consider a user "active": any authentication after THRESHOLD_DATE counts. THRESHOLD_DATE = datetime.utcnow() - timedelta(days=90) def get_identity_center_info(): """ Retrieve the first Identity Center (SSO) instance's ARN and IdentityStoreId. """ resp = sso_admin.list_instances() inst = resp.get("Instances") or [] if not inst: raise RuntimeError("No Identity Center instance found") # Return the InstanceArn and IdentityStoreId of the first instance found return inst[0]["InstanceArn"], inst[0]["IdentityStoreId"] def list_users(identity_store_id): """ List all users from the given Identity Store (IdentityCenter). Returns a list of users as returned by identitystore.list_users. """ users = [] # Use a paginator to handle large user directories for page in identitystore.get_paginator("list_users").paginate(IdentityStoreId=identity_store_id): users.extend(page.get("Users", [])) return users def build_active_sets(): """ Scan CloudTrail LookupEvents for UserAuthentication events in the time window and build two sets: - active_ids: set of userId values found at userIdentity.onBehalfOf.userId - active_names: set of usernames found at additionalEventData.UserName Scanning CloudTrail once is more efficient than querying per user. """ active_ids = set() # seen onBehalfOf.userId values active_names = set() # seen additionalEventData.UserName values paginator = cloudtrail.get_paginator("lookup_events") try: # Iterate through pages of UserAuthentication events within the timeframe for page in paginator.paginate( LookupAttributes=[{"AttributeKey": "EventName", "AttributeValue": "UserAuthentication"}], StartTime=THRESHOLD_DATE, EndTime=datetime.utcnow(), ): for ev in page.get("Events", []): cte = ev.get("CloudTrailEvent") try: # If CloudTrailEvent is a string, parse to dict; otherwise use as-is or empty dict detail = json.loads(cte) if isinstance(cte, str) else (cte or {}) except (ValueError, TypeError): # Skip malformed or unexpected event content continue # Extract the userId from userIdentity.onBehalfOf.userId if present (preferred) uid = detail.get("userIdentity", {}).get("onBehalfOf", {}).get("userId") # Extract username from additionalEventData.UserName as a fallback uname = detail.get("additionalEventData", {}).get("UserName") if uid: active_ids.add(uid) if uname: active_names.add(uname) except ClientError as e: # Bubble up failures in CloudTrail lookup as RuntimeError so caller can handle/log raise RuntimeError(f"CloudTrail lookup failed: {e}") return active_ids, active_names def list_accounts(): """ Return a list of AWS Account IDs in the Organization. Uses organizations.list_accounts paginator to handle many accounts. """ accounts = [] for page in org.get_paginator("list_accounts").paginate(): accounts.extend([a["Id"] for a in page.get("Accounts", [])]) return accounts def list_permission_sets(instance_arn): """ Return a list of PermissionSet ARNs for the Identity Center instance. Uses sso-admin.list_permission_sets paginator to support many permission sets. """ perms = [] for page in sso_admin.get_paginator("list_permission_sets").paginate(InstanceArn=instance_arn): perms.extend(page.get("PermissionSets", [])) return perms def remove_user_assignments(instance_arn, principal_id): """ Remove all SSO account assignments for the given principal_id (user). Iterates every account and permission set, lists assignments, and deletes any assignment that matches the user PrincipalId and is of type USER. Returns True if no deletion errors occurred, False otherwise. """ accounts = list_accounts() perms = list_permission_sets(instance_arn) success = True for acct in accounts: for perm in perms: try: # List assignments for the (account, permission set) pair paginator = sso_admin.get_paginator("list_account_assignments") for page in paginator.paginate(InstanceArn=instance_arn, AccountId=acct, PermissionSetArn=perm): for a in page.get("AccountAssignments", []): # If this assignment is for the user, delete it if a.get("PrincipalType") == "USER" and a.get("PrincipalId") == principal_id: sso_admin.delete_account_assignment( InstanceArn=instance_arn, TargetId=acct, TargetType="AWS_ACCOUNT", PermissionSetArn=perm, PrincipalType="USER", PrincipalId=principal_id, ) print(f"Removed assignment: user={principal_id} account={acct} permission_set={perm}") except ClientError as e: # Log the failure but continue attempting other assignments print(f"Warning: failed removing assignments for acct={acct} perm={perm}: {e}") success = False return success def delete_user(identity_store_id, user_id): """ Delete the user from the Identity Store using identitystore.delete_user. Returns True on success, False on failure. """ try: identitystore.delete_user(IdentityStoreId=identity_store_id, UserId=user_id) print(f"Deleted user: {user_id}") return True except ClientError as e: print(f"Error deleting user {user_id}: {e}") return False def lambda_handler(event=None, context=None): """ Main entry point: discover Identity Center instance, fetch users, build active user sets from CloudTrail, then remove and delete inactive users. """ # Get Identity Center instance ARN and the identity store ID instance_arn, identity_store_id = get_identity_center_info() # Retrieve all users from the identity store users = list_users(identity_store_id) print(f"Found {len(users)} users; scanning CloudTrail UserAuthentication events since {THRESHOLD_DATE.isoformat()}") # Build sets of active userIds and usernames by scanning CloudTrail once active_ids, active_names = build_active_sets() inactive_count = deleted_count = 0 # Iterate through every user in the identity store for u in users: user_id = u.get("UserId") # unique identifier for the user in the Identity Store user_name = u.get("UserName") # may be None if not set or not logged in events # If user_id was seen in CloudTrail active_ids OR username was seen in active_names, skip deletion if (user_id and user_id in active_ids) or (user_name and user_name in active_names): # This user authenticated recently; treat as active continue # Mark user as inactive and process cleanup inactive_count += 1 print(f"\nProcessing inactive user: id={user_id} username={user_name}") # Remove assignments across accounts and permission sets if remove_user_assignments(instance_arn, user_id): # If assignment removal succeeded, delete the user if delete_user(identity_store_id, user_id): deleted_count += 1 else: # If we couldn't clean up assignments, avoid deleting to prevent orphaned assignments print(f"Skipping deletion for {user_id} due to assignment removal failures") print(f"\nSummary: inactive={inactive_count} deleted={deleted_count}") if __name__ == "__main__": # Run the handler for local testing or invocation as a script lambda_handler()
當您建立 Lambda 函式時,Lamba 會建立一個具有最低權限的執行角色。請將您的函式更新為使用您所建立的執行角色。
建立一個 EventBridge 排程規則
請完成下列步驟:
- 開啟 Amazon EventBridge console (Amazon EventBridge 主控台)。
- 在導覽窗格中,選擇 Rules (規則),然後選擇 Create rule (建立規則)。
- 輸入名稱和描述。<br id=hardline_break/> **注意:**規則名稱在同一個 AWS 區域及相同事件匯流排中,不能與其他規則重複。
- 在 Event bus (事件匯流排) 中,選取 AWS default event bus (AWS 預設事件匯流排)。
- 在 Rule type (規則類型) 中,選擇 Schedule (排程)。
- 選擇 Next (下一步)。
- 在 Schedule pattern (排程模式),選擇 Recurring schedule (定期排程)。
- 在 Schedule type (排程類型) 中,選擇 Cron-based schedule (Cron 式排程)。
- 在 Cron expression (Cron 表達式),指定 cron (0 0 1 * ?*) 以設定每月執行一次。<br id=hardline_break/> **注意:**有關 Cron 值的資訊,請參閱 Cron 表達式。
- 選擇 Next (下一步)。
- 選取 AWS Lambda 作為目標。
- 選取您的 Lambda 函式。
- 選擇 Next (下一步)。
- 檢查並選擇 Create rule (建立規則)。
測試 EventBridge 排程規則
建立排程規則後,請測試該規則以確保自動化功能正常運作。設定一個距離目前時間僅幾分鐘的 Cron 表達式。
例如,如果您在上午 11:57 開始測試,則將表達式設為 cron (59 11 * * ?*),讓它在上午 11:59 執行。
確認自動化功能正常運作後,請根據生產排程修改規則的 Cron 表達式。
相關資訊
- 語言
- 中文 (繁體)

AWS 官方已更新 5 個月前
沒有評論
相關內容
- 已提問 2 年前