How to disable an Amazon Redshift user after X failed login attempts
If your security requirements call for locking Amazon Redshift users after a configurable number (X) of failed login attempts, you can use the solution in this article.
This solution uses Amazon Redshift audit logging to Amazon CloudWatch Logs to monitor and respond to authentication failures. When an authentication failure occurs, the log record is delivered to CloudWatch Logs, where a subscription filter invokes a Lambda function. The Lambda function updates a Redshift table that tracks each user's consecutive authentication failures and, once the count reaches a configurable threshold, disables the user's account. This automated response helps prevent further unauthorized access attempts and protects the integrity of the Amazon Redshift data warehouse. Below is the architecture diagram.
By implementing this solution, organizations can strengthen the security of their Amazon Redshift environment, detect and respond to credential compromise attempts, and keep an audit trail of authentication-related events. Below are the steps to implement this solution.
Enable audit logging to CloudWatch in Amazon Redshift
- Sign in to the AWS Management Console and open the Amazon Redshift console.
- On the navigation menu, choose Clusters, then choose the cluster that you want to update.
- Choose the Properties tab. On the Database configurations panel, choose Edit, then Edit audit logging.
- On the Edit audit logging page, choose Turn on and select CloudWatch.
- Choose all the logs to export.
- To save your choices, choose Save changes.
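The console steps above can also be scripted. The following is a minimal sketch, assuming a provisioned cluster and a boto3 Redshift client created by the caller; the helper names are hypothetical and not part of the original solution.

```python
def build_enable_logging_request(cluster_id,
                                 log_exports=("connectionlog", "userlog", "useractivitylog")):
    """Build keyword arguments for the Redshift EnableLogging API call."""
    return {
        "ClusterIdentifier": cluster_id,
        "LogDestinationType": "cloudwatch",  # send audit logs to CloudWatch Logs
        "LogExports": list(log_exports),     # the three audit log types
    }


def enable_cloudwatch_audit_logging(redshift_client, cluster_id):
    """Turn on audit logging to CloudWatch for one cluster.

    redshift_client is expected to be boto3.client("redshift"); it is passed in
    so the request can be inspected or tested without AWS credentials.
    """
    return redshift_client.enable_logging(**build_enable_logging_request(cluster_id))
```

For example, `enable_cloudwatch_audit_logging(boto3.client("redshift"), "redshift-cluster-1")` would apply the setting to the cluster name used later in this article.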
Create an IAM policy and an IAM role that AWS Lambda can assume
Create an IAM policy with permissions for Amazon Redshift
- Navigate to the IAM Policies page
- Choose Create policy
- On the Specify permissions screen, choose JSON to open the policy editor
- Paste the following policy into the policy editor and choose Next
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "redshift-data:*",
                "redshift:GetClusterCredentialsWithIAM",
                "redshift:GetClusterCredentials",
                "redshift-serverless:GetCredentials"
            ],
            "Resource": "*"
        }
    ]
}
- On the Review and create page, for Policy name, enter a descriptive name such as redshift-lambda-authentication-failure-policy, and enter a Description for the policy
- Choose Create policy
Create an IAM role that AWS Lambda can assume and attach the policy created in the previous step to it
- Navigate to the IAM Roles page
- Choose Create role
- For Trusted entity type, choose AWS service. For Use case, choose Lambda, then choose Next
- On the Add permissions page, choose the policy you created in the previous step and choose Next
- On the Name, review, and create page, for Role name, enter a descriptive name such as redshift-lambda-authentication-failure-role
- For Description, enter text that describes the role, for example: Allows Lambda functions to call Amazon Redshift on your behalf
- Choose Create role
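The role creation steps above correspond to two IAM API calls. Here is a minimal sketch, assuming the policy from the previous step already exists and its ARN is known; `create_lambda_role` is a hypothetical helper, and the IAM client is passed in by the caller.

```python
import json

# Trust policy that lets the Lambda service assume the role
LAMBDA_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}


def create_lambda_role(iam_client, role_name, policy_arn):
    """Create the role, attach the Redshift policy, and return the role ARN.

    iam_client is expected to be boto3.client("iam").
    """
    role = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(LAMBDA_TRUST_POLICY),
        Description="Allows Lambda functions to call Amazon Redshift on your behalf",
    )
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    return role["Role"]["Arn"]
```

Choosing Lambda as the use case in the console produces the same kind of trust relationship, so the sketch is only useful if you prefer to automate the setup.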
Create a table in Amazon Redshift that tracks authentication failures
Create the following table in your Amazon Redshift data warehouse
CREATE TABLE failed_login_attempts (
    username VARCHAR(100) NOT NULL,      -- Username of the user who failed to log in
    consecutive_failures INT NOT NULL,   -- Number of consecutive login failures
    last_failure_time TIMESTAMP NOT NULL, -- Timestamp of the last login failure
    PRIMARY KEY (username)               -- Informational only: Redshift does not enforce primary keys
);
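The Lambda function in the next step reads and writes this table with usernames taken from log records, which anyone attempting a login can influence. As a hedged sketch (not the article's code), the Redshift Data API accepts named parameters, so the username can be passed as a value instead of being formatted into the SQL text. The helper name and its default arguments are assumptions that mirror the configuration values used later.

```python
def build_failure_count_statement(username,
                                  cluster_id="redshift-cluster-1",
                                  database="dev",
                                  db_user="awsuser"):
    """Build execute_statement arguments that bind the username as a parameter."""
    return {
        "ClusterIdentifier": cluster_id,
        "Database": database,
        "DbUser": db_user,
        # :username is a named placeholder filled in from Parameters
        "Sql": "SELECT consecutive_failures FROM failed_login_attempts "
               "WHERE username = :username;",
        "Parameters": [{"name": "username", "value": username}],
    }
```

The result can be passed to the Data API client as `boto3.client("redshift-data").execute_statement(**build_failure_count_statement("some_user"))`. The SQL text stays constant regardless of what the username contains.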
Create a Lambda function that runs when an authentication failure occurs
- Navigate to the AWS Lambda console page
- Choose Create function
- On the Create function page, for Function name, enter a descriptive name, for example: RedshiftAuthenticationFailuresCompliance
- For Runtime, choose the latest Python version, for example Python 3.12 (at the time of this writing)
- Expand the Change default execution role section and choose the IAM role you created previously
- Choose Create function
- Use the code below in the Lambda function and deploy the changes. Update CLUSTER_ID, DATABASE_NAME, and DB_USER to match your cluster. This code has NO_ATTEMPTS set to 10, which means that a user is locked after 10 consecutive failed login attempts. You can change it to suit your requirements
import time
import boto3
import base64
import gzip
import json
from io import BytesIO
from datetime import datetime
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Initialize the Redshift Data API client
redshift_client = boto3.client('redshift-data')
# Parameters to your Redshift Cluster
CLUSTER_ID = 'redshift-cluster-1'
DATABASE_NAME = 'dev'
DB_USER = 'awsuser'
DISABLED_PASSWORD = 'DisabledPassword1'
DISABLED_VALID_UNTIL_DATE = '1990-01-01'
NO_ATTEMPTS = 10
def lambda_handler(event, context):
    logger.info(f"Raw Event is: {event}")
    # Step 1: Decode the base64 encoded data
    compressed_data = event['awslogs']['data']
    decoded_data = base64.b64decode(compressed_data)
    # Step 2: Decompress the gzip data
    with gzip.GzipFile(fileobj=BytesIO(decoded_data)) as gzipfile:
        decompressed_payload = gzipfile.read()
    # Step 3: Parse the decompressed data as JSON
    payload = json.loads(decompressed_payload)
    logger.info(f"Authentication failure event raw record is: {payload}. Starting processing")
    # Step 4: Process each log event in the batch
    for log_event in payload['logEvents']:
        log_message = log_event['message']
        process_log_message(log_message)
def process_log_message(log_message):
    # Connection log records are pipe-delimited
    columns = log_message.split('|')
    # Skip records that are too short to contain a username
    if len(columns) < 7:
        logger.warning(f"Skipping record with unexpected format: {log_message}")
        return
    failure_time_str = columns[1].strip()  # 2nd field is the timestamp of the failure
    username = columns[6].strip()  # 7th field is the username
    logger.info(f"Data extracted from raw record. Username is: {username}. Authentication failure timestamp is: {failure_time_str}")
    # Convert the timestamp string to a datetime object
    failure_time = datetime.strptime(failure_time_str, "%a, %d %b %Y %H:%M:%S:%f")
    logger.info(f"Extracted data is standardized. Username is: {username}. Authentication failure timestamp is: {failure_time}")
    # Handle the authentication failure for the user
    handle_failure(username, failure_time)
def handle_failure(username, failure_time):
    # Step 1: Check if there has been a successful login
    logger.info(f"Checking if there is a successful login for Username: {username}")
    if has_successful_login(username, failure_time):
        # Reset the failure count if a successful login was detected
        logger.info(f"There is a successful login since last authentication failure. Resetting failure count for username: {username}")
        reset_failure_count(username)
    else:
        # Step 2: Get the current failure count from Redshift
        logger.info("There is no successful login since last authentication failure. Retrieving total number of authentication failures")
        current_failures = get_consecutive_failures(username)
        logger.info(f"Total number of authentication failures: {current_failures}")
        # Step 3: Increment the failure count
        new_failures = current_failures + 1
        # Step 4: Insert or update the failure count in Redshift
        if current_failures > 0:
            logger.info("Incrementing total failures by performing an update")
            update_failure_count(username, new_failures, failure_time)
        else:
            logger.info("Inserting a row for failure")
            insert_failure_record(username, new_failures, failure_time)
        # Step 5: Lock the user if consecutive failures reach threshold
        if new_failures >= NO_ATTEMPTS:
            logger.info(f"Total consecutive failures is greater than or equal to {NO_ATTEMPTS}. Locking user")
            lock_user(username)
            logger.info(f"Resetting the failure row for user: {username}")
            reset_failure_count(username)
def has_successful_login(username, last_failure_time):
    # Query Redshift's STL_CONNECTION_LOG table to check for successful logins
    query = f"""
        SELECT COUNT(*)
        FROM stl_connection_log
        WHERE username = '{username}'
        AND event = 'authenticated'
        AND recordtime > '{last_failure_time}';
    """
    response = execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
    records = response['sql_result']
    if records and records[0][0]['longValue'] > 0:
        return True
    return False
def get_consecutive_failures(username):
    query = f"SELECT consecutive_failures FROM failed_login_attempts WHERE username = '{username}';"
    response = execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
    records = response['sql_result']
    if records:
        return int(records[0][0]['longValue'])
    return 0
def insert_failure_record(username, failures, last_failure_time):
    query = f"""
        INSERT INTO failed_login_attempts (username, consecutive_failures, last_failure_time)
        VALUES ('{username}', {failures}, '{last_failure_time}');
    """
    execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
def update_failure_count(username, failures, last_failure_time):
    query = f"""
        UPDATE failed_login_attempts
        SET consecutive_failures = {failures}, last_failure_time = '{last_failure_time}'
        WHERE username = '{username}';
    """
    execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
def reset_failure_count(username):
    query = f"DELETE FROM failed_login_attempts WHERE username = '{username}';"
    execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
def lock_user(username):
    # Lock the user in Redshift by replacing the password and expiring it
    query = f"ALTER USER {username} PASSWORD '{DISABLED_PASSWORD}' VALID UNTIL '{DISABLED_VALID_UNTIL_DATE}';"
    execute_sql_data_api(redshift_client, DATABASE_NAME, query, DB_USER, CLUSTER_ID)
def execute_sql_data_api(redshift_data_api_client, redshift_database_name, query, redshift_user, redshift_cluster_id):
    logger.info(f"Submitted Query using data API: {query}")
    try:
        res = redshift_data_api_client.execute_statement(Database=redshift_database_name, DbUser=redshift_user, Sql=query, ClusterIdentifier=redshift_cluster_id)
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        # Convert the exception to text; concatenating it to a str directly raises TypeError
        raise Exception(f"Error initiating query: {e}") from e
    logger.info(f"Response from data API: {res}")
    query_id = res['Id']
    logger.info(f"Statement ID from response is: {query_id}")
    done = False
    result = {}
    # Poll once per second until the statement finishes, fails, or is aborted
    while not done:
        time.sleep(1)
        desc = redshift_data_api_client.describe_statement(Id=query_id)
        logger.info(f"Checking the status for query id: {query_id}")
        query_status = desc['Status']
        logger.info(f"Query status is: {query_status}")
        if query_status in ("FAILED", "ABORTED"):
            raise Exception('SQL query failed:' + query_id + ": " + desc.get('Error', query_status))
        elif query_status == "FINISHED":
            logger.info(f"Statement response: {desc}")
            done = True
            result['status'] = query_status
            # Fetch the rows if the statement produced a result set (typically a SELECT)
            if desc['HasResultSet']:
                logger.info("Query has result rows. Retrieving them")
                response = redshift_data_api_client.get_statement_result(Id=query_id)
                logger.info(f"Printing result --> {response['Records']}")
                result['sql_result'] = response['Records']
    return result
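Before wiring the function to live logs, it can help to exercise the decoding and parsing steps locally. The sketch below builds an event in the base64 and gzip envelope that the handler above unpacks, extracts the username from the 7th pipe-delimited field, and checks that it looks like a plain identifier before it would be placed into a statement such as ALTER USER. The helper names and the sample log line are hypothetical, and the identifier pattern is a deliberately conservative assumption rather than Redshift's full naming rules.

```python
import base64
import gzip
import json
import re

# Assumption: accept only letters, digits, and underscores, not starting with a digit
PLAIN_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_sample_event(messages):
    """Wrap log lines in the envelope the handler expects under event['awslogs']['data']."""
    payload = {"logEvents": [{"message": m} for m in messages]}
    compressed = gzip.compress(json.dumps(payload).encode("utf-8"))
    return {"awslogs": {"data": base64.b64encode(compressed).decode("ascii")}}


def extract_usernames(event):
    """Decode the envelope and return the 7th field of every log line."""
    raw = gzip.decompress(base64.b64decode(event["awslogs"]["data"]))
    usernames = []
    for log_event in json.loads(raw)["logEvents"]:
        columns = log_event["message"].split("|")
        if len(columns) > 6:  # ignore lines too short to hold a username
            usernames.append(columns[6].strip())
    return usernames


def is_plain_identifier(name):
    """Return True only if the whole name matches the conservative pattern."""
    return PLAIN_IDENTIFIER.fullmatch(name) is not None


# Hypothetical log line; only the field positions matter for this sketch
SAMPLE_LINE = "authentication failure |Mon, 1 Jan 2024 10:00:00:123|10.0.0.1 |50000 |1234|dev |testuser |password |0"
```

Calling `extract_usernames(build_sample_event([SAMPLE_LINE]))` yields the username from the sample line, and a name that fails `is_plain_identifier` could be logged and skipped instead of being passed to `lock_user`.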
Create a CloudWatch subscription filter that invokes the Lambda function when an authentication failure occurs
- Navigate to the CloudWatch Log groups console page
- Enter your Amazon Redshift cluster name in the search bar to see the log groups associated with that cluster, as shown in the following screenshot. Choose the log group that ends in connectionlog
- On the page that opens, choose Actions, then Subscription filters, and then Create Lambda subscription filter, as shown in the following screenshot
- On the Create Lambda subscription filter page:
  a. For Lambda function, choose the Lambda function you created in the previous step
  b. For Log format, choose Other
  c. For Subscription filter pattern, enter "authentication failure"
  d. For Subscription filter name, enter a descriptive name, for example: Redshift-Cluster-1 Authentication Failures Filter
  e. Choose Start streaming
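The subscription filter can also be created through the API. The following is a minimal sketch, assuming boto3 clients for CloudWatch Logs and Lambda are supplied by the caller; the helper names and the statement ID are hypothetical. When the filter is created outside the console, CloudWatch Logs also needs permission to invoke the function, which the second builder covers.

```python
def build_subscription_filter_request(log_group_name, lambda_arn,
                                      filter_name="Redshift-Cluster-1 Authentication Failures Filter"):
    """Build arguments for the CloudWatch Logs PutSubscriptionFilter call."""
    return {
        "logGroupName": log_group_name,
        "filterName": filter_name,
        # Quoted so the two words are matched as one phrase
        "filterPattern": '"authentication failure"',
        "destinationArn": lambda_arn,
    }


def build_invoke_permission_request(function_name, log_group_arn):
    """Build arguments for the Lambda AddPermission call that lets CloudWatch Logs invoke the function."""
    return {
        "FunctionName": function_name,
        "StatementId": "redshift-connectionlog-invoke",  # hypothetical statement ID
        "Action": "lambda:InvokeFunction",
        "Principal": "logs.amazonaws.com",
        "SourceArn": log_group_arn,
    }


def create_subscription(logs_client, lambda_client, log_group_name, log_group_arn,
                        function_name, lambda_arn):
    """Grant invoke permission first, then start streaming matching log events."""
    lambda_client.add_permission(**build_invoke_permission_request(function_name, log_group_arn))
    logs_client.put_subscription_filter(**build_subscription_filter_request(log_group_name, lambda_arn))
```

The log group name and ARN are left as parameters because they depend on your cluster; use the connectionlog group you located in the steps above.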