如何自動移除非活躍的 IAM Identity Center 使用者?
5 分的閱讀內容
0
我想自動移除 90 天沒有登入過的 AWS IAM Identity Center 使用者。
簡短描述
若要自動移除非活躍的 IAM Identity Center 使用者,請為 AWS Lambda 建立一個執行角色,讓它能代表您執行相關操作。然後,建立一個 Lambda 函式,並為該函式建立一個 Amazon EventBridge 規則,使其能依指定的排程執行。
解決方法
建立執行角色
請完成下列步驟:
- 使用 AWS Identity and Access Management (IAM) 主控台建立執行角色。
- 將下列權限新增至 IAM 角色的政策中:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "cloudtrail:LookupEvents", "organizations:ListAccounts", "sso:ListAccountAssignments", "sso:ListPermissionSets", "sso:ListInstances", "sso:DeleteAccountAssignment", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "identitystore:ListUsers", "identitystore:DeleteUser" ], "Resource": "*" } ] }
上述政策陳述式中的權限允許執行角色執行下列動作:
- 檢查 CloudTrail 中的 UserAuthentication 事件,以識別 90 天內未曾登入的使用者。
- 如果使用者在過去 90 天內沒有發生任何 UserAuthentication 事件,則將該使用者標記為非活躍。
- 將非活躍使用者和未曾登入 AWS IAM Identity Center 存取入口網站的新使用者新增至刪除佇列。
- 檢查每個非活躍使用者是否有指派的 AWS 帳戶或應用程式。
- 移除指派後,再刪除該使用者。
重要:
- 如果您使用外部身分提供者作為身分來源,則必須在外部身分提供者層級刪除 AWS Identity Center SAML 應用程式中未經授權的使用者。
- 如果您使用 Active Directory 作為身分識別來源,則必須將目標使用者及其關聯的群組成員資格從同步範圍中移除。
移除未經授權和目標使用者後,您將無法重新佈建他們。
建立 Lambda 函式並附加執行角色
使用 Lambda 主控台建立 Lambda 函式。在 Runtime (執行時期),選擇 Python 3.13。在內建程式碼編輯器中,輸入以下 Python 程式碼:
import boto3 import json from datetime import datetime, timedelta from botocore.exceptions import ClientError # Initialize AWS clients sso_admin_client = boto3.client('sso-admin') identitystore_client = boto3.client('identitystore') cloudtrail_client = boto3.client('cloudtrail') org_client = boto3.client('organizations') # Define the time threshold (90 days ago) THRESHOLD_DATE = datetime.utcnow() - timedelta(days=90) def get_identity_center_info(): """Fetch the Identity Center directory ID and Instance ARN.""" response = sso_admin_client.list_instances() if not response['Instances']: raise Exception("No Identity Center instance found.") instance = response['Instances'][0] return instance['InstanceArn'], instance['IdentityStoreId'] def list_users(identity_store_id): """List all users in AWS Identity Center.""" users = [] paginator = identitystore_client.get_paginator('list_users') for page in paginator.paginate(IdentityStoreId=identity_store_id): users.extend(page['Users']) return users def check_user_authentication(username): """ Check if a user has UserAuthentication logged in CloudTrail in the last 90 days. Returns True if the user has authenticated, otherwise False. """ if not username: print("Warning: Username is empty. Skipping authentication check.") return False try: paginator = cloudtrail_client.get_paginator('lookup_events') for page in paginator.paginate( LookupAttributes=[ {'AttributeKey': 'EventName', 'AttributeValue': 'UserAuthentication'} ], StartTime=THRESHOLD_DATE, EndTime=datetime.utcnow() ): for event in page['Events']: # Parse the CloudTrail event event_detail = json.loads(event['CloudTrailEvent']) username_in_event = event_detail.get('additionalEventData', {}).get('UserName') # Match the username to confirm the event belongs to the user if username_in_event == username: return True except Exception as e: print(f"Error checking authentication for user '{username}': {e}") return False def find_user_account_assignments(instance_arn, principal_id): """Find all account assignments for a specific user.""" user_assignments = [] # Get all accounts accounts = [] try: paginator = org_client.get_paginator('list_accounts') for page in paginator.paginate(): accounts.extend([account['Id'] for account in page['Accounts']]) except ClientError as e: print(f"Error listing accounts: {e}") return user_assignments # Get all permission sets permission_sets = [] try: paginator = sso_admin_client.get_paginator('list_permission_sets') for page in paginator.paginate(InstanceArn=instance_arn): permission_sets.extend(page['PermissionSets']) except ClientError as e: print(f"Error listing permission sets: {e}") return user_assignments # Check each account and permission set combination for the user for account_id in accounts: for permission_set_arn in permission_sets: try: paginator = sso_admin_client.get_paginator('list_account_assignments') for page in paginator.paginate( InstanceArn=instance_arn, AccountId=account_id, PermissionSetArn=permission_set_arn ): # Filter assignments for the specific user for assignment in page['AccountAssignments']: if assignment['PrincipalId'] == principal_id and assignment['PrincipalType'] == 'USER': user_assignments.append({ 'AccountId': account_id, 'PermissionSetArn': permission_set_arn, 'PrincipalId': principal_id }) except ClientError as e: print(f"Error listing assignments for account {account_id}, permission set {permission_set_arn}: {e}") continue return user_assignments def remove_user_assignments(instance_arn, assignments): """Remove account assignments for a specific user.""" success = True for assignment in assignments: try: print(f"Removing assignment: Account={assignment['AccountId']}, PermissionSet={assignment['PermissionSetArn']}") sso_admin_client.delete_account_assignment( InstanceArn=instance_arn, TargetId=assignment['AccountId'], TargetType='AWS_ACCOUNT', PermissionSetArn=assignment['PermissionSetArn'], PrincipalType='USER', PrincipalId=assignment['PrincipalId'] ) except ClientError as e: print(f"Error removing assignment: {e}") success = False return success def delete_user(identity_store_id, user_id): """Delete a user from AWS Identity Center.""" try: identitystore_client.delete_user( IdentityStoreId=identity_store_id, UserId=user_id ) print(f"Deleted user with ID: {user_id}") return True except ClientError as e: print(f"Error deleting user with ID {user_id}: {e}") return False def lambda_handler(event, context): """AWS Lambda entry point.""" try: # Step 1: Get Identity Center directory ID and Instance ARN instance_arn, identity_store_id = get_identity_center_info() print(f"Found Identity Center instance: {instance_arn}") # Step 2: List all users users = list_users(identity_store_id) print(f"Found {len(users)} users in Identity Center") inactive_users = 0 deleted_users = 0 for user in users: user_name = user.get('UserName') user_id = user.get('UserId') # Step 3: Check if the user has authenticated recently if check_user_authentication(user_name): print(f"User '{user_name}' (ID: {user_id}) has authenticated recently. Skipping.") continue inactive_users += 1 print(f"\nProcessing inactive user: '{user_name}' (ID: {user_id})") # Step 4: Find all account assignments for this user user_assignments = find_user_account_assignments(instance_arn, user_id) if user_assignments: print(f"Found {len(user_assignments)} assignments for user '{user_name}'") # Step 5: Remove the user's assignments if remove_user_assignments(instance_arn, user_assignments): print(f"Successfully removed all assignments for user '{user_name}'") else: print(f"Failed to remove some assignments for user '{user_name}'. Skipping deletion.") continue else: print(f"No assignments found for user '{user_name}'") # Step 6: Delete the user if delete_user(identity_store_id, user_id): deleted_users += 1 print(f"\nSummary: Found {inactive_users} inactive users, successfully deleted {deleted_users} users") except Exception as e: print(f"Error in main execution: {e}")
當您建立 Lambda 函式時,Lamba 會建立一個具有最低權限的執行角色。請將您的函式更新為使用您所建立的執行角色。
建立一個 EventBridge 排程規則
請完成下列步驟:
- 開啟 Amazon EventBridge console (Amazon EventBridge 主控台)。
- 在導覽窗格中,選擇 Rules (規則),然後選擇 Create rule (建立規則)。
- 輸入名稱和描述。
**注意:**規則名稱在同一個 AWS 區域及相同事件匯流排中,不能與其他規則重複。 - 在 Event bus (事件匯流排) 中,選取 AWS default event bus (AWS 預設事件匯流排)。
- 在 Rule type (規則類型) 中,選擇 Schedule (排程)。
- 選擇 Next (下一步)。
- 在 Schedule pattern (排程模式),選擇 Recurring schedule (定期排程)。
- 在 Schedule type (排程類型) 中,選擇 Cron-based schedule (Cron 式排程)。
- 在 Cron expression (Cron 表達式),指定 cron (0 0 1 * ?*) 以設定每月執行一次。
**注意:**有關 Cron 值的資訊,請參閱 Cron 表達式。 - 選擇 Next (下一步)。
- 選取 AWS Lambda 作為目標。
- 選取您的 Lambda 函式。
- 選擇 Next (下一步)。
- 檢查並選擇 Create rule (建立規則)。
測試 EventBridge 排程規則
建立排程規則後,請測試該規則以確保自動化功能正常運作。設定一個距離目前時間僅幾分鐘的 Cron 表達式。
例如,如果您在上午 11:57 開始測試,則將表達式設為 cron (59 11 * * ?*),讓它在上午 11:59 執行。
確認自動化功能正常運作後,請根據生產排程修改規則的 Cron 表達式。
相關資訊
- 語言
- 中文 (繁體)

AWS 官方已更新 3 個月前
沒有評論
相關內容
- 已提問 1 年前
AWS 官方已更新 6 個月前