Extract audit logs from RDS PostgreSQL or Aurora PostgreSQL log files exported to CloudWatch Logs, convert them to CSV, and upload them to an Amazon S3 bucket.
This article shows how to extract audit information from RDS PostgreSQL or Aurora PostgreSQL log files exported to CloudWatch Logs and upload that data to an Amazon S3 bucket in CSV format, keeping each day's audit information in a single CSV file.
Introduction
Currently, audit information for RDS PostgreSQL and Aurora PostgreSQL is written to the postgresql log file, which can be exported to CloudWatch Logs. Reading audit information directly from the log file is cumbersome because the file also contains a large amount of general database logging.
This document provides a comprehensive guide on exporting RDS PostgreSQL or Aurora PostgreSQL audit logs from CloudWatch Logs to Amazon S3 in CSV format. The solution automatically processes audit logs in near real-time, making them easily accessible and queryable for compliance, security analysis, and auditing purposes. Using this approach, audit data is extracted to CSV and appended to a single file per day.
Benefits of Exporting to S3
Exporting audit logs to S3 offers several advantages:
- Cost-effective long-term storage
- Structured format (CSV) for easy analysis
- Integration with analytics services (Athena, Redshift, etc.)
- Immutable storage for compliance requirements
- Lifecycle policies for archiving older logs
Architecture
- RDS PostgreSQL/Aurora PostgreSQL generates audit logs, which are sent to CloudWatch Logs when log export to CloudWatch is enabled
- The Subscription Filter identifies audit log entries and invokes the Lambda function
- Lambda parses the logs, extracts structured data, and converts to CSV format
- Lambda appends the new log entries to a daily CSV file in S3
- Files are organized in S3 using a YYYY/MM/DD folder structure
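The parsing step in this flow can be illustrated with a short sketch. The log line below is a hypothetical pgAudit entry; the exact prefix depends on your `log_line_prefix` setting, so treat the sample text as illustrative rather than an exact RDS format:

```python
import re

# Hypothetical pgAudit line as it might appear in the postgresql log;
# the prefix (timestamp, host, pid) varies with log_line_prefix.
sample = ('2025-07-06 05:55:03 UTC:10.0.0.5(50332):postgres@postgres:[1234]:LOG:  '
          'AUDIT: SESSION,1,1,WRITE,INSERT,TABLE,public.test1,'
          '"insert into test1 values (6);",<not logged>')

# The user@database pair sits in the log prefix
user_db = re.search(r'(\w+)@(\w+):', sample)
print(user_db.group(1), user_db.group(2))  # postgres postgres

# The AUDIT payload itself is a comma-separated record:
# audit_type, statement_id, substatement_id, class, command,
# object_type, object_name, statement, parameters
fields = sample.split('AUDIT: ')[1].split(',')
print(fields[4], fields[5], fields[6])  # INSERT TABLE public.test1
```

The Lambda function in Step 2 applies the same idea with more defensive parsing, since statements can themselves contain commas.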
Prerequisites
- Auditing is enabled on your RDS PostgreSQL or Aurora PostgreSQL instance/cluster and the pgAudit extension has been created in the database. If not, enable auditing first.
- Export of the postgresql log to CloudWatch Logs is enabled. If not, enable log export first.
- A CloudWatch Logs group receiving PostgreSQL logs
- An S3 bucket for storing processed logs
- IAM permissions to create Lambda functions and CloudWatch Logs subscription filters
Step 1: Create IAM Role for Lambda
Create an IAM role with permissions to:
- Write logs to CloudWatch Logs
- Read from CloudWatch Logs
- Read and write objects in the target S3 bucket
```shell
# Create IAM role with trust policy
aws iam create-role \
  --role-name PostgreSQLAuditLogProcessorRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": { "Service": "lambda.amazonaws.com" },
        "Action": "sts:AssumeRole"
      }
    ]
  }'

# Attach policy to IAM role
aws iam put-role-policy \
  --role-name PostgreSQLAuditLogProcessorRole \
  --policy-name LambdaPolicy \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        "Resource": "arn:aws:logs:*:*:*"
      },
      {
        "Effect": "Allow",
        "Action": [
          "s3:PutObject",
          "s3:GetObject",
          "s3:ListBucket"
        ],
        "Resource": [
          "arn:aws:s3:::<YOUR_BUCKET_NAME>/*",
          "arn:aws:s3:::<YOUR_BUCKET_NAME>"
        ]
      }
    ]
  }'
```
Step 2: Create Lambda Function
Create a Lambda function to process the audit logs:
- Create a Python file named `postgresql_audit_log_processor.py` with the following code:
```python
import base64
import csv
import gzip
import io
import json
import re
import traceback
from datetime import datetime

import boto3
from botocore.exceptions import ClientError

# Initialize S3 client and constants
s3 = boto3.client('s3')
TARGET_BUCKET = '<YOUR_BUCKET_NAME>'
TARGET_PREFIX = 'audit_logs/'
CSV_HEADERS = ['timestamp', 'log_level', 'user', 'database', 'action',
               'object_type', 'object_name', 'query']


def parse_postgresql_log(log_event):
    """Parse PostgreSQL log entry into structured data"""
    try:
        message = log_event.get('message', '')
        if 'AUDIT:' not in message:
            return None

        # Extract timestamp and initialize log data
        timestamp = datetime.fromtimestamp(
            log_event.get('timestamp', 0) / 1000.0).isoformat()
        log_data = {
            'timestamp': timestamp,
            'log_level': 'INFO',
            'user': 'unknown',
            'database': 'unknown',
            'action': 'unknown',
            'object_type': 'unknown',
            'object_name': 'unknown',
            'query': ''
        }

        # Extract username and database
        user_db_match = re.search(r'(\w+)@(\w+):', message)
        if user_db_match:
            log_data['user'] = user_db_match.group(1)
            log_data['database'] = user_db_match.group(2)

        # Extract audit details
        audit_match = re.search(
            r'AUDIT:\s+([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),(.*?)(?:,<none>|$)',
            message)
        if audit_match:
            log_data['action'] = audit_match.group(4)
            log_data['object_type'] = audit_match.group(6)
            log_data['object_name'] = audit_match.group(7)
            # Clean up query
            query = (audit_match.group(8).strip('"')
                     .replace('\\"', '"').replace('\\n', ' ').replace('\\t', ' '))
            log_data['query'] = query

        # Alternative parsing for TABLE operations
        if not log_data['query'] and ',TABLE,' in message:
            parts = message.split(',TABLE,')
            if len(parts) > 1:
                table_and_query = parts[1].split(',', 1)
                if len(table_and_query) > 1:
                    log_data['object_type'] = 'TABLE'
                    log_data['object_name'] = table_and_query[0]
                    query_part = (table_and_query[1].split(',<none>')[0]
                                  if ',<none>' in table_and_query[1]
                                  else table_and_query[1])
                    log_data['query'] = query_part.strip('"')

        # Determine action from query if not already set
        if log_data['action'] in ['unknown', 'WRITE']:
            for action in ['SELECT', 'INSERT', 'UPDATE', 'DELETE',
                           'CREATE', 'ALTER', 'DROP']:
                if action in message:
                    log_data['action'] = action
                    break

        return log_data
    except Exception as e:
        print(f"Error parsing log: {str(e)}")
        return None


def file_exists(bucket, key):
    """Check if a file exists in S3"""
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        return False if e.response['Error']['Code'] == '404' else None


def append_to_csv_file(bucket, key, csv_rows):
    """Append rows to an existing CSV file or create a new one if it doesn't exist"""
    try:
        # Create CSV content for new rows
        csv_buffer = io.StringIO()
        writer = csv.DictWriter(csv_buffer, fieldnames=CSV_HEADERS)

        # Check if file exists
        exists = file_exists(bucket, key)
        if not exists:
            writer.writeheader()

        # Write rows
        for row in csv_rows:
            writer.writerow(row)

        # If file exists, append to it
        if exists:
            try:
                # Get existing content
                response = s3.get_object(Bucket=bucket, Key=key)
                existing_content = response['Body'].read().decode('utf-8')

                # Prepare new content (skip header)
                new_content = csv_buffer.getvalue()
                if '\n' in new_content and new_content.startswith(CSV_HEADERS[0]):
                    new_content = new_content.split('\n', 1)[1] if '\n' in new_content else ''

                # Ensure newline at end of existing content
                if existing_content and not existing_content.endswith('\n'):
                    existing_content += '\n'

                # Upload combined content
                s3.put_object(Bucket=bucket, Key=key,
                              Body=existing_content + new_content,
                              ContentType='text/csv')
            except Exception as e:
                # Fall back to creating new file on error
                print(f"Error appending, creating new file: {str(e)}")
                s3.put_object(Bucket=bucket, Key=key,
                              Body=csv_buffer.getvalue(),
                              ContentType='text/csv')
        else:
            # Create new file
            s3.put_object(Bucket=bucket, Key=key,
                          Body=csv_buffer.getvalue(),
                          ContentType='text/csv')
    except Exception as e:
        print(f"Error in append_to_csv_file: {str(e)}")
        raise


def lambda_handler(event, context):
    try:
        # Handle CloudWatch Logs events
        if 'awslogs' in event:
            # Decode and decompress logs
            compressed_payload = base64.b64decode(event['awslogs']['data'])
            log_data = json.loads(gzip.decompress(compressed_payload))

            # Process log events
            csv_rows = []
            for log_event in log_data['logEvents']:
                parsed_log = parse_postgresql_log(log_event)
                if parsed_log:
                    csv_rows.append(parsed_log)

            # If we have logs to process
            if csv_rows:
                date_prefix = datetime.now().strftime('%Y/%m/%d/')
                s3_key = f"{TARGET_PREFIX}{date_prefix}audit_logs_daily.csv"
                append_to_csv_file(TARGET_BUCKET, s3_key, csv_rows)
                return {'statusCode': 200,
                        'body': f"Processed {len(csv_rows)} audit log entries"}
            else:
                return {'statusCode': 200, 'body': "No audit logs to process"}

        return {'statusCode': 400, 'body': "Invalid event format"}
    except Exception as e:
        print(f"Error: {str(e)}")
        print(traceback.format_exc())
        raise
```
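The handler can be exercised locally before wiring up the subscription filter. The helper below is a sketch that fabricates the `awslogs` envelope CloudWatch Logs delivers (JSON, gzip-compressed, then base64-encoded); the log group name and sample message are made up for illustration:

```python
import base64
import gzip
import json

def make_cloudwatch_event(messages, ts_ms=1751781303000):
    """Build the payload shape CloudWatch Logs delivers to Lambda:
    JSON, gzip-compressed, then base64-encoded under awslogs.data."""
    payload = {
        'logGroup': '/aws/rds/cluster/example/postgresql',  # placeholder
        'logStream': 'example-stream',
        'logEvents': [{'id': str(i), 'timestamp': ts_ms, 'message': m}
                      for i, m in enumerate(messages)],
    }
    data = base64.b64encode(gzip.compress(json.dumps(payload).encode('utf-8')))
    return {'awslogs': {'data': data.decode('ascii')}}

event = make_cloudwatch_event([
    'postgres@postgres:AUDIT: SESSION,1,1,WRITE,INSERT,TABLE,public.test1,'
    '"insert into test1 values (6);",<none>'
])
# lambda_handler(event, None)  # uncomment to run; performs a real S3 upload
```

Invoking `lambda_handler` with this event requires AWS credentials and the target bucket, since the function writes to S3.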
- Create a deployment package:
zip -j postgresql_audit_log_processor.zip postgresql_audit_log_processor.py
- Deploy the Lambda function:
```shell
aws lambda create-function \
  --function-name PostgreSQLAuditLogProcessor \
  --runtime python3.9 \
  --role arn:aws:iam::<YOUR_ACCOUNT_ID>:role/PostgreSQLAuditLogProcessorRole \
  --handler postgresql_audit_log_processor.lambda_handler \
  --description "Lambda function to process PostgreSQL audit logs and append to CSV in S3" \
  --timeout 60 \
  --memory-size 128 \
  --zip-file fileb://postgresql_audit_log_processor.zip \
  --region <YOUR_REGION>
```
Step 3: Add CloudWatch Logs Permission
Grant CloudWatch Logs permission to invoke the Lambda function:
```shell
aws lambda add-permission \
  --function-name PostgreSQLAuditLogProcessor \
  --statement-id CloudWatchLogsToLambda \
  --action lambda:InvokeFunction \
  --principal logs.<YOUR_REGION>.amazonaws.com \
  --source-arn "arn:aws:logs:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:log-group:/aws/rds/cluster/<YOUR_CLUSTER_NAME>/postgresql:*" \
  --region <YOUR_REGION>
```
Step 4: Create CloudWatch Logs Subscription Filter
Create a subscription filter to trigger the Lambda function when audit logs are detected:
```shell
aws logs put-subscription-filter \
  --filter-name PostgreSQLAuditLogFilter \
  --filter-pattern "AUDIT" \
  --log-group-name "/aws/rds/cluster/<YOUR_CLUSTER_NAME>/postgresql" \
  --destination-arn "arn:aws:lambda:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:function:PostgreSQLAuditLogProcessor" \
  --region <YOUR_REGION>
```
Step 5: Test the Solution
- Generate actual audit logs by connecting to your PostgreSQL database and executing some SQL commands:
```sql
-- Create a test table
CREATE TABLE test_audit_table (
    id serial PRIMARY KEY,
    name text,
    created_at timestamp DEFAULT current_timestamp
);

-- Insert some data
INSERT INTO test_audit_table (name) VALUES ('Test Record 1');
INSERT INTO test_audit_table (name) VALUES ('Test Record 2');

-- Update data
UPDATE test_audit_table SET name = 'Updated Record' WHERE id = 1;

-- Delete data
DELETE FROM test_audit_table WHERE id = 2;
```
- Check if the audit logs have been processed and exported to S3:
```shell
# List files in S3 bucket
aws s3 ls s3://<YOUR_BUCKET_NAME>/audit_logs/ --recursive --region <YOUR_REGION>

# Download and view CSV file
aws s3 cp s3://<YOUR_BUCKET_NAME>/audit_logs/$(date +%Y/%m/%d)/audit_logs_daily.csv audit_logs_daily.csv --region <YOUR_REGION>
cat audit_logs_daily.csv
```
- The output in the CSV file should look like the following:
```
timestamp            log_level  user      database  action  object_type  object_name   query
2025-07-06T05:55:03  INFO       postgres  postgres  INSERT  TABLE        public.test1  insert into test1 values (6);
2025-07-06T05:55:08  INFO       postgres  postgres  INSERT  TABLE        public.test1  insert into test1 values (7);
2025-07-06T05:55:12  INFO       postgres  postgres  UPDATE  TABLE        public.test1  update test1 set id=10 where id<5;
2025-07-06T06:00:46  INFO       postgres  postgres  INSERT  TABLE        public.test1  insert into test1 values (99);
2025-07-06T06:00:50  INFO       postgres  postgres  INSERT  TABLE        public.test1  insert into test1 values (88);
2025-07-06T06:00:54  INFO       postgres  postgres  INSERT  TABLE        public.test1  insert into test1 values (77);
```
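Once the daily file is in S3, the flat CSV is straightforward to post-process. A minimal sketch, using an inline sample that mirrors the exported header (in practice you would open the downloaded `audit_logs_daily.csv`):

```python
import csv
import io

# Inline sample mirroring the exported file's header; replace with
# open('audit_logs_daily.csv') to read the real file.
sample_csv = (
    "timestamp,log_level,user,database,action,object_type,object_name,query\n"
    "2025-07-06T05:55:03,INFO,postgres,postgres,INSERT,TABLE,public.test1,insert into test1 values (6);\n"
    "2025-07-06T05:55:12,INFO,postgres,postgres,UPDATE,TABLE,public.test1,update test1 set id=10 where id<5\n"
)

# Count actions per user, a typical first-pass audit summary
counts = {}
for row in csv.DictReader(io.StringIO(sample_csv)):
    key = (row['user'], row['action'])
    counts[key] = counts.get(key, 0) + 1
print(counts)  # {('postgres', 'INSERT'): 1, ('postgres', 'UPDATE'): 1}
```

For larger volumes, pointing Athena at the `audit_logs/` prefix avoids downloading files at all.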
CSV Format and Data Structure
The exported CSV files include the following columns:
- timestamp: ISO 8601 timestamp of the audit log entry
- log_level: Log level (INFO, WARNING, ERROR, etc.)
- user: Database user who performed the action
- database: Database where the action was performed
- action: Type of action (SELECT, INSERT, UPDATE, DELETE, etc.)
- object_type: Type of object affected (TABLE, INDEX, etc.)
- object_name: Name of the object affected
- query: Full SQL query that was executed
Troubleshooting
Common Issues and Solutions
-
Lambda function not being triggered:
- Verify the CloudWatch Logs subscription filter is correctly configured
- Check that the log group name matches your Aurora PostgreSQL cluster's log group
- Ensure the filter pattern "AUDIT" is appropriate for your audit logs
-
Lambda function failing:
- Check Lambda function logs in CloudWatch Logs
- Verify IAM permissions for accessing S3 and CloudWatch Logs
- Ensure the S3 bucket exists and is accessible
-
No audit logs in CloudWatch:
- Verify pgAudit extension is enabled in your PostgreSQL cluster
- Check that audit logging parameters are correctly configured
- Ensure log export to CloudWatch Logs is enabled for your RDS/Aurora cluster
-
CSV files not appearing in S3:
- Check Lambda function execution logs for errors
- Verify S3 bucket name and prefix in the Lambda function code
- Ensure Lambda has proper permissions to write to the S3 bucket
Debugging Tips
- Enable detailed CloudWatch Logs for the Lambda function
- Test the Lambda function with a sample event
- Check S3 bucket permissions and policies
- Verify the format of audit logs in CloudWatch Logs
Cost Optimization
- Lambda Optimization: Adjust memory allocation based on processing needs
- S3 Storage Classes: Use appropriate storage classes for different retention periods
- CloudWatch Logs Retention: Set appropriate retention periods for CloudWatch Logs
- Filtering: Adjust the subscription filter to only process relevant audit logs
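The lifecycle-policy point above can be applied programmatically. The configuration below is a sketch: the 90/365-day values and rule ID are illustrative rather than a recommendation, and the commented boto3 call shows where it would be applied:

```python
# Sketch of an S3 lifecycle configuration for the audit_logs/ prefix.
# Day counts and rule ID are illustrative, not a recommendation.
lifecycle = {
    'Rules': [{
        'ID': 'audit-log-archival',
        'Status': 'Enabled',
        'Filter': {'Prefix': 'audit_logs/'},
        'Transitions': [{'Days': 90, 'StorageClass': 'GLACIER'}],
        'Expiration': {'Days': 365},
    }]
}

# Apply with boto3 (requires credentials and an existing bucket):
# import boto3
# boto3.client('s3').put_bucket_lifecycle_configuration(
#     Bucket='<YOUR_BUCKET_NAME>', LifecycleConfiguration=lifecycle)
```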