跳至內容

如何自動移除非活躍的 IAM Identity Center 使用者?

5 分的閱讀內容
0

我想自動移除 90 天沒有登入過的 AWS IAM Identity Center 使用者。

簡短描述

若要自動移除非活躍的 IAM Identity Center 使用者,請為 AWS Lambda 建立一個執行角色,讓它能代表您執行相關操作。然後,建立一個 Lambda 函式,並為該函式建立一個 Amazon EventBridge 規則,使其能依指定的排程執行。

解決方法

建立執行角色

請完成下列步驟:

  1. 使用 AWS Identity and Access Management (IAM) 主控台建立執行角色
  2. 將下列權限新增至 IAM 角色的政策中:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "cloudtrail:LookupEvents",
            "organizations:ListAccounts",
            "sso:ListAccountAssignments",
            "sso:ListPermissionSets",
            "sso:ListInstances",
            "sso:DeleteAccountAssignment",
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents",
            "identitystore:ListUsers",
            "identitystore:DeleteUser"
          ],
          "Resource": "*"
        }
      ]
    }

上述政策陳述式中的權限允許執行角色執行下列動作:

  • 檢查 CloudTrail 中的 UserAuthentication 事件,以識別 90 天內未曾登入的使用者。
  • 如果使用者在過去 90 天內沒有發生任何 UserAuthentication 事件,則將該使用者標記為非活躍。
  • 將非活躍使用者和未曾登入 AWS IAM Identity Center 存取入口網站的新使用者新增至刪除佇列。
  • 檢查每個非活躍使用者是否有指派的 AWS 帳戶或應用程式。
  • 移除指派後,再刪除該使用者。

重要:

移除未經授權和目標使用者後,您將無法重新佈建他們。

建立 Lambda 函式並附加執行角色

使用 Lambda 主控台建立 Lambda 函式。在 Runtime (執行時期),選擇 Python 3.13。在內建程式碼編輯器中,輸入以下 Python 程式碼:

import boto3
import json
from datetime import datetime, timedelta
from botocore.exceptions import ClientError

# Initialize AWS clients
sso_admin_client = boto3.client('sso-admin')
identitystore_client = boto3.client('identitystore')
cloudtrail_client = boto3.client('cloudtrail')
org_client = boto3.client('organizations')

# Define the time threshold (90 days ago)
THRESHOLD_DATE = datetime.utcnow() - timedelta(days=90)

def get_identity_center_info():
    """Fetch the Identity Center directory ID and Instance ARN."""
    response = sso_admin_client.list_instances()
    if not response['Instances']:
        raise Exception("No Identity Center instance found.")
    instance = response['Instances'][0]
    return instance['InstanceArn'], instance['IdentityStoreId']

def list_users(identity_store_id):
    """List all users in AWS Identity Center."""
    users = []
    paginator = identitystore_client.get_paginator('list_users')
    for page in paginator.paginate(IdentityStoreId=identity_store_id):
        users.extend(page['Users'])
    return users

def check_user_authentication(username):
    """
    Check if a user has UserAuthentication logged in CloudTrail in the last 90 days.
    Returns True if the user has authenticated, otherwise False.
    """
    if not username:
        print("Warning: Username is empty. Skipping authentication check.")
        return False

    try:
        paginator = cloudtrail_client.get_paginator('lookup_events')
        for page in paginator.paginate(
            LookupAttributes=[
                {'AttributeKey': 'EventName', 'AttributeValue': 'UserAuthentication'}
            ],
            StartTime=THRESHOLD_DATE,
            EndTime=datetime.utcnow()
        ):
            for event in page['Events']:
                # Parse the CloudTrail event
                event_detail = json.loads(event['CloudTrailEvent'])
                username_in_event = event_detail.get('additionalEventData', {}).get('UserName')

                # Match the username to confirm the event belongs to the user
                if username_in_event == username:
                    return True
    except Exception as e:
        print(f"Error checking authentication for user '{username}': {e}")
    return False

def find_user_account_assignments(instance_arn, principal_id):
    """Find all account assignments for a specific user."""
    user_assignments = []

    # Get all accounts
    accounts = []
    try:
        paginator = org_client.get_paginator('list_accounts')
        for page in paginator.paginate():
            accounts.extend([account['Id'] for account in page['Accounts']])
    except ClientError as e:
        print(f"Error listing accounts: {e}")
        return user_assignments

    # Get all permission sets
    permission_sets = []
    try:
        paginator = sso_admin_client.get_paginator('list_permission_sets')
        for page in paginator.paginate(InstanceArn=instance_arn):
            permission_sets.extend(page['PermissionSets'])
    except ClientError as e:
        print(f"Error listing permission sets: {e}")
        return user_assignments

    # Check each account and permission set combination for the user
    for account_id in accounts:
        for permission_set_arn in permission_sets:
            try:
                paginator = sso_admin_client.get_paginator('list_account_assignments')
                for page in paginator.paginate(
                    InstanceArn=instance_arn,
                    AccountId=account_id,
                    PermissionSetArn=permission_set_arn
                ):
                    # Filter assignments for the specific user
                    for assignment in page['AccountAssignments']:
                        if assignment['PrincipalId'] == principal_id and assignment['PrincipalType'] == 'USER':
                            user_assignments.append({
                                'AccountId': account_id,
                                'PermissionSetArn': permission_set_arn,
                                'PrincipalId': principal_id
                            })
            except ClientError as e:
                print(f"Error listing assignments for account {account_id}, permission set {permission_set_arn}: {e}")
                continue

    return user_assignments

def remove_user_assignments(instance_arn, assignments):
    """Remove account assignments for a specific user."""
    success = True
    for assignment in assignments:
        try:
            print(f"Removing assignment: Account={assignment['AccountId']}, PermissionSet={assignment['PermissionSetArn']}")
            sso_admin_client.delete_account_assignment(
                InstanceArn=instance_arn,
                TargetId=assignment['AccountId'],
                TargetType='AWS_ACCOUNT',
                PermissionSetArn=assignment['PermissionSetArn'],
                PrincipalType='USER',
                PrincipalId=assignment['PrincipalId']
            )
        except ClientError as e:
            print(f"Error removing assignment: {e}")
            success = False

    return success

def delete_user(identity_store_id, user_id):
    """Delete a user from AWS Identity Center."""
    try:
        identitystore_client.delete_user(
            IdentityStoreId=identity_store_id,
            UserId=user_id
        )
        print(f"Deleted user with ID: {user_id}")
        return True
    except ClientError as e:
        print(f"Error deleting user with ID {user_id}: {e}")
        return False

def lambda_handler(event, context):
    """AWS Lambda entry point."""
    try:
        # Step 1: Get Identity Center directory ID and Instance ARN
        instance_arn, identity_store_id = get_identity_center_info()
        print(f"Found Identity Center instance: {instance_arn}")

        # Step 2: List all users
        users = list_users(identity_store_id)
        print(f"Found {len(users)} users in Identity Center")

        inactive_users = 0
        deleted_users = 0

        for user in users:
            user_name = user.get('UserName')
            user_id = user.get('UserId')

            # Step 3: Check if the user has authenticated recently
            if check_user_authentication(user_name):
                print(f"User '{user_name}' (ID: {user_id}) has authenticated recently. Skipping.")
                continue

            inactive_users += 1
            print(f"\nProcessing inactive user: '{user_name}' (ID: {user_id})")

            # Step 4: Find all account assignments for this user
            user_assignments = find_user_account_assignments(instance_arn, user_id)

            if user_assignments:
                print(f"Found {len(user_assignments)} assignments for user '{user_name}'")

                # Step 5: Remove the user's assignments
                if remove_user_assignments(instance_arn, user_assignments):
                    print(f"Successfully removed all assignments for user '{user_name}'")
                else:
                    print(f"Failed to remove some assignments for user '{user_name}'. Skipping deletion.")
                    continue
            else:
                print(f"No assignments found for user '{user_name}'")

            # Step 6: Delete the user
            if delete_user(identity_store_id, user_id):
                deleted_users += 1

        print(f"\nSummary: Found {inactive_users} inactive users, successfully deleted {deleted_users} users")

    except Exception as e:
        print(f"Error in main execution: {e}")

當您建立 Lambda 函式時,Lamba 會建立一個具有最低權限的執行角色。請將您的函式更新為使用您所建立的執行角色。

建立一個 EventBridge 排程規則

請完成下列步驟:

  1. 開啟 Amazon EventBridge console (Amazon EventBridge 主控台)。
  2. 在導覽窗格中,選擇 Rules (規則),然後選擇 Create rule (建立規則)。
  3. 輸入名稱和描述。
    **注意:**規則名稱在同一個 AWS 區域及相同事件匯流排中,不能與其他規則重複。
  4. Event bus (事件匯流排) 中,選取 AWS default event bus (AWS 預設事件匯流排)。
  5. Rule type (規則類型) 中,選擇 Schedule (排程)。
  6. 選擇 Next (下一步)。
  7. Schedule pattern (排程模式),選擇 Recurring schedule (定期排程)。
  8. Schedule type (排程類型) 中,選擇 Cron-based schedule (Cron 式排程)。
  9. Cron expression (Cron 表達式),指定 cron (0 0 1 * ?*) 以設定每月執行一次。
    **注意:**有關 Cron 值的資訊,請參閱 Cron 表達式
  10. 選擇 Next (下一步)。
  11. 選取 AWS Lambda 作為目標。
  12. 選取您的 Lambda 函式。
  13. 選擇 Next (下一步)。
  14. 檢查並選擇 Create rule (建立規則)。

測試 EventBridge 排程規則

建立排程規則後,請測試該規則以確保自動化功能正常運作。設定一個距離目前時間僅幾分鐘的 Cron 表達式。

例如,如果您在上午 11:57 開始測試,則將表達式設為 cron (59 11 * * ?*),讓它在上午 11:59 執行。

確認自動化功能正常運作後,請根據生產排程修改規則的 Cron 表達式。

相關資訊

安全性、身分和合規性的最佳實務

Security Hub CSPM 的標準參考

管理您的身分來源

定期檢視並移除未使用的使用者、角色、權限、政策和憑證

AWS 官方已更新 3 個月前