スキップしてコンテンツを表示

非アクティブな IAM Identity Center ユーザーを自動的に削除する方法を教えてください。

所要時間5分
0

90 日間サインインしていない AWS IAM Identity Center ユーザーを自動的に削除したいです。

簡単な説明

非アクティブな IAM Identity Center ユーザーを自動的に削除するには、AWS Lambda がユーザーに代わってアクションを行うための実行ロールを作成します。次に、Lambda 関数と Amazon EventBridge ルールを作成し、指定したスケジュールで関数を実行させます。

解決策

実行ロールを作成する

次の手順を実行します。

  1. AWS Identity and Access Management (IAM) コンソールを使用して実行ロールを作成します。
  2. IAM ロールのポリシーに次の権限を追加します。
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "cloudtrail:LookupEvents",
            "organizations:ListAccounts",
            "sso:ListAccountAssignments",
            "sso:ListPermissionSets",
            "sso:ListInstances",
            "sso:DeleteAccountAssignment",
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents",
            "identitystore:ListUsers",
            "identitystore:DeleteUser"
          ],
          "Resource": "*"
        }
      ]
    }

上記のポリシーステートメントの権限により、実行ロールは次のアクションを行うことができます。

  • CloudTrail イベントで UserAuthentication イベントの有無を確認し、90 日間サインインしていないユーザーを特定する。
  • 過去 90 日間に UserAuthentication イベントが発生しなかったユーザーを非アクティブとしてマークする。
  • AWS IAM Identity Center アクセスポータルにサインインしていない非アクティブなユーザーおよび新規ユーザーを削除キューに追加する。
  • 非アクティブな各ユーザーに関する AWS アカウントまたはアプリケーションの割り当てを確認する。
  • 割り当ての削除およびユーザーの削除。

重要:

  • 外部 ID プロバイダーを ID ソースとして使用する場合は、その外部 ID プロバイダーレベルで、AWS Identity Center SAML アプリケーションから権限のないユーザーを削除する必要があります。
  • Active Directory を ID ソースとして使用する場合は、同期スコープから、ターゲットユーザーおよび関連するグループのメンバーシップを除外する必要があります。

権限のないユーザーとターゲットユーザーを除外した後は、そのユーザーを再度プロビジョニングすることはできません。

Lambda 関数を作成し、実行ロールをアタッチする

Lambda コンソールを使用して Lambda 関数を作成します。[ランタイム]Python 3.13 を選択します。組み込みコードエディタに次の Python コードを入力します。

import boto3
import json
from datetime import datetime, timedelta
from botocore.exceptions import ClientError

# Initialize AWS clients
sso_admin_client = boto3.client('sso-admin')
identitystore_client = boto3.client('identitystore')
cloudtrail_client = boto3.client('cloudtrail')
org_client = boto3.client('organizations')

# Define the time threshold (90 days ago)
THRESHOLD_DATE = datetime.utcnow() - timedelta(days=90)

def get_identity_center_info():
    """Fetch the Identity Center directory ID and Instance ARN."""
    response = sso_admin_client.list_instances()
    if not response['Instances']:
        raise Exception("No Identity Center instance found.")
    instance = response['Instances'][0]
    return instance['InstanceArn'], instance['IdentityStoreId']

def list_users(identity_store_id):
    """List all users in AWS Identity Center."""
    users = []
    paginator = identitystore_client.get_paginator('list_users')
    for page in paginator.paginate(IdentityStoreId=identity_store_id):
        users.extend(page['Users'])
    return users

def check_user_authentication(username):
    """
    Check if a user has UserAuthentication logged in CloudTrail in the last 90 days.
    Returns True if the user has authenticated, otherwise False.
    """
    if not username:
        print("Warning: Username is empty. Skipping authentication check.")
        return False

    try:
        paginator = cloudtrail_client.get_paginator('lookup_events')
        for page in paginator.paginate(
            LookupAttributes=[
                {'AttributeKey': 'EventName', 'AttributeValue': 'UserAuthentication'}
            ],
            StartTime=THRESHOLD_DATE,
            EndTime=datetime.utcnow()
        ):
            for event in page['Events']:
                # Parse the CloudTrail event
                event_detail = json.loads(event['CloudTrailEvent'])
                username_in_event = event_detail.get('additionalEventData', {}).get('UserName')

                # Match the username to confirm the event belongs to the user
                if username_in_event == username:
                    return True
    except Exception as e:
        print(f"Error checking authentication for user '{username}': {e}")
    return False

def find_user_account_assignments(instance_arn, principal_id):
    """Find all account assignments for a specific user."""
    user_assignments = []

    # Get all accounts
    accounts = []
    try:
        paginator = org_client.get_paginator('list_accounts')
        for page in paginator.paginate():
            accounts.extend([account['Id'] for account in page['Accounts']])
    except ClientError as e:
        print(f"Error listing accounts: {e}")
        return user_assignments

    # Get all permission sets
    permission_sets = []
    try:
        paginator = sso_admin_client.get_paginator('list_permission_sets')
        for page in paginator.paginate(InstanceArn=instance_arn):
            permission_sets.extend(page['PermissionSets'])
    except ClientError as e:
        print(f"Error listing permission sets: {e}")
        return user_assignments

    # Check each account and permission set combination for the user
    for account_id in accounts:
        for permission_set_arn in permission_sets:
            try:
                paginator = sso_admin_client.get_paginator('list_account_assignments')
                for page in paginator.paginate(
                    InstanceArn=instance_arn,
                    AccountId=account_id,
                    PermissionSetArn=permission_set_arn
                ):
                    # Filter assignments for the specific user
                    for assignment in page['AccountAssignments']:
                        if assignment['PrincipalId'] == principal_id and assignment['PrincipalType'] == 'USER':
                            user_assignments.append({
                                'AccountId': account_id,
                                'PermissionSetArn': permission_set_arn,
                                'PrincipalId': principal_id
                            })
            except ClientError as e:
                print(f"Error listing assignments for account {account_id}, permission set {permission_set_arn}: {e}")
                continue

    return user_assignments

def remove_user_assignments(instance_arn, assignments):
    """Remove account assignments for a specific user."""
    success = True
    for assignment in assignments:
        try:
            print(f"Removing assignment: Account={assignment['AccountId']}, PermissionSet={assignment['PermissionSetArn']}")
            sso_admin_client.delete_account_assignment(
                InstanceArn=instance_arn,
                TargetId=assignment['AccountId'],
                TargetType='AWS_ACCOUNT',
                PermissionSetArn=assignment['PermissionSetArn'],
                PrincipalType='USER',
                PrincipalId=assignment['PrincipalId']
            )
        except ClientError as e:
            print(f"Error removing assignment: {e}")
            success = False

    return success

def delete_user(identity_store_id, user_id):
    """Delete a user from AWS Identity Center."""
    try:
        identitystore_client.delete_user(
            IdentityStoreId=identity_store_id,
            UserId=user_id
        )
        print(f"Deleted user with ID: {user_id}")
        return True
    except ClientError as e:
        print(f"Error deleting user with ID {user_id}: {e}")
        return False

def lambda_handler(event, context):
    """AWS Lambda entry point."""
    try:
        # Step 1: Get Identity Center directory ID and Instance ARN
        instance_arn, identity_store_id = get_identity_center_info()
        print(f"Found Identity Center instance: {instance_arn}")

        # Step 2: List all users
        users = list_users(identity_store_id)
        print(f"Found {len(users)} users in Identity Center")

        inactive_users = 0
        deleted_users = 0

        for user in users:
            user_name = user.get('UserName')
            user_id = user.get('UserId')

            # Step 3: Check if the user has authenticated recently
            if check_user_authentication(user_name):
                print(f"User '{user_name}' (ID: {user_id}) has authenticated recently. Skipping.")
                continue

            inactive_users += 1
            print(f"\nProcessing inactive user: '{user_name}' (ID: {user_id})")

            # Step 4: Find all account assignments for this user
            user_assignments = find_user_account_assignments(instance_arn, user_id)

            if user_assignments:
                print(f"Found {len(user_assignments)} assignments for user '{user_name}'")

                # Step 5: Remove the user's assignments
                if remove_user_assignments(instance_arn, user_assignments):
                    print(f"Successfully removed all assignments for user '{user_name}'")
                else:
                    print(f"Failed to remove some assignments for user '{user_name}'. Skipping deletion.")
                    continue
            else:
                print(f"No assignments found for user '{user_name}'")

            # Step 6: Delete the user
            if delete_user(identity_store_id, user_id):
                deleted_users += 1

        print(f"\nSummary: Found {inactive_users} inactive users, successfully deleted {deleted_users} users")

    except Exception as e:
        print(f"Error in main execution: {e}")

Lambda 関数の作成時に、Lamba は最小特権を持つ実行ロールを作成します。関数を更新し、作成した実行ロールを使用する設定を行います。

EventBridge がスケジュールするルールを作成する

次の手順を実行します。

  1. Amazon EventBridge コンソールを開きます。
  2. ナビゲーションペインで [ルール] を選択し、[ルールを作成] を選択します。
  3. 名前と説明を入力します。
    注: ルールには、同じ AWS リージョン内の同じイベントバスに存在する別のルールと同じ名前を付けることはできません。
  4. [イベントバス][AWS のデフォルトイベントバス] を選択します。
  5. [ルールタイプ][スケジュール] を選択します。
  6. [次へ] を選択します。
  7. [スケジュールパターン][定期的なスケジュール] を選択します。
  8. [スケジュールの種類][cron ベースのスケジュール] を選択します。
  9. [cron 式]cron(0 0 1 * ? *) を指定し、月ごとに実行させます。
    注: cron の値に関する詳細については、「cron 式」を参照してください。
  10. [次へ] を選択します。
  11. AWS Lambda をターゲットとして選択します。
  12. 目的の Lambda 関数を選択します。
  13. [次へ] を選択します。
  14. 確認後、[ルールを作成] を選択します。

EventBridge がスケジュールするルールをテストする

スケジュールされたルールを作成した後、そのルールをテストして自動化が機能することを確認します。現在の時刻から数分後の時刻に cron 式を設定します。

たとえば、午前 11:57 にテストを開始する場合は、式を cron(59 11 * * ? *) に設定し、午前 11:59 に開始させます。

自動化が機能することを確認した後、ルールの cron 式を本番のスケジュールに合わせて変更します。

関連情報

セキュリティ、ID、コンプライアンスのベストプラクティス

Security Hub CSPM の標準リファレンス

ID ソースを管理する

未使用のユーザー、ロール、権限、ポリシー、認証情報を定期的に確認して削除する

AWS公式更新しました 7ヶ月前
コメントはありません

関連するコンテンツ